# Multithreading using FLoops for updating a discrete distribution

I’m working on a heterogeneous agent macroeconomics model. One of the steps in the solution method for the model is to update the (discrete) wealth distribution across agents. The model is quite big so I need to multithread this in order to get adequate performance.

I have used FLoops with great success for other parts of the model, but the distribution update has inherent data-race problems that make it more complicated. I can still multithread it, but I ended up having to use a `Threads.threadid()` technique. I tried doing this with `@reduce` but wasn’t able to figure out the syntax.

Below is a code snippet showing a greatly simplified version of what I’m trying to do. I’m wondering if there’s a more efficient way to do this, or if there’s a way I can use `@reduce`. My CPU has 8 threads, so I found the roughly 2.5x speed-up over a single thread quite underwhelming.

First, some setup code:

``````julia
using FLoops

nz = 4
ztrans = [0.9463 0.0528 0.0007 0.0002;
          0.0264 0.9470 0.0264 0.0002;
          0.0007 0.0528 0.9463 0.0002;
          0.0069 0.0069 0.0069 0.9793];

nβ = 2
βtrans = [0.9969 0.0031; 0.0031 0.9969];

na = 1000
amax = 200
agrid = range(0, amax, length = na);
aPol = rand(na)*amax;

μ = rand(na,nz,nβ);
μ /= sum(μ);

# Output arrays used by the functions and benchmarks below
μ_new  = zeros(na,nz,nβ);
μ_new2 = zeros(na,nz,nβ);
μ_new3 = zeros(na,nz,nβ);
μ_new_thread = zeros(na,nz,nβ,Threads.nthreads());
``````

Now, the single thread version of the update function (no need to worry about data races here):

``````julia
function distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        # Lottery weights: split the mass at aPol[ia] between the two
        # bracketing grid points iaPl and iaPh (the weights sum to one).
        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        for iβP in 1:nβ
            for izP in 1:nz
                μ_new[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
                μ_new[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
            end
        end

    end

end
``````
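As a sanity check (my own sketch, not part of the original function), the update should conserve probability mass: the lottery weights sum to one and both transition matrices are row-stochastic, so the total mass of `μ_new` should match that of `μ` whenever `μ_new` starts from zero:

``````julia
# Sanity-check sketch, assuming the setup code above has been run:
# iaPh_frac + iaPl_frac == 1 and each row of ztrans/βtrans sums to 1,
# so total mass should be preserved by the update.
μ_new = zeros(na, nz, nβ)
distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)
@assert sum(μ_new) ≈ sum(μ)
``````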

Now the multithreaded version, where I give the distribution an extra dimension so each thread can store its own work before everything is summed together to update the distribution:

``````julia
function distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    fill!(μ_new_thread, 0)

    @floop ThreadedEx() for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        # Each thread accumulates into its own slice of μ_new_thread,
        # avoiding the data race on a shared μ_new.
        for iβP in 1:nβ
            for izP in 1:nz
                μ_new_thread[iaPh,izP,iβP,Threads.threadid()] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
                μ_new_thread[iaPl,izP,iβP,Threads.threadid()] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
            end
        end

    end

    # Combine the per-thread slices into the updated distribution.
    μ_new2 .= dropdims(sum(μ_new_thread, dims = 4), dims = 4)

end
``````

Benchmarking:

``````julia
julia> distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans);

julia> distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans);

julia> μ_new2 == μ_new
true

julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  382.890 ms (7766704 allocations: 138.04 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  161.089 ms (9046948 allocations: 197.26 MiB)
``````

From a quick glance, it looks like you are trying to sum arrays? Maybe you can use the combination of `@init` and `@reduce() do` syntax like this?

``````julia
@floop for (A, B) in zip(As, Bs)
    @init C = similar(A)
    mul!(C, A, B)
    @reduce() do (S = zero(C); C)
        S .+= C
    end
end
``````

See the full explanation in the tutorial *Efficient and safe approaches to mutation in data parallelism*.

Untested, but I guess it’d look like:

``````julia
@init μ_new_tmp = Array{Float64}(undef, ...)
fill!(μ_new_tmp, 0)

for iβP in 1:nβ
    for izP in 1:nz
        μ_new_tmp[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
        μ_new_tmp[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
    end
end

@reduce() do (μ_new = zero(...); μ_new_tmp)
    μ_new .+= μ_new_tmp
end
``````

There is a discussion for how to avoid `μ_new .+= μ_new_tmp` for extra efficiency in the tutorial above, too.


Turns out I was just out of RAM. I was using a very large size for `agrid` to magnify the differences between the versions, and because I was running the `@reduce` version last, it didn’t have access to enough memory.

With the smaller grid, we can see that the @reduce version is more efficient:

``````julia
julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  31.756 ms (712576 allocations: 12.83 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  14.154 ms (840819 allocations: 18.76 MiB)

julia> @btime(distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  11.169 ms (392371 allocations: 6.98 MiB)
``````

Thank you @tkf for all your help!

Update: I got the above code to run. But it’s significantly slower than my more crude approach using `Threads.threadid()`:

``````julia
julia> @time distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans);
  2.153306 seconds (91.13 M allocations: 1.936 GiB, 23.92% gc time)

julia> @time distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans);
613.718691 seconds (39.99 M allocations: 707.934 MiB, 0.06% gc time)
``````

Note that these timings were both on second or third runs, so there's no compile time included. The `@reduce` version clearly has many fewer allocations, but I don’t understand why it takes so long to run compared to the `Threads.threadid()` version.

Hmm… that’s rather absurdly slow. A couple of general techniques for debugging something like this:

1. Change the input size and see if the amount of allocations changes. If it scales linearly (or polynomially), maybe there are some missing memory optimizations.
2. Pass `SequentialEx()` instead of `ThreadedEx()` to see if you can spot the usual inefficiency pitfalls like type-instability. `SequentialEx` also makes it easy to use Cthulhu.
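For example (a sketch, with a hypothetical `sumsq` function that takes the executor as an argument), the same loop body can be run on the calling task, where ordinary introspection tools like `@code_warntype` apply:

``````julia
using FLoops

# Hypothetical example: pass the executor in, so the identical code path
# can be run sequentially while hunting for type instabilities.
function sumsq(xs, ex)
    @floop ex for x in xs
        @reduce(s += x^2)
    end
    return s
end

sumsq(rand(100), SequentialEx())  # single-threaded, debugger-friendly
# @code_warntype sumsq(rand(100), SequentialEx())  # look for ::Any
``````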

I’ve been doing more testing and I no longer think it was a RAM issue. When I edited my last reply, I lost the code snippet, so I’ll post it here again. From my testing, the allocations scale linearly with the input size, but the computing time scales at a dramatically higher rate for the versions that use `@reduce`. I’ll keep working on it to see if I can find any type-instability issues or other problems like that.

Here’s the @reduce version based on your suggestion:

``````julia
function distupdate_reduce!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    @floop for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        @init μ_new_tmp = fill(0.0, (na,nz,nβ))

        for iβP in 1:nβ
            for izP in 1:nz
                μ_new_tmp[iaPh,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPh_frac*μ[ia,iz,iβ]
                μ_new_tmp[iaPl,izP,iβP] += ztrans[iz,izP]*βtrans[iβ,iβP]*iaPl_frac*μ[ia,iz,iβ]
            end
        end

        @reduce() do (μ_new = zero(μ); μ_new_tmp)
            μ_new .+= μ_new_tmp
        end

    end

end
``````

And I figured out how to use in place multiplication in this version:

``````julia
using LinearAlgebra: mul!  # for the 5-argument in-place mul!

function distupdate_mul!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans)

    @floop for ia in eachindex(agrid), iz in 1:nz, iβ in 1:nβ

        if aPol[ia] >= last(agrid)
            iaPh = na
            iaPl = na
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        elseif aPol[ia] == first(agrid)
            iaPh = 1
            iaPl = 1
            iaPh_frac = 1.0
            iaPl_frac = 0.0
        else
            iaPh = searchsortedfirst(agrid, aPol[ia])
            iaPl = iaPh - 1
            iaPh_frac = 1.0 - (agrid[iaPh] - aPol[ia])/(agrid[iaPh] - agrid[iaPl])
            iaPl_frac = 1.0 - iaPh_frac
        end

        @init μ_new_tmp = fill(0.0, (na,nz,nβ))

        # Accumulate the outer product ztrans[iz,:]*βtrans[iβ,:]' into the
        # iaPh/iaPl slices; @views is needed so mul! writes into μ_new_tmp
        # itself rather than into a copy of the slice.
        @views mul!(μ_new_tmp[iaPh,:,:], ztrans[iz,:], βtrans[iβ,:]', iaPh_frac*μ[ia,iz,iβ], 1)
        @views mul!(μ_new_tmp[iaPl,:,:], ztrans[iz,:], βtrans[iβ,:]', iaPl_frac*μ[ia,iz,iβ], 1)

        @reduce() do (μ_new = zero(μ); μ_new_tmp)
            mul!(μ_new, μ_new_tmp, 1, 1, 1)
        end

    end

end
``````

When I set the grid size to `na = 1000`, I get the following results:

``````julia
julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  32.337 ms (711136 allocations: 12.80 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  14.313 ms (839379 allocations: 18.74 MiB)

julia> @btime(distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  13.162 ms (759827 allocations: 22.35 MiB)

julia> @btime(distupdate_mul!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  11.645 ms (391827 allocations: 12.58 MiB)
``````

Then when I go up to `na = 10000`, I get the following:

``````julia
julia> @btime(distupdate!(μ_new, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  334.275 ms (7761232 allocations: 137.96 MiB)

julia> @btime(distupdate_thread!(μ_new2, μ_new_thread, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  159.925 ms (9041477 allocations: 197.18 MiB)

julia> @btime(distupdate_reduce!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  1.448 s (7956549 allocations: 228.84 MiB)

julia> @btime(distupdate_mul!(μ_new3, μ, aPol, agrid, nz, ztrans, nβ, βtrans));
  1.354 s (4276549 allocations: 131.19 MiB)
``````

I don’t think you’d want to use `mul!` in your code. The example I quoted uses `mul!` because the original sequential code uses `*`. You’d also need to run `fill!(μ_new_tmp, 0)` outside `@init` (i.e., for every iteration) in `distupdate_reduce!`.
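Concretely, I read that suggestion as: the buffer allocation stays in `@init` (once per task), while the reset runs on every iteration. The relevant two lines inside the `@floop` body would look something like:

``````julia
@init μ_new_tmp = Array{Float64}(undef, na, nz, nβ)  # allocated once per task
fill!(μ_new_tmp, 0)  # reset every iteration, since @reduce consumes it each time
``````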

I made that change now. I also noted that `na` was not defined. So I added `na = length(agrid)` at the top to fix that. This doesn’t appear to have fixed the problems though. I also tried using Cthulhu. It seems to flag some type instabilities in the calls to Transducers but I just don’t know enough Julia to understand what the source of the type instability is.

Where did you set `na`? I can imagine that it can cause a type instability if it’s inside of `@floop`. Otherwise, it’s a bit hard to guess what can go wrong.

(BTW, the `threadid()`-based solution is likely OK here, other than that you may get different results due to the non-associativity of floating-point addition.)
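For instance, floating-point addition is not associative, so summing the same three numbers with a different grouping can give different results:

``````julia
# Illustration: the grouping of floating-point additions changes the result.
xs = [1.0, 1e-16, -1.0]
sum_left  = (xs[1] + xs[2]) + xs[3]   # 0.0 — the 1e-16 is absorbed into 1.0
sum_right = xs[1] + (xs[2] + xs[3])   # ≈ 1.1e-16 — the small term survives
sum_left == sum_right                  # false
``````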

It was at the beginning of the function, before the call to `@floop`.

Turns out this is in fact a problem for my application. When I applied the `threadid()` approach to my actual (much more complicated) model, a single iteration (e.g., one run of the `distupdate` function above) was much faster than with a single thread, but it took far more iterations to converge. I suspect this was because the order of summation was changing between runs, slowing down the convergence.

Does the JuliaFolds ecosystem avoid this problem?

If so, where can I read more about the basic algorithms? I’ve been poring over the documentation, but I couldn’t find much about the basic algorithms used to parallelize a fold, only how to use the codebases.

I guess I’m wondering if it might be possible to sort of “roll my own” approach to this.

Yeah, JuliaFolds APIs are meant to produce deterministic results by default if you use the same `nthreads` (or manually specify `basesize`). There are also options to weaken this if you need the last bit of performance.
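A sketch of what fixing `basesize` looks like (the executors accept it as a keyword argument):

``````julia
using FLoops

xs = rand(10_000)

# With basesize pinned, the input is always split into the same chunks,
# so the reduction tree — and hence the floating-point summation order —
# is reproducible from run to run.
@floop ThreadedEx(basesize = 1_000) for x in xs
    @reduce(s += x)
end
``````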

The main “algorithm” is stupid-simple; see `reduce.jl` in JuliaFolds/Transducers.jl on GitHub (at commit ed41897d3078f0ae622b271380e426b30d34ea8b).