# Multithreading using FLoops for updating a discrete distribution

I’m working on a heterogeneous agent macroeconomics model. One of the steps in the solution method for the model is to update the (discrete) wealth distribution across agents. The model is quite big so I need to multithread this in order to get adequate performance.

I have used FLoops with great success for other parts of the model, but the distribution update has inherent data-race problems that make it more complicated. I can still multithread it, but I ended up having to use a `Threads.threadid()` technique. I tried doing this with `@reduce` but wasn’t able to figure out the syntax.

Below is a code snippet showing a greatly simplified version of what I’m trying to do. I’m wondering if there’s a more efficient way to do this, or if there’s a way I can use `@reduce`. My CPU has 8 threads, so I found the roughly 2.5x speed-up over a single thread quite underwhelming.

First, some setup code:

``````julia
using FLoops

nz = 4
ztrans = [0.9463 0.0528 0.0007 0.0002;
          0.0264 0.9470 0.0264 0.0002;
          0.0007 0.0528 0.9463 0.0002;
          0.0069 0.0069 0.0069 0.9793];

nβ = 2
βtrans = [0.9969 0.0031; 0.0031 0.9969];

na = 1000
amax = 200
agrid = range(0, amax, length = na);
aPol = rand(na)*amax;

μ = rand(na,nz,nβ);
μ /= sum(μ);

# Output arrays used by the functions and benchmarks below
μ_new  = zeros(na,nz,nβ);
μ_new2 = zeros(na,nz,nβ);
μ_new3 = zeros(na,nz,nβ);
μ_new_thread = zeros(na,nz,nβ,Threads.nthreads());
``````

Now, the single thread version of the update function (no need to worry about data races here):

``````julia
function distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        # Lottery weights: split the mass at aPol[ia] between the two
        # bracketing grid points iaPl and iaPh (the weights sum to one).
        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        for iβP in 1:nβ
            for izP in 1:nz
                μ_new[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
                μ_new[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
            end
        end

    end

end
``````
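As a sanity check (my own sketch, not part of the original function), the update should conserve probability mass: the lottery weights sum to one and both transition matrices are row-stochastic, so the total mass of `μ_new` should match that of `μ` whenever `μ_new` starts from zero:

``````julia
# Sanity-check sketch, assuming the setup code above has been run:
# iaPh_frac + iaPl_frac == 1 and each row of ztrans/βtrans sums to 1,
# so total mass should be preserved by the update.
μ_new = zeros(na, nz, nβ)
distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)
@assert sum(μ_new) ≈ sum(μ)
``````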

Now the multithreaded version, where I give the distribution an extra dimension so each thread can store its own work before everything is summed together to update the distribution:

``````julia
function distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    fill!(μ_new_thread, 0)

    @floop ThreadedEx() for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        # Each thread accumulates into its own slice of μ_new_thread,
        # avoiding the data race on a shared μ_new.
        for iβP in 1:nβ
            for izP in 1:nz
                μ_new_thread[iaPh,izP,iβP,Threads.threadid()] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
                μ_new_thread[iaPl,izP,iβP,Threads.threadid()] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
            end
        end

    end

    # Combine the per-thread slices into the updated distribution.
    μ_new2 .= dropdims(sum(μ_new_thread, dims = 4), dims = 4)

end
``````

Benchmarking:

``````julia
julia> distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans);

julia> distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans);

julia> μ_new2 == μ_new
true

julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  382.890 ms (7766704 allocations: 138.04 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  161.089 ms (9046948 allocations: 197.26 MiB)
``````

From a quick glance, it looks like you are trying to sum arrays? Maybe you can use the combination of `@init` and `@reduce() do` syntax like this?

``````julia
@floop for (A, B) in zip(As, Bs)
    @init C = similar(A)
    mul!(C, A, B)
    @reduce() do (S = zero(C); C)
        S .+= C
    end
end
``````

See the full explanation in the tutorial *Efficient and safe approaches to mutation in data parallelism*.

Untested, but I guess it’d look like:

``````julia
@init μ_new_tmp = Array{Float64}(undef, ...)
fill!(μ_new_tmp, 0)

for iβP in 1:nβ
    for izP in 1:nz
        μ_new_tmp[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
        μ_new_tmp[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
    end
end

@reduce() do (μ_new = zero(...); μ_new_tmp)
    μ_new .+= μ_new_tmp
end
``````

There is a discussion for how to avoid `μ_new .+= μ_new_tmp` for extra efficiency in the tutorial above, too.


Turns out I was just out of RAM. I was using a very large size for `agrid` to magnify the differences between the versions, and because I was running the `@reduce` version last, it didn’t have access to enough memory.

With the smaller grid, we can see that the @reduce version is more efficient:

``````julia
julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  31.756 ms (712576 allocations: 12.83 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  14.154 ms (840819 allocations: 18.76 MiB)

julia> @btime(distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  11.169 ms (392371 allocations: 6.98 MiB)
``````

Thank you @tkf for all your help!

Update: I got the above code to run. But it’s significantly slower than my more crude approach using `Threads.threadid()`:

``````julia
julia> @time distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans);
  2.153306 seconds (91.13 M allocations: 1.936 GiB, 23.92% gc time)

julia> @time distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans);
613.718691 seconds (39.99 M allocations: 707.934 MiB, 0.06% gc time)
``````

Note that these timings were both on second or third runs, so there's no compile time included. The `@reduce` version clearly has many fewer allocations, but I don’t understand why it takes so long to run compared to the `Threads.threadid()` version.

Hmm… that’s rather absurdly slow. A couple of general techniques for debugging something like this:

1. Change the input size and see if the amount of allocations changes. If it scales linearly (or polynomially), maybe there are some missing memory optimizations.
2. Pass `SequentialEx()` instead of `ThreadedEx()` to see if you can spot the usual inefficiency pitfalls like type-instability. `SequentialEx` also makes it easy to use Cthulhu.
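For example (a sketch, with a hypothetical `sumsq` function that takes the executor as an argument), the same loop body can be run on the calling task, where ordinary introspection tools like `@code_warntype` apply:

``````julia
using FLoops

# Hypothetical example: pass the executor in, so the identical code path
# can be run sequentially while hunting for type instabilities.
function sumsq(xs, ex)
    @floop ex for x in xs
        @reduce(s += x^2)
    end
    return s
end

sumsq(rand(100), SequentialEx())  # single-threaded, debugger-friendly
# @code_warntype sumsq(rand(100), SequentialEx())  # look for ::Any
``````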

I’ve been doing more testing and I no longer think it was a RAM issue. When I edited my last reply, I lost the code snippet, so I’ll post it here again. From my testing, the allocations scale linearly with the input size, but the computing time scales at a dramatically higher rate for the versions that use `@reduce`. I’ll keep working on it to see if I can find any type-instability issues or other problems like that.

Here’s the @reduce version based on your suggestion:

``````julia
function distupdate_reduce!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    @floop for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        @init μ_new_tmp = fill(0.0, (na,nz,nβ))

        for iβP in 1:nβ
            for izP in 1:nz
                μ_new_tmp[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
                μ_new_tmp[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
            end
        end

        @reduce() do (μ_new = zero(μ); μ_new_tmp)
            μ_new .+= μ_new_tmp
        end

    end

end
``````

And I figured out how to use in place multiplication in this version:

``````julia
using LinearAlgebra: mul!  # for the 5-argument in-place mul!

function distupdate_mul!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    @floop for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        @init μ_new_tmp = fill(0.0, (na,nz,nβ))

        # Accumulate the outer product ztrans[iz,:]*βtrans[iβ,:]' into the
        # iaPh/iaPl slices; @views is needed so mul! writes into μ_new_tmp
        # itself rather than into a copy of the slice.
        @views mul!(μ_new_tmp[iaPh,:,:], ztrans[iz,:], βtrans[iβ,:]', iaPh_frac*μ[ia,iz,iβ], 1)
        @views mul!(μ_new_tmp[iaPl,:,:], ztrans[iz,:], βtrans[iβ,:]', iaPl_frac*μ[ia,iz,iβ], 1)

        @reduce() do (μ_new = zero(μ); μ_new_tmp)
            mul!(μ_new, μ_new_tmp, 1, 1, 1)
        end

    end

end
``````

When I set the grid size to `na = 1000`, I get the following results:

``````julia
julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  32.337 ms (711136 allocations: 12.80 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  14.313 ms (839379 allocations: 18.74 MiB)

julia> @btime(distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  13.162 ms (759827 allocations: 22.35 MiB)

julia> @btime(distupdate_mul!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  11.645 ms (391827 allocations: 12.58 MiB)
``````

Then when I go up to `na = 10000`, I get the following:

``````julia
julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  334.275 ms (7761232 allocations: 137.96 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  159.925 ms (9041477 allocations: 197.18 MiB)

julia> @btime(distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  1.448 s (7956549 allocations: 228.84 MiB)

julia> @btime(distupdate_mul!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  1.354 s (4276549 allocations: 131.19 MiB)
``````

I don’t think you’d want to use `mul!` in your code. The example I quoted uses `mul!` because the original sequential code uses `*`. You’d also need to run `fill!(μ_new_tmp, 0)` outside `@init` (i.e., for every iteration) in `distupdate_reduce!`.
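Concretely, I read that suggestion as: the buffer allocation stays in `@init` (once per task), while the reset runs on every iteration. The relevant two lines inside the `@floop` body would look something like:

``````julia
@init μ_new_tmp = Array{Float64}(undef, na, nz, nβ)  # allocated once per task
fill!(μ_new_tmp, 0)  # reset every iteration, since @reduce consumes it each time
``````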

I made that change now. I also noted that `na` was not defined. So I added `na = length(agrid)` at the top to fix that. This doesn’t appear to have fixed the problems though. I also tried using Cthulhu. It seems to flag some type instabilities in the calls to Transducers but I just don’t know enough Julia to understand what the source of the type instability is.

Where did you set `na`? I can imagine that it can cause a type instability if it’s inside of `@floop`. Otherwise, it’s a bit hard to guess what can go wrong.

(BTW, the `threadid()`-based solution is likely OK here, other than that you may get different results due to the non-associativity of floating-point addition.)
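For instance, floating-point addition is not associative, so summing the same three numbers with a different grouping can give different results:

``````julia
# Illustration: the grouping of floating-point additions changes the result.
xs = [1.0, 1e-16, -1.0]
sum_left  = (xs[1] + xs[2]) + xs[3]   # 0.0 — the 1e-16 is absorbed into 1.0
sum_right = xs[1] + (xs[2] + xs[3])   # ≈ 1.1e-16 — the small term survives
sum_left == sum_right                  # false
``````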

It was at the beginning of the function, before the call to `@floop`.

Turns out this is in fact a problem for my application. When I applied the `threadid()` approach to my actual (much more complicated) model, a single iteration (e.g., one run of the `distupdate` function above) was much faster than with a single thread, but it took far more iterations to converge. I suspect this was because the order of summation was changing between runs, slowing down the convergence.

Does the JuliaFolds ecosystem avoid this problem?

If so, where can I read more about the basic algorithms? I’ve been poring over the documentation, but I couldn’t find much about the basic algorithms used to parallelize a fold, only how to use the codebases.

I guess I’m wondering if it might be possible to sort of “roll my own” approach to this.

Yeah, JuliaFolds APIs are meant to produce deterministic results by default if you use the same `nthreads` (or manually specify `basesize`). There are also options to weaken this if you need the last bit of performance.
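A sketch of what fixing `basesize` looks like (the executors accept it as a keyword argument):

``````julia
using FLoops

xs = rand(10_000)

# With basesize pinned, the input is always split into the same chunks,
# so the reduction tree — and hence the floating-point summation order —
# is reproducible from run to run.
@floop ThreadedEx(basesize = 1_000) for x in xs
    @reduce(s += x)
end
``````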

The main “algorithm” is stupid-simple; see `reduce.jl` in JuliaFolds/Transducers.jl on GitHub (at commit ed41897d3078f0ae622b271380e426b30d34ea8b).