I’m working on a heterogeneous-agent macroeconomics model. One of the steps in the solution method is updating the (discrete) wealth distribution across agents. The model is quite large, so I need to multithread this step to get adequate performance.
I have used FLoops with great success for other parts of the model, but the distribution update has inherent data-race problems that make it more complicated. I can still multithread it, but I ended up having to resort to a Threads.threadid() technique. I tried doing this with @reduce instead but wasn't able to figure out the syntax.
Below is a code snippet showing a greatly simplified version of what I’m trying to do. I’m wondering if there’s a more efficient way to do this, or if there’s a way to make @reduce work (my best reconstruction of what I attempted is sketched after the threaded version below). My CPU has 8 threads, so I found the roughly 2.4x speed-up over the single-threaded version quite underwhelming.
First, some setup code:
using FLoops
using BenchmarkTools   # for @btime below
nz = 4
ztrans = [0.9463 0.0528 0.0007 0.0002;
0.0264 0.9470 0.0264 0.0002;
0.0007 0.0528 0.9463 0.0002;
0.0069 0.0069 0.0069 0.9793];
nβ = 2
βtrans = [0.9969 0.0031; 0.0031 0.9969];
na = 1000
amax = 200
agrid = range(0, amax, length = na);
aPol = rand(na)*amax;   # asset policy a'(a), one value per grid point
μ = rand(na,nz,nβ);
μ /= sum(μ);            # normalize so total mass is 1

# Output and per-thread work buffers used below:
μ_new = zeros(na,nz,nβ);
μ_new2 = zeros(na,nz,nβ);
μ_new_thread = zeros(na,nz,nβ,Threads.nthreads());
Now, the single-threaded version of the update function (no data races to worry about here):
function distupdate!(μ_new,μ,aPol,agrid,nz,ztrans,nβ,βtrans)
    fill!(μ_new, 0.0)   # start from zero mass
    for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ
        # Split the mass at aPol[ia] between the two bracketing grid points
        # (linear interpolation weights; the endpoints get all the mass).
        if aPol[ia] >= last(agrid)
            iaPh = na   # note: na is read from global scope here
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end
        # Push the mass forward through the exogenous z and β transitions.
        for iβP in 1:nβ, izP in 1:nz
            μ_new[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
            μ_new[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
        end
    end
end
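As a quick sanity check of the bracketing logic, here is what it produces for a hypothetical interior policy value (a, ih, il, and fh are throwaway names, not part of the model):
a = 100.1                                # a hypothetical interior policy value
ih = searchsortedfirst(agrid, a)         # index of the first grid point >= a
il = ih - 1
fh = 1.0 - (agrid[ih] - a)/(agrid[ih] - agrid[il])   # weight on the upper point
agrid[il] <= a <= agrid[ih]              # true: the two grid points bracket a
0.0 <= fh <= 1.0                         # true: a valid interpolation weight
The two weights fh and 1.0 - fh sum to one, so total mass is conserved by the update.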
Now the multithreaded version, where I give the distribution an extra dimension so that each thread accumulates its work into its own slice; the slices are summed at the end to form the updated distribution:
function distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans)
    fill!(μ_new_thread, 0.0)   # zero every thread's slice
    @floop ThreadedEx() for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ
        # Same bracketing logic as the single-threaded version.
        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end
        # Each thread writes only into its own slice along the 4th dimension.
        for iβP in 1:nβ, izP in 1:nz
            μ_new_thread[iaPh,izP,iβP,Threads.threadid()] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
            μ_new_thread[iaPl,izP,iβP,Threads.threadid()] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
        end
    end
    # Collapse the per-thread slices into the updated distribution.
    μ_new2[:,:,:] = sum(μ_new_thread, dims=4)
end
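For reference, here is my best reconstruction of the kind of @reduce formulation I was going for (a sketch only, based on my reading of FLoops' do-block reduction syntax; distupdate_reduce!, contrib, and acc are names I made up for illustration). Even if the syntax below is right, allocating a full contrib array on every iteration looks like it would be far slower than the threadid approach:
function distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans)
    n = length(agrid)
    @floop ThreadedEx() for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ
        # Same bracketing logic as in the versions above.
        if aPol[ia] >= last(agrid)
            iaPh, iaPl = n, n
            iaPh_frac, iaPl_frac = 1.0, 0.0
        elseif aPol[ia] == first(agrid)
            iaPh, iaPl = 1, 1
            iaPh_frac, iaPl_frac = 1.0, 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end
        # This iteration's contribution to the new distribution — a fresh
        # array on every iteration, which is the part that seems wasteful.
        contrib = zeros(n, nz, nβ)
        for iβP in 1:nβ, izP in 1:nz
            contrib[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
            contrib[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
        end
        # FLoops accumulates contrib into a per-task acc and merges the accs.
        @reduce() do (acc = zeros(n, nz, nβ); contrib)
            acc .+= contrib
        end
    end
    μ_new3 .= acc   # the reduction result is bound after the loop
end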
Benchmarking:
distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans);
distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans);
μ_new2 == μ_new
true
@btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
382.890 ms (7766704 allocations: 138.04 MiB)
@btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
161.089 ms (9046948 allocations: 197.26 MiB)