Automatically fusing together several for-loops

I’m trying to speed up an algorithm that computes various graph statistics. The relevant part of the code is:

struct erg{G <: AbstractArray{Bool},F <: Function} <: AbstractGraph{Int}
    m::G
    fs::Vector{F}
    trans::G
    realnodecov::Union{Dict{String,Array{Float64,1}},Nothing}
    catnodecov::Union{Dict{String,Array{Any,1}},Nothing}
    edgecov::Union{Array{Array{Float64,2}},Nothing}
    indegree::Vector{Int64}
    outdegree::Vector{Int64}
    n_funcs::Int
    kstars_precomputed::Array{Float64,2}
end

function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    func_list = g.fs
    for k = 1:g.n_funcs
        x[k] = func_list[k](g, i, j)
    end
    return x
end

In general, the different graph statistics (represented by a vector of functions g.fs) can be anything; in practice, many of them are either O(1) functions such as:


# Toggle and edge and get the changes in total number of edges; really basic, but useful.
function delta_edge(g::E, s, r) where {E <: AbstractGraph}
    if !has_edge(g, s, r)
        return 1.0
    else
        return -1.0
    end
end

# Toggle edge from s -> r, and get changes in count of reciprocal dyads
function delta_mutual(g::E, s, r) where {E <: AbstractGraph}
    if !g.trans[s, r]
        return 0.0
    elseif !g.m[s, r]
        return 1.0
    else
        return -1.0
    end
end

or contain some sort of loop over the vertices of the graph, such as

function delta_ttriple(g::E, s, r) where {E <: AbstractGraph}
    x = 0
    @inbounds for i in vertices(g)
        x +=
            (g.trans[i, r] * g.trans[i, s]) +
            (g.m[i, r] * g.trans[i, s]) +
            (g.m[i, r] * g.m[i, s])
    end
    if !has_edge(g, s, r)
        return convert(Float64, x)
    else
        return -convert(Float64, x)
    end
end

The functions don’t mutate the graph, and can be computed in parallel (though I was not able to get any speedups from doing so). Is there a way to “annotate” functions of the second kind (the ones that loop over the vertices of the graph) in a way that would cause their for-loops to be “fused”? I’m not sure if this is the exact terminology - my goal is improve performance by reducing the number of passes over the vertices of the graph (ideally to a single pass, even if there are several functions of the second kind in g.fs).

Any other performance tips would be appreciated, as well - the change_scores function is the computational bottleneck of the algorithm, so even a small speedup is important.

What have you tried in terms of parallelization?

For the loops maybe you could try

It sounds like your usecase could work with that (see warnings in the repo)

for the delta_ttriple function you could also check out

which I think is partly based on the package above.

Another great package in thta direction is

Hope that helps! :slight_smile:

This isn’t going to work: unless the elements of fs are literally all the same function, F will have to be an abstract type (Function), and hence calling the functions will be relatively slow. To get a concretely typed list of functions, you would need something like a tuple of functions here.

2 Likes

The signature of the functions in fs is always (g::E, s::Int, r::Int), and the return type is always Float. Is there a way to use this information to speed up change_scores?

No.

2 Likes

It should be possible to get speed ups from parallelism. How did you try doing this before?

I mean technically yes, using @cfunction, though I get that it probably shouldn’t be recommended in general.

foo(a, b) = a + b
bar(a, b) = a - b

funcs = [foo, bar]
cfuncs = [
    @cfunction(foo, Cdouble, (Ref{Cdouble}, Ref{Cdouble})),
    @cfunction(bar, Cdouble, (Ref{Cdouble}, Ref{Cdouble}))
]

function sum_funcs(funcs, a, b)
    ret = 0.0
    for func in funcs
        ret += func(a, b)
    end
    ret
end

function sum_cfuncs(cfuncs, a, b)
    ret = 0.0
    for func in cfuncs
        ret += ccall(func, Cdouble, (Ref{Cdouble}, Ref{Cdouble}), a, b)
    end
    ret
end

using BenchmarkTools
@btime sum_funcs($funcs, a, b) setup = (a = rand(); b = rand()) # 88.750 ns (8 allocations: 128 bytes)
@btime sum_cfuncs($cfuncs, a, b) setup = (a = rand(); b = rand()) # 13.612 ns (0 allocations: 0 bytes)
2 Likes

I think there is. Not exactly that they have methods with the same signature, but that you know the signatures (edit: the concrete types of the functions) upfront.

I do something similar in Plugins.jl, it can merge methods defined in an array into a single method body.

It was not designed for this case, as it works on plugin types which implement methods for the same function, not separated functions. So you have to create a plugin from every function, but then it should run much faster than with cfunction.

I doubt you can automatically roll these loops together, but you could define a “foreachnode” function (or maybe there already is one?) which then takes a tuple of collector functions and iterates once over the graph, handing the nodes to your collector functions… it’d then return a tuple of results, one for each collector function.

1 Like

Besides using @cfunction as pointed out by @tkoolen, you can use FunctionWrappers.jl.

using BenchmarkTools
using StaticArrays
using FunctionWrappers: FunctionWrapper

foo(a, b, c) = a .+ b .- c
bar(a, b, c) = a .- b .+ c

function sum_funcs(funcs, a, b, c)
    r = zeros(SVector{3})
    for func in funcs
        r += func(a, b, c)
    end
    return r
end

Signature = FunctionWrapper{
    SVector{3, Float64},
    Tuple{SVector{3, Float64}, Float64, Float64}
}

fs = [foo, bar]
wfs = Signature[foo, bar]

@btime sum_funcs($fs, a, b, c) setup = (a = rand(SVector{3}); b = rand(); c = rand())
# 74.816 ns (10 allocations: 256 bytes)
@btime sum_funcs($wfs, a, b, c) setup = (a = rand(SVector{3}); b = rand(); c = rand())
# 13.896 ns (0 allocations: 0 bytes)
6 Likes

Thanks for all the helpful replies!

I’ve tried Threads.@threads, @sync and @distributed, as well as pmap; all ended up being considerably slower. It’s quite likely I’ve been using them too naively - any tips would be much appreciated! I think one of the issues here is that change_scores is relatively fast as it is - roughly 1μs, for the size of graphs I’m interested in; the problem is that I’m calling it many many times from a function which can’t be parallelised (it mutates the graph after each call to change_scores).

I’ve tried using Tullio in delta_ttriple (as well as replacing the for loop with dot products, etc) and it actually yielded slightly worse performance, not sure why. And Transducers seems very interesting, but I’m not sure I understand the benefits - should decomposing the computation in this way (assuming it’s possible, I’ll have to try) yield any performance improvements?

This gave me 20%-30% speedup with basically zero extra work - that’s incredible, thank you very much! By the way, is there any specific reason to use SVectors here? I’ve tried storing the graph’s adjacency matrix as an MMatrix (since it’s mutated by the MCMC algorithm), but these seem to be inadequate for graphs of the size I’m interested in (~200 nodes).

This is not without some work, as Plugins.jl thinks in an overly structured way, but seems like 5x improvement over FunctionWrappers in the summer sample.

  | | |_| | | | (_| |  |  Version 1.5.1 (2020-08-25)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using BenchmarkTools

julia> using StaticArrays

julia> using Plugins

julia> foo(a, b, c) = a .+ b .- c
foo (generic function with 1 method)

julia> bar(a, b, c) = a .- b .+ c
bar (generic function with 1 method)

julia> function calc end
calc (generic function with 0 methods)

julia> mutable struct Foo <: Plugin
           result::SVector{3, Float64}
           Foo() = new(zeros(SVector{3}))
       end

julia> mutable struct Bar <: Plugin
           result::SVector{3, Float64}
           Bar() = new(zeros(SVector{3}))
       end

julia> struct Results end # Shared state, not used

julia> calc(pfoo::Foo, result, a, b, c) = pfoo.result = foo(a, b, c)
calc (generic function with 1 method)

julia> calc(pbar::Bar, result, a, b, c) = pbar.result = bar(a, b, c)
calc (generic function with 2 methods)

julia> const pfoo = Foo()
Foo([0.0, 0.0, 0.0])

julia> const pbar = Bar()
Bar([0.0, 0.0, 0.0])

julia> calculators = PluginStack([pfoo, pbar])
PluginStack(Plugin[Foo([0.0, 0.0, 0.0]), Bar([0.0, 0.0, 0.0])], Any[], Dict{Symbol,Plugin}(:nothing => Bar([0.0, 0.0, 0.0])), nothing)

julia> const results = Results()
Results()

julia> const mergedcalc = hooklist(calculators, calc, results)
Plugins.HookList{Plugins.HookList{Plugins.HookList{Nothing,Nothing,Nothing,Nothing},typeof(calc),Bar,Results},typeof(calc),Foo,Results}(Plugins.HookList{Plugins.HookList{Nothing,Nothing,Nothing,Nothing},typeof(calc),Bar,Results}(Plugins.HookList{Nothing,Nothing,Nothing,Nothing}(nothing, nothing, nothing, nothing), calc, Bar([0.0, 0.0, 0.0]), Results()), calc, Foo([0.0, 0.0, 0.0]), Results())

julia> @btime begin mergedcalc(a, b, c); zeros(SVector{3}) + pfoo.result + pbar.result; end setup = (a = rand(SVector{3}); b = rand(); c = rand())
  2.157 ns (0 allocations: 0 bytes)
3-element SArray{Tuple{3},Float64,1,3} with indices SOneTo(3):
 0.12151616008457733
 1.8758013798301496
 0.21778100588128257

edit1: I remember seeing a package that was exactly for merging functions, but cannot find it now.
edit2: Edited the code to collect the results in plugin states, not in shared state

1 Like

Yes, if calling change_scores is 1μs I imagine the overhead of threading is going to get in the way here.

It’s still worth a try though. I think you could try out ThreadPools: ThreadPools.jl · ThreadPool Documentation with the @qthreads macro.

function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    func_list = g.fs
    @qthreads for k = 1:g.n_funcs
        x[k] = func_list[k](g, i, j)
    end
    return x
end

Since there is a lot of variation in the speed of these functions, this will work generally better than @threads which is designed mainly for the case where each iteration takes a similar amount of time.

Also, not sure what the “outer” algorithm is. But if it’s an MCMC type procedure as you mention, you could of course run multiple chains on separate threads.

Also, something like this might make sense.

x = ThreadPools.qmap(f -> f(g,i,j),func_list)

What’s the typical length of g.fs? Why not use a tuple like

struct erg{G <: AbstractArray{Bool}, Functions <: Tuple} <: AbstractGraph{Int}
    m::G
    fs::Functions
    ...

as mentioned by other comments? Are they completely different functions or some parametrized callable of the same type (e.g., closure)?

Is it possible that this is essentially the cost of dynamic dispatch? I think profiling change_scores could help.

Also, 1μs is rather too short for using threads. It takes a few μs to start a task in current julia. So, you’d need to chunk more computations to get a good speedup.

The length of fs can vary between a couple of functions, to 100-200 at most. All functions have the same signature, and the same type of their return value (see below). What would be the pros/cons of using a tuple instead of an array of functions?

I’ve tried to measure this using Profile/ProfileView but couldn’t generate the kind of summary I was looking for (basically % of time spent in each of g.fs, as well as a break-down of different overhead times). Is there a way to generate such a summary? I’d be happy to share the jlprof file if it helps.

Chunking seemed like a good option to me, as well; but I wasn’t able to get it to work:

  1. For a 100-nodes graph and ~10 functions in g.fs, change_scores currently takes roughly 0.5-1μs, using @pabloferz excellent tip. Still haven’t implemented @tisztamo Plugins approach, but it seems super relevant, as well.
  2. subgraphcounts iterates over the edges of the graph; for each existing edge, it calls change_scores, accumulates the result, and then removes the edge. This can theoretically be fully parallelised (by calling change_scores separately for each edge, and within each call, delete all the edges before it). However, since this function acts on a copy of the original graph, it has some copying overhead, so indeed chunking seems reasonable. For a 100 nodes graph, the sequential version takes 350-500μs, while a chunked version that operates on 1/10th of the graph takes ~70μs. However, accumulating the results over different chunks using qthreads (with Threads.nthreads()=8) takes ~3ms, and returns non-deterministic results, so I’m obviously doing something wrong. Using ThreadPools.qmap instead of qthreads solves the non-determinism, but is also considerably slower than doing everything sequentially.

Here are the original functions:

function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
#     x = ThreadPools.qmap(f -> f(g,i,j),g.fs)
#     @qthreads for k in 1:g.n_funcs
#         x[k] = g.fs[k](g,i,j)
#     end
    for k in 1:g.n_funcs
        x[k] = g.fs[k](g,i,j)
    end
    return x
end

function subgraphcount(g::E) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    g2 = deepcopy(g)

    for j in vertices(g2)
        for i in vertices(g2)
            if i == j
                continue
            elseif !has_edge(g2, i, j)
                continue
            else
                x -= change_scores(g2, i, j)
                edge_toggle!(g2, i, j)
            end
        end
    end
    return x
end

Here’s the chunked version of subgraphcounts and the qthreads function that calls it:

function ch_subgraphcount(g::E, start_col, end_col) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    g2 = deepcopy(g)
    if start_col > 1
       g2.m[:,1:(start_col-1)].=0
       g2.trans[1:(start_col-1),:].=0
       g2.indegree[:] = vec(sum(g2.m, dims=1)) 
       g2.outdegree[:] = vec(sum(g2.m, dims=2)) 
    end
    for j in start_col:(end_col-1)
        for i in vertices(g2)
            if i == j
                continue
            elseif !has_edge(g2, i, j)
                continue
            else
                x -= change_scores(g2, i, j)
                edge_toggle!(g2, i, j)
            end
        end
    end
    return x
end

function p_subgraphcount(g::E, chunksize) where {E <: AbstractGraph}
    n = nv(g)
    if n % chunksize != 0
        throw("chunksize in not a whole multiple of graph size")
    end
    x = zeros(g.n_funcs)
    @qthreads for i=1:(n÷chunksize)
        x += ch_subgraphcount(g, 1+(i-1)*chunksize, 1+i*chunksize)
    end
    return x
    #return sum(ThreadPools.qmap(i -> ch_subgraphcount(g, 1+(i-1)*chunksize, 1+i*chunksize), 1:(n÷chunksize)))
end

This is the erg data structure:

struct erg{G <: AbstractArray{Bool}} <: AbstractGraph{Int}
    m::G
    fs::Array{FunctionWrapper{Float64,Tuple{erg,Int64,Int64}},1}
    trans::G
    realnodecov::Union{Dict{String,Array{Float64,1}},Nothing}
    catnodecov::Union{Dict{String,Array{Any,1}},Nothing}
    edgecov::Union{Array{Array{Float64,2}},Nothing}
    indegree::Vector{Int64}
    outdegree::Vector{Int64}
    n_funcs::Int
    kstars_precomputed::Array{Float64,2}
end

Signature = FunctionWrapper{
    Float64,
    Tuple{erg, Int, Int}
}

I’ve put all the delta_foo functions in this gist to avoid unnecessary clutter, in case anyone’s interested.

Yes, subgraphcounts is used to compute the MH transition probability; I’m currently trying to optimise it (and the lower-level change_scores), but parallelising the higher-level MCMC function using mutlithreading is definitely the next step!

I got excited from this topic and created a micropackage while my son is sleeping :slight_smile:

https://github.com/tisztamo/FunctionWranglers.jl

  | | |_| | | | (_| |  |  Version 1.5.1 (2020-08-25)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using FunctionWranglers

julia> create_adder(value) = (x) -> x + value
create_adder (generic function with 1 method)

julia> adders = [create_adder(i) for i = 1:5]
5-element Array{var"#1#2"{Int64},1}:
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)

julia> w = FunctionWrangler(adders)
FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}}}(var"#1#2"{Int64}(1), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}}(var"#1#2"{Int64}(2), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}(var"#1#2"{Int64}(3), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}(var"#1#2"{Int64}(4), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}(var"#1#2"{Int64}(5), FunctionWrangler{Nothing,Nothing}(nothing, nothing))))))

julia> result = zeros(Float64, length(adders))
5-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0

julia> smap!(result, w, 10.0)

julia> result
5-element Array{Float64,1}:
 11.0
 12.0
 13.0
 14.0
 15.0

julia> @btime smap!($result, $w, d) setup = (d = rand())
  2.892 ns (0 allocations: 0 bytes)

It works well for 200 functions (<1s compile time at the first call), and seems to be usable up to at least 1000 functions.

5 Likes

Your approach got me excited as well! :smile:
I think you could get the same functionality by wrapping Tuples and using tail recursion. Using the tuples directly, you can do

apply_f_tuple(ft::Tuple{}, args...) = ()
apply_f_tuple(ft::Tuple, args...) = (first(ft)(args...), apply_f_tuple(Base.tail(ft), args...)...)
julia> adders = ntuple(i->create_adder(i), 5);

julia> apply_f_tuple(adders, 10.0)
(11.0, 12.0, 13.0, 14.0, 15.0)

The advantage here is that you get a tuple back, which is some cases can be desirable (also matches up with how map works on tuples, which is nice)

For larger tuples though, your smap! api makes more sense (returning long tuples isn’t generally a good idea…). You can do this with tuples also

_smap!(ft::Tuple{}, dest, i, args...) = dest

function _smap!(ft::Tuple, dest, i, args...)
    dest[i] = first(ft)(args...)
    _smap!(Base.tail(ft), dest, i+1, args...)
end

smap!(ft::Tuple, dest, args...) = _smap!(ft, dest, firstindex(dest), args...)

And my favorite part is you can infer the narrowest return type and allocate the container based on that.

_container_type(ft::Tuple, args...) = mapreduce(f -> Base._return_type(f, typeof.(args)), typejoin, ft)

function smap(ft::Tuple, args...)
    CT = _container_type(ft, args...)
    dest = Vector{CT}(undef, length(ft))
    smap!(ft, dest, args...)
end

However, taking either of our approaches I see a stack overflow being printed from the compiler for very large tuples/FunctionWranglers (~1000) (the output is actually still correct despite the error). I think that might just be par for the course when dealing with huge tuple/nested types though.


Edit: Turns out apply_f_tuple is literally just map (for tuples) under a different name…

1 Like

If it is 100-200 functions, I don’t think using a tuple makes sense. I’d use a tuple if there are only “handful” of elements (let’s say < 16, as that’s the heuristics Base uses; but the compiler can handle more).

Note that there is no way to express “signature” (input types and output type) in Julia’s type system. That’s why you need the ccall hack to get some decent performance.

This is why I asked if you have closures or some callable objects. That is to say, do you have 100 functions with completely different implementations? Or, are they actually some parameterized functions? If they are closures generated by the same function, their type is identical:

julia> create_adder(value) = (x) -> x + value;

julia> typeof(create_adder(1)) === typeof(create_adder(2))
true

So, you can put them in a vector without invoking run-time dispatch:

julia> callfirst(fs, x) = first(fs)(x);

julia> @code_warntype callfirst([create_adder(1), create_adder(2)], 0)
Variables
  #self#::Core.Const(callfirst, false)
  fs::Vector{var"#1#2"{Int64}}
  x::Int64

Body::Int64
1 ─ %1 = Base.getindex(fs, 1)::var"#1#2"{Int64}
│   %2 = (%1)(x)::Int64
└──      return %2

Even if not all function types are identical, I’d imagine there are only handful of function types. If that’s the case, you can use Iterators.flatten to group closures/functions by their type, e.g., Iterators.flatten(([create_adder(1), create_adder(2)], [create_adder(1im)])).

Unfortunately, Julia’s native for loop is not powerful enough to completely optimize a complex iterator like Iterators.flatten. You’d need to use Base.foldl or some external packages like FLoops.jl (ref [RFC/ANN] FLoops.jl: fast generic for loops (foldl for humans™)) to eliminate dynamic dispatches.

Yes, I think it’d be a better way to parallelize the computation. FWIW, if your graph object already defines Base.iterate (and, preferably, Base.length), you can simply add SplittablesBase.halve to support parallel computations via Transducers.jl, aforementioned FLoops.jl, ThreadsX.jl, etc.


Regarding ThreadPools.jl… I hope I don’t sound like trivializing @tro3’s hard work but I think it’s important to understand that the primary motivation for ThreadPools.jl is to “undo” the design of composable multi-threading in Julia (see Announcing composable multi-threaded parallelism in Julia). IIUC, ThreadPools.jl exists for separating out latency-critical code (executed in the primary thread) from throughput-oriented code (executed in non-primary threads). This is very useful if you are writing, e.g., GUI application but it is not desirable to use it in a library or throughput-oriented user code. ThreadPools.jl is a very clever and useful workaround for the current state of multi-threading in Julia. However, I think it’s a good idea to avoid using it if you mainly care about the “overall speed” (i.e., throughput) of your computation and composability with the rest of the ecosystem.

3 Likes

@tkf is correct (wiping tears away… j/k :slight_smile: ) as to the latency-vs-throughput use case. The other niche case is when your jobs vary widely in size - the queued pool will chew through the fast ones while the slow ones run. For any other usage besides those two niches, the existing Julia infrastructure is superior. I’ll plow through the rest of this thread in a bit. (For some reason, I am no longer getting any emails from Discourse, so I am missing things until days later.)

1 Like

This is actually a common enough case. Like for example you’re running some kind of simulation, and the duration of the simulation is dependent on random number generation, or the stiffness of an ODE is dependent on the initial conditions you give, or something like that, so that some of the instances take much longer than others. I really like ThreadPools because I think this use case is going to be common enough for me.

1 Like