# Automatically fusing together several for-loops

I’m trying to speed up an algorithm that computes various graph statistics. The relevant part of the code is:

``````
struct erg{G <: AbstractArray{Bool}, F <: Function} <: AbstractGraph{Int}
    m::G
    fs::Vector{F}
    trans::G
    realnodecov::Union{Dict{String,Array{Float64,1}},Nothing}
    catnodecov::Union{Dict{String,Array{Any,1}},Nothing}
    edgecov::Union{Array{Array{Float64,2}},Nothing}
    indegree::Vector{Int64}
    outdegree::Vector{Int64}
    n_funcs::Int
    kstars_precomputed::Array{Float64,2}
end

function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    func_list = g.fs
    for k = 1:g.n_funcs
        x[k] = func_list[k](g, i, j)
    end
    return x
end
``````

In general, the different graph statistics (represented by the vector of functions `g.fs`) can be anything; in practice, many of them are either O(1) functions, such as:

``````
# Toggle an edge and get the change in the total number of edges; really basic, but useful.
function delta_edge(g::E, s, r) where {E <: AbstractGraph}
    if !has_edge(g, s, r)
        return 1.0
    else
        return -1.0
    end
end

# Toggle the edge from s -> r, and get the change in the count of reciprocal dyads
function delta_mutual(g::E, s, r) where {E <: AbstractGraph}
    if !g.trans[s, r]
        return 0.0
    elseif !g.m[s, r]
        return 1.0
    else
        return -1.0
    end
end
``````

or functions that contain some sort of loop over the vertices of the graph, such as:

``````
function delta_ttriple(g::E, s, r) where {E <: AbstractGraph}
    x = 0
    @inbounds for i in vertices(g)
        x +=
            (g.trans[i, r] * g.trans[i, s]) +
            (g.m[i, r] * g.trans[i, s]) +
            (g.m[i, r] * g.m[i, s])
    end
    if !has_edge(g, s, r)
        return convert(Float64, x)
    else
        return -convert(Float64, x)
    end
end
``````

The functions don’t mutate the graph and can be computed in parallel (though I was not able to get any speedups from doing so). Is there a way to “annotate” functions of the second kind (the ones that loop over the vertices of the graph) that would cause their for-loops to be “fused”? I’m not sure if this is the exact terminology - my goal is to improve performance by reducing the number of passes over the vertices of the graph (ideally to a single pass, even if there are several functions of the second kind in `g.fs`).
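To illustrate what I mean, here’s what a hand-fused version of two such statistics could look like (the second statistic is made up for the sake of the example):

``````
# Hypothetical hand-fused version: a single sweep over the vertices
# accumulates two statistics at once, instead of one sweep per function.
function delta_fused(g::E, s, r) where {E <: AbstractGraph}
    x1 = 0
    x2 = 0
    @inbounds for i in vertices(g)
        # first statistic: same body as delta_ttriple
        x1 += (g.trans[i, r] * g.trans[i, s]) +
              (g.m[i, r] * g.trans[i, s]) +
              (g.m[i, r] * g.m[i, s])
        # second (made-up) statistic, sharing the same loop
        x2 += g.m[i, s] * g.m[i, r]
    end
    sign = has_edge(g, s, r) ? -1.0 : 1.0
    return (sign * x1, sign * x2)
end
``````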

Any other performance tips would be appreciated, as well - the `change_scores` function is the computational bottleneck of the algorithm, so even a small speedup is important.

What have you tried in terms of parallelization?

For the loops maybe you could try LoopVectorization.jl.

It sounds like your usecase could work with that (see the warnings in the repo).

For the `delta_ttriple` function you could also check out Tullio.jl, which I think is partly based on the package above.

Another great package in that direction is Transducers.jl.

Hope that helps!

This isn’t going to work: unless the elements of `fs` are literally all the same function, `F` will have to be an abstract type (`Function`), and hence calling the functions will be relatively slow. To get a concretely typed list of functions, you would need something like a tuple of functions here.
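For example, a quick way to see the difference:

``````
julia> fs = [sin, cos];      # element type is the abstract `Function`

julia> eltype(fs)
Function

julia> ft = (sin, cos);      # a tuple keeps each concrete function type

julia> typeof(ft)
Tuple{typeof(sin),typeof(cos)}
``````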


The signature of the functions in `fs` is always `(g::E, s::Int, r::Int)`, and the return type is always `Float64`. Is there a way to use this information to speed up `change_scores`?

No.


It should be possible to get speedups from parallelism. How did you try doing this before?

I mean technically yes, using `@cfunction`, though I get that it probably shouldn’t be recommended in general.

``````
foo(a, b) = a + b
bar(a, b) = a - b

funcs = [foo, bar]
cfuncs = [
    @cfunction(foo, Cdouble, (Ref{Cdouble}, Ref{Cdouble})),
    @cfunction(bar, Cdouble, (Ref{Cdouble}, Ref{Cdouble}))
]

function sum_funcs(funcs, a, b)
    ret = 0.0
    for func in funcs
        ret += func(a, b)
    end
    ret
end

function sum_cfuncs(cfuncs, a, b)
    ret = 0.0
    for func in cfuncs
        ret += ccall(func, Cdouble, (Ref{Cdouble}, Ref{Cdouble}), a, b)
    end
    ret
end

using BenchmarkTools
@btime sum_funcs($funcs, a, b) setup = (a = rand(); b = rand())   # 88.750 ns (8 allocations: 128 bytes)
@btime sum_cfuncs($cfuncs, a, b) setup = (a = rand(); b = rand()) # 13.612 ns (0 allocations: 0 bytes)
``````

I think there is. Not exactly that they have methods with the same signature, but that you know the signatures (edit: the concrete types of the functions) upfront.

I do something similar in Plugins.jl, it can merge methods defined in an array into a single method body.

It was not designed for this case, as it works on plugin types which implement methods for the same function, not separate functions. So you have to create a plugin from every function, but then it should run much faster than with `@cfunction`.

I doubt you can automatically roll these loops together, but you could define a “foreachnode” function (or maybe there already is one?) which then takes a tuple of collector functions and iterates once over the graph, handing the nodes to your collector functions… it’d then return a tuple of results, one for each collector function.
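A rough, untested sketch of what I mean (all names here are made up):

``````
# Untested sketch of a "foreachnode": one pass over the vertices,
# with each collector folding its own accumulator.
function foreachnode(collectors::Tuple, inits::Tuple, g, s, r)
    accs = inits
    for i in vertices(g)
        # mapping over the two tuples keeps everything concretely typed
        accs = map((f, acc) -> f(acc, g, i, s, r), collectors, accs)
    end
    return accs   # one result per collector
end
``````

Each collector would have the signature `(acc, g, i, s, r) -> acc`.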


Besides using `@cfunction` as pointed out by @tkoolen, you can use `FunctionWrappers.jl`.

``````
using BenchmarkTools
using StaticArrays
using FunctionWrappers: FunctionWrapper

foo(a, b, c) = a .+ b .- c
bar(a, b, c) = a .- b .+ c

function sum_funcs(funcs, a, b, c)
    r = zeros(SVector{3})
    for func in funcs
        r += func(a, b, c)
    end
    return r
end

Signature = FunctionWrapper{
    SVector{3, Float64},
    Tuple{SVector{3, Float64}, Float64, Float64}
}

fs = [foo, bar]
wfs = Signature[foo, bar]

@btime sum_funcs($fs, a, b, c) setup = (a = rand(SVector{3}); b = rand(); c = rand())
# 74.816 ns (10 allocations: 256 bytes)
@btime sum_funcs($wfs, a, b, c) setup = (a = rand(SVector{3}); b = rand(); c = rand())
# 13.896 ns (0 allocations: 0 bytes)
``````

Thanks for all the helpful replies!

I’ve tried `Threads.@threads`, `@sync` and `@distributed`, as well as `pmap`; all ended up being considerably slower. It’s quite likely I’ve been using them too naively - any tips would be much appreciated! I think one of the issues here is that `change_scores` is relatively fast as it is - roughly 1μs, for the size of graphs I’m interested in; the problem is that I’m calling it many many times from a function which can’t be parallelised (it mutates the graph after each call to `change_scores`).

I’ve tried using Tullio in `delta_ttriple` (as well as replacing the for loop with dot products, etc) and it actually yielded slightly worse performance, not sure why. And Transducers seems very interesting, but I’m not sure I understand the benefits - should decomposing the computation in this way (assuming it’s possible, I’ll have to try) yield any performance improvements?

This gave me a 20%-30% speedup with basically zero extra work - that’s incredible, thank you very much! By the way, is there any specific reason to use SVectors here? I’ve tried storing the graph’s adjacency matrix as an MMatrix (since it’s mutated by the MCMC algorithm), but these seem to be inadequate for graphs of the size I’m interested in (~200 nodes).

This is not without some work, as Plugins.jl thinks in an overly structured way, but it seems like a 5x improvement over FunctionWrappers in the sample below.

``````
# Julia Version 1.5.1 (2020-08-25)

julia> using BenchmarkTools

julia> using StaticArrays

julia> using Plugins

julia> foo(a, b, c) = a .+ b .- c
foo (generic function with 1 method)

julia> bar(a, b, c) = a .- b .+ c
bar (generic function with 1 method)

julia> function calc end
calc (generic function with 0 methods)

julia> mutable struct Foo <: Plugin
           result::SVector{3, Float64}
           Foo() = new(zeros(SVector{3}))
       end

julia> mutable struct Bar <: Plugin
           result::SVector{3, Float64}
           Bar() = new(zeros(SVector{3}))
       end

julia> struct Results end # Shared state, not used

julia> calc(pfoo::Foo, result, a, b, c) = pfoo.result = foo(a, b, c)
calc (generic function with 1 method)

julia> calc(pbar::Bar, result, a, b, c) = pbar.result = bar(a, b, c)
calc (generic function with 2 methods)

julia> const pfoo = Foo()
Foo([0.0, 0.0, 0.0])

julia> const pbar = Bar()
Bar([0.0, 0.0, 0.0])

julia> calculators = PluginStack([pfoo, pbar])
PluginStack(Plugin[Foo([0.0, 0.0, 0.0]), Bar([0.0, 0.0, 0.0])], Any[], Dict{Symbol,Plugin}(:nothing => Bar([0.0, 0.0, 0.0])), nothing)

julia> const results = Results()
Results()

julia> const mergedcalc = hooklist(calculators, calc, results)
Plugins.HookList{Plugins.HookList{Plugins.HookList{Nothing,Nothing,Nothing,Nothing},typeof(calc),Bar,Results},typeof(calc),Foo,Results}(Plugins.HookList{Plugins.HookList{Nothing,Nothing,Nothing,Nothing},typeof(calc),Bar,Results}(Plugins.HookList{Nothing,Nothing,Nothing,Nothing}(nothing, nothing, nothing, nothing), calc, Bar([0.0, 0.0, 0.0]), Results()), calc, Foo([0.0, 0.0, 0.0]), Results())

julia> @btime begin mergedcalc(a, b, c); zeros(SVector{3}) + pfoo.result + pbar.result; end setup = (a = rand(SVector{3}); b = rand(); c = rand())
  2.157 ns (0 allocations: 0 bytes)
3-element SArray{Tuple{3},Float64,1,3} with indices SOneTo(3):
 0.12151616008457733
 1.8758013798301496
 0.21778100588128257
``````

edit1: I remember seeing a package that was exactly for merging functions, but cannot find it now.
edit2: Edited the code to collect the results in plugin states, not in shared state


Yes, if a call to `change_scores` takes ~1μs, I imagine the overhead of threading is going to get in the way here.

It’s still worth a try though. I think you could try out ThreadPools: https://tro3.github.io/ThreadPools.jl/build/index.html with the `@qthreads` macro.

``````
function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    func_list = g.fs
    @qthreads for k = 1:g.n_funcs
        x[k] = func_list[k](g, i, j)
    end
    return x
end
``````

Since there is a lot of variation in the speed of these functions, this will generally work better than `@threads`, which is designed mainly for the case where each iteration takes a similar amount of time.

Also, not sure what the “outer” algorithm is. But if it’s an MCMC type procedure as you mention, you could of course run multiple chains on separate threads.
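For example (a rough sketch; `run_chain` and `n_chains` are made-up names, and each chain needs its own graph copy since the sampler mutates it):

``````
# Rough sketch: independent MCMC chains on separate threads.
results = Vector{Any}(undef, n_chains)
Threads.@threads for c in 1:n_chains
    # deepcopy so each thread mutates its own graph; ideally give
    # each chain its own RNG as well
    results[c] = run_chain(deepcopy(g), n_steps)
end
``````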

Also, something like this might make sense.

``````
x = ThreadPools.qmap(f -> f(g, i, j), func_list)
``````

What’s the typical `length` of `g.fs`? Why not use a tuple like

``````
struct erg{G <: AbstractArray{Bool}, Functions <: Tuple} <: AbstractGraph{Int}
    m::G
    fs::Functions
    ...
``````

as mentioned by other comments? Are they completely different functions or some parametrized callable of the same type (e.g., closure)?

Is it possible that this is essentially the cost of dynamic dispatch? I think profiling `change_scores` could help.

Also, 1μs is rather too short for using threads. It takes a few μs to start a task in current `julia`. So, you’d need to chunk more computations to get a good speedup.
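For example, something like this (a rough sketch; note it assumes the `change_scores` calls are independent, which is not the case inside the mutating sampler loop):

``````
# Rough sketch: amortize the ~μs task-spawn cost by giving each
# task a whole chunk of (i, j) pairs instead of a single call.
function chunked_scores(g, pairs)
    chunks = Iterators.partition(pairs, cld(length(pairs), Threads.nthreads()))
    tasks = map(chunks) do chunk
        Threads.@spawn sum(change_scores(g, i, j) for (i, j) in chunk)
    end
    return sum(fetch.(tasks))
end
``````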

The length of `fs` can vary from a couple of functions to 100-200 at most. All functions have the same signature and the same return type (see below). What would be the pros/cons of using a tuple instead of an array of functions?

I’ve tried to measure this using Profile/ProfileView but couldn’t generate the kind of summary I was looking for (basically % of time spent in each of `g.fs`, as well as a break-down of different overhead times). Is there a way to generate such a summary? I’d be happy to share the jlprof file if it helps.

Chunking seemed like a good option to me, as well; but I wasn’t able to get it to work:

1. For a 100-node graph and ~10 functions in `g.fs`, `change_scores` currently takes roughly 0.5-1μs, using @pabloferz’s excellent tip. Still haven’t implemented @tisztamo’s Plugins approach, but it seems super relevant as well.
2. `subgraphcount` iterates over the edges of the graph; for each existing edge, it calls `change_scores`, accumulates the result, and then removes the edge. This can theoretically be fully parallelised (by calling `change_scores` separately for each edge and, within each call, deleting all the edges before it). However, since this function acts on a copy of the original graph, it has some copying overhead, so chunking indeed seems reasonable. For a 100-node graph, the sequential version takes 350-500μs, while a chunked version that operates on 1/10th of the graph takes ~70μs. However, accumulating the results over different chunks using `@qthreads` (with `Threads.nthreads() = 8`) takes ~3ms and returns non-deterministic results, so I’m obviously doing something wrong. Using `ThreadPools.qmap` instead of `@qthreads` solves the non-determinism, but is also considerably slower than doing everything sequentially.

Here are the original functions:

``````
function change_scores(g::E, i, j) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    #     x = ThreadPools.qmap(f -> f(g,i,j), g.fs)
    #     @qthreads for k in 1:g.n_funcs
    #         x[k] = g.fs[k](g,i,j)
    #     end
    for k in 1:g.n_funcs
        x[k] = g.fs[k](g, i, j)
    end
    return x
end

function subgraphcount(g::E) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    g2 = deepcopy(g)

    for j in vertices(g2)
        for i in vertices(g2)
            if i == j
                continue
            elseif !has_edge(g2, i, j)
                continue
            else
                x -= change_scores(g2, i, j)
                edge_toggle!(g2, i, j)
            end
        end
    end
    return x
end
``````

Here’s the chunked version of `subgraphcount` and the function that calls it (with the `qthreads` variants commented out):

``````
function ch_subgraphcount(g::E, start_col, end_col) where {E <: AbstractGraph}
    x = zeros(g.n_funcs)
    g2 = deepcopy(g)
    if start_col > 1
        g2.m[:, 1:(start_col-1)] .= 0
        g2.trans[1:(start_col-1), :] .= 0
        g2.indegree[:] = vec(sum(g2.m, dims=1))
        g2.outdegree[:] = vec(sum(g2.m, dims=2))
    end
    for j in start_col:(end_col-1)
        for i in vertices(g2)
            if i == j
                continue
            elseif !has_edge(g2, i, j)
                continue
            else
                x -= change_scores(g2, i, j)
                edge_toggle!(g2, i, j)
            end
        end
    end
    return x
end

function p_subgraphcount(g::E, chunksize) where {E <: AbstractGraph}
    n = nv(g)
    if n % chunksize != 0
        throw("graph size is not a whole multiple of chunksize")
    end
    x = zeros(g.n_funcs)
    for i in 1:(n ÷ chunksize)
        x += ch_subgraphcount(g, 1+(i-1)*chunksize, 1+i*chunksize)
    end
    return x
    # return sum(ThreadPools.qmap(i -> ch_subgraphcount(g, 1+(i-1)*chunksize, 1+i*chunksize), 1:(n÷chunksize)))
end
``````

This is the `erg` data structure:

``````
struct erg{G <: AbstractArray{Bool}} <: AbstractGraph{Int}
    m::G
    fs::Array{FunctionWrapper{Float64,Tuple{erg,Int64,Int64}},1}
    trans::G
    realnodecov::Union{Dict{String,Array{Float64,1}},Nothing}
    catnodecov::Union{Dict{String,Array{Any,1}},Nothing}
    edgecov::Union{Array{Array{Float64,2}},Nothing}
    indegree::Vector{Int64}
    outdegree::Vector{Int64}
    n_funcs::Int
    kstars_precomputed::Array{Float64,2}
end

Signature = FunctionWrapper{
    Float64,
    Tuple{erg, Int, Int}
}
``````

I’ve put all the `delta_foo` functions in this gist to avoid unnecessary clutter, in case anyone’s interested.

Yes, `subgraphcount` is used to compute the MH transition probability; I’m currently trying to optimise it (and the lower-level `change_scores`), but parallelising the higher-level MCMC function using multithreading is definitely the next step!

I got excited by this topic and created a micropackage, FunctionWranglers.jl, while my son was sleeping:

``````
# Julia Version 1.5.1 (2020-08-25)

julia> using FunctionWranglers

julia> create_adder(value) = (x) -> x + value
create_adder (generic function with 1 method)

julia> fns = [create_adder(i) for i = 1:5]
5-element Array{var"#1#2"{Int64},1}:
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)
 #1 (generic function with 1 method)

julia> w = FunctionWrangler(fns)
FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}}}(var"#1#2"{Int64}(1), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}}(var"#1#2"{Int64}(2), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}}(var"#1#2"{Int64}(3), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}}(var"#1#2"{Int64}(4), FunctionWrangler{var"#1#2"{Int64},FunctionWrangler{Nothing,Nothing}}(var"#1#2"{Int64}(5), FunctionWrangler{Nothing,Nothing}(nothing, nothing))))))

julia> result = zeros(5)
5-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0

julia> smap!(result, w, 10.0)

julia> result
5-element Array{Float64,1}:
 11.0
 12.0
 13.0
 14.0
 15.0

julia> @btime smap!($result, $w, d) setup = (d = rand())
  2.892 ns (0 allocations: 0 bytes)
``````

It works well for 200 functions (<1s compile time at the first call), and seems to be usable up to at least 1000 functions.
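Applying it to `change_scores` might look roughly like this (assuming `smap!` forwards multiple arguments the same way it forwards the single argument above):

``````
# Hypothetical adaptation to the thread's use case.
wrangler = FunctionWrangler(delta_fns)   # delta_fns: the vector of delta_* functions
x = zeros(length(delta_fns))
smap!(x, wrangler, g, i, j)              # one type-stable sweep, no dynamic dispatch
``````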


Your approach got me excited as well!
I think you could get the same functionality by wrapping `Tuple`s and using tail recursion. Using the tuples directly, you can do

``````
apply_f_tuple(ft::Tuple{}, args...) = ()
apply_f_tuple(ft::Tuple, args...) = (first(ft)(args...), apply_f_tuple(Base.tail(ft), args...)...)
``````
``````
julia> adders = ntuple(i -> create_adder(i), 5);

julia> apply_f_tuple(adders, 10.0)
(11.0, 12.0, 13.0, 14.0, 15.0)
``````

The advantage here is that you get a tuple back, which in some cases can be desirable (it also matches up with how `map` works on tuples, which is nice).

For larger tuples though, your `smap!` API makes more sense (returning long tuples isn’t generally a good idea…). You can do this with tuples also:

``````
_smap!(ft::Tuple{}, dest, i, args...) = dest

function _smap!(ft::Tuple, dest, i, args...)
    dest[i] = first(ft)(args...)
    _smap!(Base.tail(ft), dest, i + 1, args...)
end

smap!(ft::Tuple, dest, args...) = _smap!(ft, dest, firstindex(dest), args...)
``````

And my favorite part is you can infer the narrowest return type and allocate the container based on that.

``````
_container_type(ft::Tuple, args...) =
    mapreduce(f -> Base._return_type(f, Tuple{typeof.(args)...}), typejoin, ft)

function smap(ft::Tuple, args...)
    CT = _container_type(ft, args...)
    dest = Vector{CT}(undef, length(ft))
    smap!(ft, dest, args...)
end
``````

However, taking either of our approaches I see a stack overflow being printed from the compiler for very large tuples/FunctionWranglers (~1000) (the output is actually still correct despite the error). I think that might just be par for the course when dealing with huge tuple/nested types though.

Edit: Turns out `apply_f_tuple` is literally just `map` (for tuples) under a different name…


If it is 100-200 functions, I don’t think using a tuple makes sense. I’d use a tuple only if there are a handful of elements (say < 16, as that’s the heuristic `Base` uses; but the compiler can handle more).

Note that there is no way to express “signature” (input types and output type) in Julia’s type system. That’s why you need the `ccall` hack to get some decent performance.

This is why I asked if you have closures or some callable objects. That is to say, do you have 100 functions with completely different implementations? Or, are they actually some parameterized functions? If they are closures generated by the same function, their type is identical:

``````
julia> create_adder(value) = (x) -> x + value;

julia> typeof(create_adder(1)) === typeof(create_adder(2))
true
``````

So, you can put them in a vector without invoking run-time dispatch:

``````
julia> callfirst(fs, x) = first(fs)(x);

julia> @code_warntype callfirst([create_adder(1), create_adder(2)], 3)
Variables
  #self#::Core.Const(callfirst, false)
  fs::Vector{var"#1#2"{Int64}}
  x::Int64

Body::Int64
1 ─ %1 = Base.getindex(fs, 1)::var"#1#2"{Int64}
│   %2 = (%1)(x)::Int64
└──      return %2
``````

Even if not all function types are identical, I’d imagine there are only handful of function types. If that’s the case, you can use `Iterators.flatten` to group closures/functions by their type, e.g., `Iterators.flatten(([create_adder(1), create_adder(2)], [create_adder(1im)]))`.

Unfortunately, Julia’s native `for` loop is not powerful enough to completely optimize a complex iterator like `Iterators.flatten`. You’d need to use `Base.foldl` or some external packages like FLoops.jl (ref [RFC/ANN] FLoops.jl: fast generic for loops (foldl for humans™)) to eliminate dynamic dispatches.
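For example, something like this (a sketch using the `create_adder` example above) keeps each “leg” of the flattened iterator concretely typed:

``````
# Sketch: group closures by concrete type, then reduce with foldl.
adders  = [create_adder(1), create_adder(2)]   # one concrete closure type
doubler = [x -> 2x]                            # another concrete closure type
fs = Iterators.flatten((adders, doubler))

apply_sum(fs, x) = foldl((acc, f) -> acc + f(x), fs; init = 0.0)
apply_sum(fs, 10.0)   # 11.0 + 12.0 + 20.0 = 43.0
``````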

Yes, I think it’d be a better way to parallelize the computation. FWIW, if your graph object already defines `Base.iterate` (and, preferably, `Base.length`), you can simply add `SplittablesBase.halve` to support parallel computations via Transducers.jl, aforementioned FLoops.jl, ThreadsX.jl, etc.
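For a minimal made-up vertex iterator, that could look like this (ranges already implement the SplittablesBase interface, so they can serve as the two halves):

``````
using SplittablesBase

# Made-up minimal vertex iterator, just for illustration.
struct Vertices
    n::Int
end
Base.iterate(v::Vertices, i = 1) = i > v.n ? nothing : (i, i + 1)
Base.length(v::Vertices) = v.n

# halve returns two halves that together cover the same elements.
SplittablesBase.halve(v::Vertices) = (1:(v.n ÷ 2), (v.n ÷ 2 + 1):v.n)
``````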