Automatically fusing together several for-loops

No.

It should be possible to get speedups from parallelism. How did you try doing this before?

I mean technically yes, using @cfunction, though I get that it probably shouldn’t be recommended in general.

foo(a, b) = a + b
bar(a, b) = a - b

funcs = [foo, bar]
cfuncs = [
    @cfunction(foo, Cdouble, (Ref{Cdouble}, Ref{Cdouble})),
    @cfunction(bar, Cdouble, (Ref{Cdouble}, Ref{Cdouble}))
]

function sum_funcs(funcs, a, b)
    ret = 0.0
    for func in funcs
        ret += func(a, b)
    end
    ret
end

function sum_cfuncs(cfuncs, a, b)
    ret = 0.0
    for func in cfuncs
        ret += ccall(func, Cdouble, (Ref{Cdouble}, Ref{Cdouble}), a, b)
    end
    ret
end

using BenchmarkTools
@btime sum_funcs($funcs, a, b) setup = (a = rand(); b = rand()) # 88.750 ns (8 allocations: 128 bytes)
@btime sum_cfuncs($cfuncs, a, b) setup = (a = rand(); b = rand()) # 13.612 ns (0 allocations: 0 bytes)

I think there is. It's not exactly that they have methods with the same signature, but that you know the signatures (edit: the concrete types of the functions) upfront.

I do something similar in Plugins.jl; it can merge methods defined in an array into a single method body.

It was not designed for this case, as it works on plugin types which implement methods of the same function, not separate functions. So you have to create a plugin from every function, but then it should run much faster than with @cfunction.

I doubt you can automatically roll these loops together, but you could define a “foreachnode” function (or maybe there already is one?) which takes a tuple of collector functions and iterates once over the graph, handing the nodes to your collector functions; it would then return a tuple of results, one for each collector function.
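A rough sketch of that idea (hypothetical names: foreachnode itself, and nodes(graph) standing in for however you traverse your graph):

function foreachnode(graph, collectors::Tuple, inits::Tuple)
    states = inits
    for node in nodes(graph)  # `nodes` = however you iterate your graph's nodes
        # advance each collector with its own state; `map` over same-length
        # tuples returns a tuple, so this stays type-stable for small tuples
        states = map((f, s) -> f(s, node), collectors, states)
    end
    return states
end

# usage sketch: count nodes and sum degrees in a single pass (`degree` is also hypothetical)
# nnodes, degsum = foreachnode(g, ((n, _) -> n + 1, (d, v) -> d + degree(g, v)), (0, 0))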

Besides using @cfunction as pointed out by @tkoolen, you can use FunctionWrappers.jl.

using BenchmarkTools
using StaticArrays
using FunctionWrappers: FunctionWrapper

foo(a, b, c) = a .+ b .- c
bar(a, b, c) = a .- b .+ c

function sum_funcs(funcs, a, b, c)
    r = zeros(SVector{3})
    for func in funcs
        r += func(a, b, c)
    end
    return r
end

Signature = FunctionWrapper{
    SVector{3, Float64},
    Tuple{SVector{3, Float64}, Float64, Float64}
}

fs = [foo, bar]
wfs = Signature[foo, bar]

@btime sum_funcs($fs, a, b, c) setup = (a = rand(SVector{3}); b = rand(); c = rand())
# 74.816 ns (10 allocations: 256 bytes)
@btime sum_funcs($wfs, a, b, c) setup = (a = rand(SVector{3}); b = rand(); c = rand())
# 13.896 ns (0 allocations: 0 bytes)

Thanks for all the helpful replies!

I’ve tried Threads.@threads, @sync and @distributed, as well as pmap; all ended up being considerably slower. It’s quite likely I’ve been using them too naively - any tips would be much appreciated! I think one of the issues here is that change_scores is relatively fast as it is - roughly 1μs, for the size of graphs I’m interested in; the problem is that I’m calling it many many times from a function which can’t be parallelised (it mutates the graph after each call to change_scores).

I’ve tried using Tullio in delta_ttriple (as well as replacing the for loop with dot products, etc) and it actually yielded slightly worse performance, not sure why. And Transducers seems very interesting, but I’m not sure I understand the benefits - should decomposing the computation in this way (assuming it’s possible, I’ll have to try) yield any performance improvements?

This gave me 20%-30% speedup with basically zero extra work - that’s incredible, thank you very much! By the way, is there any specific reason to use SVectors here? I’ve tried storing the graph’s adjacency matrix as an MMatrix (since it’s mutated by the MCMC algorithm), but these seem to be inadequate for graphs of the size I’m interested in (~200 nodes).

This is not without some work, as Plugins.jl thinks in an overly structured way, but it seems like a 5x improvement over FunctionWrappers in the same sample.

Julia Version 1.5.1 (2020-08-25), official https://julialang.org/ release:

julia> using BenchmarkTools

julia> using StaticArrays

julia> using Plugins

julia> foo(a, b, c) = a .+ b .- c
foo (generic function with 1 method)

julia> bar(a, b, c) = a .- b .+ c
bar (generic function with 1 method)

julia> function calc end
calc (generic function with 0 methods)

julia> mutable struct Foo <: Plugin
           result::SVector{3, Float64}
           Foo() = new(zeros(SVector{3}))
       end

julia> mutable struct Bar <: Plugin
           result::SVector{3, Float64}
           Bar() = new(zeros(SVector{3}))
       end

julia> struct Results end # Shared state, not used

julia> calc(pfoo::Foo, result, a, b, c) = pfoo.result = foo(a, b, c)
calc (generic function with 1 method)

julia> calc(pbar::Bar, result, a, b, c) = pbar.result = bar(a, b, c)
calc (generic function with 2 methods)

julia> const pfoo = Foo()
Foo([0.0, 0.0, 0.0])

julia> const pbar = Bar()
Bar([0.0, 0.0, 0.0])

julia> calculators = PluginStack([pfoo, pbar])
PluginStack(Plugin[Foo([0.0, 0.0, 0.0]), Bar([0.0, 0.0, 0.0])], Any[], Dict{Symbol,Plugin}(:nothing => Bar([0.0, 0.0, 0.0])), nothing)

julia> const results = Results()
Results()

julia> const mergedcalc = hooklist(calculators, calc, results)
Plugins.HookList{Plugins.HookList{Plugins.HookList{Nothing,Nothing,Nothing,Nothing},typeof(calc),Bar,Results},typeof(calc),Foo,Results}(Plugins.HookList{Plugins.HookList{Nothing,Nothing,Nothing,Nothing},typeof(calc),Bar,Results}(Plugins.HookList{Nothing,Nothing,Nothing,Nothing}(nothing, nothing, nothing, nothing), calc, Bar([0.0, 0.0, 0.0]), Results()), calc, Foo([0.0, 0.0, 0.0]), Results())

julia> @btime begin mergedcalc(a, b, c); zeros(SVector{3}) + pfoo.result + pbar.result; end setup = (a = rand(SVector{3}); b = rand(); c = rand())
  2.157 ns (0 allocations: 0 bytes)
3-element SArray{Tuple{3},Float64,1,3} with indices SOneTo(3):
 0.12151616008457733
 1.8758013798301496
 0.21778100588128257

edit1: I remember seeing a package that was exactly for merging functions, but cannot find it now.
edit2: Edited the code to collect the results in plugin states, not in shared state

Yes, if calling change_scores takes 1μs, I imagine the overhead of threading is going to get in the way here.

It’s still worth a try though. I think you could try out ThreadPools: https://tro3.github.io/ThreadPools.jl/build/index.html with the @qthreads macro.

using ThreadPools: @qthreads  # needed for the @qthreads macro below

function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    func_list = g.fs
    @qthreads for k = 1:g.n_funcs
        x[k] = func_list[k](g, i, j)
    end
    return x
end

Since there is a lot of variation in the speed of these functions, this will generally work better than @threads, which is designed mainly for the case where each iteration takes a similar amount of time.

Also, not sure what the “outer” algorithm is. But if it’s an MCMC type procedure as you mention, you could of course run multiple chains on separate threads.
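For instance, a minimal sketch (run_chain and nsamples are placeholders for your sampler, not anything defined in this thread):

nchains = Threads.nthreads()
chains = Vector{Any}(undef, nchains)   # tighten the element type to your chain's output
Threads.@threads for c in 1:nchains
    # each chain mutates its own copy of the graph, so the chains stay independent
    chains[c] = run_chain(deepcopy(g), nsamples)
end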

Also, something like this might make sense.

x = ThreadPools.qmap(f -> f(g, i, j), func_list)

What’s the typical length of g.fs? Why not use a tuple like

struct erg{G <: AbstractArray{Bool}, Functions <: Tuple} <: AbstractGraph{Int}
    m::G
    fs::Functions
    ...

as mentioned in other comments? Are they completely different functions, or some parametrized callables of the same type (e.g., closures)?

Is it possible that this is essentially the cost of dynamic dispatch? I think profiling change_scores could help.
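For instance, something along these lines (a sketch using the Profile stdlib; it assumes a graph g and valid indices i, j are in scope) gives a flat per-function breakdown:

using Profile

Profile.clear()
@profile for _ in 1:100_000   # repeat so the ~1μs call collects enough samples
    change_scores(g, i, j)
end
Profile.print(format = :flat, sortedby = :count)   # sample counts per file/function/line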

Also, 1μs is rather too short for using threads. It takes a few μs to start a task in current Julia. So, you'd need to chunk more computations together to get a good speedup.
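Something like this minimal chunking sketch with the built-in scheduler (work(k) is a stand-in for one unit of your computation, not a function from this thread):

function chunked_sum(work, n; nchunks = Threads.nthreads())
    # one task per chunk, so each task carries several μs of work
    chunks = Iterators.partition(1:n, cld(n, nchunks))
    tasks = map(chunks) do ks
        Threads.@spawn sum(work, ks)
    end
    return sum(fetch, tasks)   # reduce the per-chunk partial sums
end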

The length of fs can vary from a couple of functions to 100-200 at most. All functions have the same signature and the same return type (see below). What would be the pros/cons of using a tuple instead of an array of functions?

I’ve tried to measure this using Profile/ProfileView but couldn’t generate the kind of summary I was looking for (basically % of time spent in each of g.fs, as well as a break-down of different overhead times). Is there a way to generate such a summary? I’d be happy to share the jlprof file if it helps.

Chunking seemed like a good option to me as well, but I wasn't able to get it to work:

  1. For a 100-node graph and ~10 functions in g.fs, change_scores currently takes roughly 0.5-1μs, using @pabloferz's excellent tip. I still haven't implemented @tisztamo's Plugins approach, but it seems super relevant as well.
  2. subgraphcount iterates over the edges of the graph; for each existing edge, it calls change_scores, accumulates the result, and then removes the edge. This can theoretically be fully parallelised (by calling change_scores separately for each edge and, within each call, deleting all the edges before it). However, since this function acts on a copy of the original graph, it has some copying overhead, so chunking indeed seems reasonable. For a 100-node graph, the sequential version takes 350-500μs, while a chunked version that operates on 1/10th of the graph takes ~70μs. However, accumulating the results over different chunks using @qthreads (with Threads.nthreads()=8) takes ~3ms and returns non-deterministic results, so I'm obviously doing something wrong. Using ThreadPools.qmap instead of @qthreads solves the non-determinism, but is also considerably slower than doing everything sequentially.

Here are the original functions:

function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
#     x = ThreadPools.qmap(f -> f(g,i,j),g.fs)
#     @qthreads for k in 1:g.n_funcs
#         x[k] = g.fs[k](g,i,j)
#     end
    for k in 1:g.n_funcs
        x[k] = g.fs[k](g,i,j)
    end
    return x
end

function subgraphcount(g::E) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    g2 = deepcopy(g)

    for j in vertices(g2)
        for i in vertices(g2)
            if i == j
                continue
            elseif !has_edge(g2, i, j)
                continue
            else
                x -= change_scores(g2, i, j)
                edge_toggle!(g2, i, j)
            end
        end
    end
    return x
end

Here's the chunked version of subgraphcount and the @qthreads function that calls it:

function ch_subgraphcount(g::E, start_col, end_col) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    g2 = deepcopy(g)
    if start_col > 1
       g2.m[:,1:(start_col-1)].=0
       g2.trans[1:(start_col-1),:].=0
       g2.indegree[:] = vec(sum(g2.m, dims=1)) 
       g2.outdegree[:] = vec(sum(g2.m, dims=2)) 
    end
    for j in start_col:(end_col-1)
        for i in vertices(g2)
            if i == j
                continue
            elseif !has_edge(g2, i, j)
                continue
            else
                x -= change_scores(g2, i, j)
                edge_toggle!(g2, i, j)
            end
        end
    end
    return x
end

function p_subgraphcount(g::E, chunksize) where {E <: AbstractGraph}
    n = nv(g)
    if n % chunksize != 0
        throw("graph size is not a whole multiple of chunksize")
    end
    x = zeros(g.n_funcs)
    # NOTE: `x += ...` from multiple threads is a data race; this is the likely
    # source of the non-deterministic results mentioned above.
    @qthreads for i=1:(n÷chunksize)
        x += ch_subgraphcount(g, 1+(i-1)*chunksize, 1+i*chunksize)
    end
    return x
    #return sum(ThreadPools.qmap(i -> ch_subgraphcount(g, 1+(i-1)*chunksize, 1+i*chunksize), 1:(n÷chunksize)))
end
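For reference, one way to avoid mutating the shared x from several threads is to let each task return its own partial vector and sum them afterwards (a sketch reusing ch_subgraphcount from above; the function name is just for illustration):

function p_subgraphcount2(g::E, chunksize) where {E <: AbstractGraph}
    n = nv(g)
    n % chunksize == 0 || throw(ArgumentError("graph size is not a whole multiple of chunksize"))
    # each task builds and returns its own vector; no shared mutable state
    tasks = [Threads.@spawn ch_subgraphcount(g, 1 + (i - 1) * chunksize, 1 + i * chunksize)
             for i in 1:(n ÷ chunksize)]
    return sum(fetch, tasks)
end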

This is the erg data structure:

struct erg{G <: AbstractArray{Bool}} <: AbstractGraph{Int}
    m::G
    fs::Array{FunctionWrapper{Float64,Tuple{erg,Int64,Int64}},1}
    trans::G
    realnodecov::Union{Dict{String,Array{Float64,1}},Nothing}
    catnodecov::Union{Dict{String,Array{Any,1}},Nothing}
    edgecov::Union{Array{Array{Float64,2}},Nothing}
    indegree::Vector{Int64}
    outdegree::Vector{Int64}
    n_funcs::Int
    kstars_precomputed::Array{Float64,2}
end

Signature = FunctionWrapper{
    Float64,
    Tuple{erg, Int, Int}
}

I’ve put all the delta_foo functions in this gist to avoid unnecessary clutter, in case anyone’s interested.

Yes, subgraphcount is used to compute the MH transition probability; I'm currently trying to optimise it (and the lower-level change_scores), but parallelising the higher-level MCMC function using multithreading is definitely the next step!

I got excited by this topic and created a micropackage while my son is sleeping :slight_smile:

Julia Version 1.5.1 (2020-08-25), official https://julialang.org/ release:

julia> using FunctionWranglers

julia> create_adder(value) = (x) -> x + value
create_adder (generic function with 1 method)

julia> adders = [create_adder(i) for i = 1:5]
5-element Array{var"#1#2"{Int64},1}:
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)

julia> w = FunctionWrangler(adders)
FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}}}(var"#1#2"{Int64}(1), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}}(var"#1#2"{Int64}(2), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}(var"#1#2"{Int64}(3), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}(var"#1#2"{Int64}(4), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}(var"#1#2"{Int64}(5), FunctionWrangler{Nothing,Nothing}(nothing, nothing))))))

julia> result = zeros(Float64, length(adders))
5-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0

julia> smap!(result, w, 10.0)

julia> result
5-element Array{Float64,1}:
 11.0
 12.0
 13.0
 14.0
 15.0

julia> @btime smap!($result, $w, d) setup = (d = rand())
  2.892 ns (0 allocations: 0 bytes)

It works well for 200 functions (<1s compile time at the first call), and seems to be usable up to at least 1000 functions.

Your approach got me excited as well! :smile:
I think you could get the same functionality by wrapping Tuples and using tail recursion. Using the tuples directly, you can do

apply_f_tuple(ft::Tuple{}, args...) = ()
apply_f_tuple(ft::Tuple, args...) = (first(ft)(args...), apply_f_tuple(Base.tail(ft), args...)...)

julia> adders = ntuple(i->create_adder(i), 5);

julia> apply_f_tuple(adders, 10.0)
(11.0, 12.0, 13.0, 14.0, 15.0)

The advantage here is that you get a tuple back, which in some cases can be desirable (it also matches up with how map works on tuples, which is nice).

For larger tuples though, your smap! API makes more sense (returning long tuples isn't generally a good idea…). You can do this with tuples as well:

_smap!(ft::Tuple{}, dest, i, args...) = dest

function _smap!(ft::Tuple, dest, i, args...)
    dest[i] = first(ft)(args...)
    _smap!(Base.tail(ft), dest, i+1, args...)
end

smap!(ft::Tuple, dest, args...) = _smap!(ft, dest, firstindex(dest), args...)

And my favorite part is you can infer the narrowest return type and allocate the container based on that.

_container_type(ft::Tuple, args...) = mapreduce(f -> Base._return_type(f, Tuple{typeof.(args)...}), typejoin, ft)

function smap(ft::Tuple, args...)
    CT = _container_type(ft, args...)
    dest = Vector{CT}(undef, length(ft))
    smap!(ft, dest, args...)
end

However, taking either of our approaches, I see a stack overflow being printed from the compiler for very large tuples/FunctionWranglers (~1000 elements); the output is actually still correct despite the error. I think that might just be par for the course when dealing with huge tuple/nested types, though.


Edit: Turns out apply_f_tuple is literally just map (for tuples) under a different name…

If it is 100-200 functions, I don't think using a tuple makes sense. I'd use a tuple only if there are a handful of elements (let's say < 16, as that's the heuristic Base uses, though the compiler can handle more).

Note that there is no way to express “signature” (input types and output type) in Julia’s type system. That’s why you need the ccall hack to get some decent performance.

This is why I asked if you have closures or some callable objects. That is to say, do you have 100 functions with completely different implementations? Or, are they actually some parameterized functions? If they are closures generated by the same function, their type is identical:

julia> create_adder(value) = (x) -> x + value;

julia> typeof(create_adder(1)) === typeof(create_adder(2))
true

So, you can put them in a vector without invoking run-time dispatch:

julia> callfirst(fs, x) = first(fs)(x);

julia> @code_warntype callfirst([create_adder(1), create_adder(2)], 0)
Variables
  #self#::Core.Const(callfirst, false)
  fs::Vector{var"#1#2"{Int64}}
  x::Int64

Body::Int64
1 ─ %1 = Base.getindex(fs, 1)::var"#1#2"{Int64}
│   %2 = (%1)(x)::Int64
└──      return %2

Even if not all function types are identical, I'd imagine there are only a handful of function types. If that's the case, you can use Iterators.flatten to group closures/functions by their type, e.g., Iterators.flatten(([create_adder(1), create_adder(2)], [create_adder(1im)])).

Unfortunately, Julia’s native for loop is not powerful enough to completely optimize a complex iterator like Iterators.flatten. You’d need to use Base.foldl or some external packages like FLoops.jl (ref [RFC/ANN] FLoops.jl: fast generic for loops (foldl for humans™)) to eliminate dynamic dispatches.
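A small sketch of the grouping idea (reusing create_adder from above; each group holds a single concrete closure type, so the inner loops avoid dynamic dispatch):

create_adder(value) = x -> x + value

int_adders   = [create_adder(1), create_adder(2)]   # one concrete closure type
float_adders = [create_adder(0.5)]                  # another concrete closure type
fs = Iterators.flatten((int_adders, float_adders))

# foldl recurses into each homogeneous group of the flattened iterator
sum_applied(fs, x) = foldl((acc, f) -> acc + f(x), fs; init = zero(x))
sum_applied(fs, 10.0)   # 33.5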

Yes, I think it'd be a better way to parallelize the computation. FWIW, if your graph object already defines Base.iterate (and, preferably, Base.length), you can simply add a SplittablesBase.halve method to support parallel computation via Transducers.jl, the aforementioned FLoops.jl, ThreadsX.jl, etc.
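For example, a hypothetical edge-list wrapper could be made splittable like this (EdgeList is an assumed type for illustration, not something from this thread):

using SplittablesBase

struct EdgeList
    edges::Vector{Tuple{Int,Int}}
end

Base.iterate(el::EdgeList, state...) = iterate(el.edges, state...)
Base.length(el::EdgeList) = length(el.edges)

# the one extra method that lets Transducers.jl / FLoops.jl / ThreadsX.jl
# split the work between tasks
function SplittablesBase.halve(el::EdgeList)
    mid = length(el.edges) ÷ 2
    return EdgeList(el.edges[1:mid]), EdgeList(el.edges[mid + 1:end])
end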


Regarding ThreadPools.jl… I hope I don't sound like I'm trivializing @tro3's hard work, but I think it's important to understand that the primary motivation for ThreadPools.jl is to “undo” the design of composable multi-threading in Julia (see Announcing composable multi-threaded parallelism in Julia). IIUC, ThreadPools.jl exists for separating out latency-critical code (executed on the primary thread) from throughput-oriented code (executed on non-primary threads). This is very useful if you are writing, e.g., a GUI application, but it is not desirable in a library or in throughput-oriented user code. ThreadPools.jl is a very clever and useful workaround for the current state of multi-threading in Julia. However, I think it's a good idea to avoid using it if you mainly care about the “overall speed” (i.e., throughput) of your computation and composability with the rest of the ecosystem.

@tkf is correct (wiping tears away… j/k :slight_smile: ) as to the latency-vs-throughput use case. The other niche case is when your jobs vary widely in size - the queued pool will chew through the fast ones while the slow ones run. For any other usage besides those two niches, the existing Julia infrastructure is superior. I’ll plow through the rest of this thread in a bit. (For some reason, I am no longer getting any emails from Discourse, so I am missing things until days later.)

This is actually a common enough case. For example, you're running some kind of simulation whose duration depends on random number generation, or the stiffness of an ODE depends on the initial conditions you give it, or something like that, so that some of the instances take much longer than others. I really like ThreadPools because I think this use case is going to be common enough for me.

I don't think you need a “user-land” task pool mechanism if it is just for load balancing. The Julia runtime already has a “thread pool” and we can just use it. For example, Transducers.foldxt provides a simple parameter basesize to control the load balancing while directly using Julia's task scheduler (I think basesize=1 corresponds to ThreadPools.tmap and the default basesize = length(data) ÷ nthreads() corresponds to Threads.@threads).
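For instance (a sketch using foldxt; sin is just a stand-in workload):

using Transducers

xs = rand(1_000)

foldxt(+, Map(sin), xs; basesize = 1)   # one task per element, maximal load balancing
foldxt(+, Map(sin), xs; basesize = length(xs) ÷ Threads.nthreads())   # coarse chunks, minimal overhead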

A potential benefit of the ThreadPools.jl-like approach is that, when you have compute-intensive tasks mixed with I/O, it can be used to limit the number of concurrently executing tasks (I'm not sure if this is already implemented in ThreadPools.jl, but I've been wanting to add it to Transducers.jl). This is sometimes useful because you can limit the resources (e.g., memory) required for the entire process even when there is a lot of I/O. However, it comes with a cost, because communication with a Channel (used in ThreadPools.jl and handy for implementing this kind of thing) is more expensive than simply spawning a task.

So, imagine you want to run 1000 simulations, and they take anywhere from 1 ms to 40 minutes, with an average of, say, 1 minute. Suppose you have, say, 6 cores. If you use ThreadPools you can queue up 6 simulations at a time; as soon as one of them finishes, a new one will spawn. You'll always have 6 of them running at any one time, and you'll never have a situation like you would with @threads for i = 1:1000 ... where the first 100 of them all take 40 minutes because the parameter you're sweeping through is similar for all of them… and then this one thread takes 4000 minutes while the entire rest of the calculation across all the other threads takes 2000/5 = 400 minutes.

With thread pools, instead of waiting 4000 minutes for the first thread to finish, you’d wait 4000/6 + 2000/6 = 1000 minutes

What you are describing is just a limitation of @threads, not of Julia's task system. It's also not the task pool provided by ThreadPools.jl that gives you the load-balancing behavior; that comes from the implementation of higher-level APIs like ThreadPools.tmap, which happen to use one task per element. This can be done without implementing a task pool. I'm pretty sure there are a few other packages (besides Transducers.jl and ThreadsX.jl) with a similar implementation that directly uses Julia's thread pool (e.g., Parallelism.tmap uses basesize=1 by default).

Just to be clear, I'm not discouraging anyone from using ThreadPools.jl in application (= non-library) code, especially if you don't care about the throughput of task spawns (some tasks taking 40 minutes while others finish in 1 ms is a good example). It is tested in the wild and comes with an excellent profiling facility. I just wanted to discuss its properties so that we can understand the pros and cons.