uses a buffer for a subset of a cohort of agents (in the MWE below, each subset has a different κ),
steps this forward from a given point in time,
and calculates some aggregate at each point in time.
Below is a much-simplified but self-contained MWE. The key functions are step_cohort_buffer! and aggregate_cohort_buffer, which stand in for much more complex calculations (for economists: this is an OG model).
For the actual problem I have 24 subgroups for each cohort; in the MWE I have two (the κs). I would like to step these in parallel.
I am at a loss how to do this nicely. E.g., would simply using a ConcurrentDict be sufficient? What part should I wrap with locks? Or should I use atomic constructs? (You can tell that I am clueless about atomics and threads and all that stuff.) A rough lock-based sketch of what I have in mind follows the MWE below.
## take this part as given, implementing primitives
using LogExpFunctions
const MAX_AGE = 10
mutable struct CohortBuffer{T}
    age::Int
    t::Int
    values::Vector{T}
    κ::T
end

function make_cohort_buffer(t, κ; N = 10)
    CohortBuffer(1, t, rand(N), κ)
end

function step_cohort_buffer!(buffer::CohortBuffer)
    (; values, age, κ) = buffer
    @assert age ≤ MAX_AGE
    for i in eachindex(values)
        # stand-in for a much more complex calculation
        buffer.values[i] = logaddexp(buffer.values[i], κ / age)
    end
    buffer.age += 1
    buffer.t += 1
    buffer
end

struct Aggregate{T}
    x::T
    c::Int
end
aggregate_cohort_buffer(buffer) = Aggregate(sum(buffer.values), 1)
Base.:+(a::Aggregate, b::Aggregate) = Aggregate(a.x + b.x, a.c + b.c)
# I need help parallelizing this part safely
function calculation(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    for t in ts
        # this I would like to parallelize
        for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                τ = buffer.t
                if haskey(result, τ)
                    result[τ] += aggr
                else
                    result[τ] = aggr
                end
            end
        end
    end
    result
end
result = calculation(0:10, [0.1, 0.2])
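To make the locking question concrete, here is roughly the lock-based version I have in mind (just a sketch; I have no idea whether this is correct or efficient):

using Base.Threads: @threads

function calculation_locked(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    lk = ReentrantLock()   # ReentrantLock comes from Base
    for t in ts
        @threads for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                τ = buffer.t
                # each buffer is private to its task, so (I think) only the
                # update of the shared Dict needs to be protected by the lock
                lock(lk) do
                    result[τ] = haskey(result, τ) ? result[τ] + aggr : aggr
                end
            end
        end
    end
    result
end

result_locked = calculation_locked(0:10, [0.1, 0.2])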
Would Actors.jl work for this use case? Or is that overkill? I find the documentation nice and the concept easy to understand. But I am wondering whether this avoids data races and other pitfalls.
using Actors: spawn, call
using Base.Threads: @threads
function accumulate(dict::Dict, t::Int, aggregate::Aggregate)
    if haskey(dict, t)
        dict[t] += aggregate
    else
        dict[t] = aggregate
    end
end

accumulate(dict::Dict) = copy(dict)

struct Accumulator{L}
    link::L
end

accumulate(acc::Accumulator, args...) = call(acc.link, args...)

function accumulator(::Type{T}) where T <: Real
    Accumulator(spawn(accumulate, Dict{Int,Aggregate{T}}()))
end

function calculation_actors(ts, κs)
    acc = accumulator(Float64)
    @threads for t in ts
        @threads for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                accumulate(acc, buffer.t, aggr)
            end
        end
    end
    accumulate(acc)
end
result = calculation_actors(0:10, [0.1, 0.2])
One approach would be to calculate the results dict separately for each thread, then combine them at the end, e.g.
function get_single_buffer_results(t, κ)
    buffer = make_cohort_buffer(t, κ)
    result = Dict{Int,Aggregate{Float64}}()
    while buffer.age ≤ MAX_AGE
        step_cohort_buffer!(buffer)
        aggr = aggregate_cohort_buffer(buffer)
        τ = buffer.t
        if haskey(result, τ)
            result[τ] += aggr
        else
            result[τ] = aggr
        end
    end
    return result
end
function calculation(ts, κs)
    combined_result = Dict{Int,Aggregate{Float64}}()
    for t in ts
        results = fetch.([Threads.@spawn get_single_buffer_results(t, κ) for κ in κs])
        for result in results
            for (k, v) in result
                if haskey(combined_result, k)
                    combined_result[k] += v
                else
                    combined_result[k] = v
                end
            end
        end
    end
    return combined_result
end
A more general approach is to use a Channel to communicate between threads. This is the approach used by Go and is generally pretty popular, though there’s a bit more boilerplate:
function calculation(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    channel = Channel{Tuple{Int, Aggregate{Float64}}}(Inf)
    consumer = Threads.@spawn for (τ, aggr) in channel
        if haskey(result, τ)
            result[τ] += aggr
        else
            result[τ] = aggr
        end
    end
    for t in ts
        # this I would like to parallelize
        for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                τ = buffer.t
                put!(channel, (τ, aggr))
            end
        end
    end
    while !isempty(channel)
        sleep(.01)
    end
    result
end
Thanks for the detailed reply. I have a question on the part quoted above.
Could it happen that the channel is empty (isempty) because a task is in progress and it has not put a value there yet?
If yes, could it be that the above terminates early? Is there a pattern that waits on all the tasks (i.e., the calculation proceeds only when the last task has terminated)?
Also, generally, is there a resource to learn about asynchronous computation (in the context of Julia)?
I haven’t found a general solution I really like – my solutions for waiting tend to be specific and problem-dependent. The pattern here is to wait for the producer(s) to finish, and then wait for the Channel to be empty. So with multiple producers, we could do this:
function process_single_buffer(channel, t, κ)
    buffer = make_cohort_buffer(t, κ)
    while buffer.age ≤ MAX_AGE
        step_cohort_buffer!(buffer)
        aggr = aggregate_cohort_buffer(buffer)
        τ = buffer.t
        put!(channel, (τ, aggr))
    end
    return nothing
end
function calculation(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    channel = Channel{Tuple{Int, Aggregate{Float64}}}(Inf)
    consumer = Threads.@spawn for (τ, aggr) in channel
        if haskey(result, τ)
            result[τ] += aggr
        else
            result[τ] = aggr
        end
    end
    for t in ts
        # this I would like to parallelize
        wait.([Threads.@spawn process_single_buffer(channel, t, κ) for κ in κs])
    end
    while !isempty(channel)
        sleep(.01)
    end
    result
end
If you know the number of results you expect (length(ts) * length(κs)), you could also wait until results has the expected length.
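Another option along the same lines (a sketch, not something I have benchmarked) is to close the channel once all producers are done; the consumer's for loop then ends by itself once the channel has been drained, so you can wait on the consumer task instead of polling isempty:

function calculation_closing(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    channel = Channel{Tuple{Int, Aggregate{Float64}}}(Inf)
    # iterating a Channel blocks until an item arrives and stops once the
    # channel is both closed and empty
    consumer = Threads.@spawn for (τ, aggr) in channel
        result[τ] = haskey(result, τ) ? result[τ] + aggr : aggr
    end
    for t in ts
        wait.([Threads.@spawn process_single_buffer(channel, t, κ) for κ in κs])
    end
    close(channel)   # no producer will put! after this point
    wait(consumer)   # returns once the remaining items have been consumed
    result
end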
A different approach is to use a Channel{Task}, where the channel schedules the tasks. Then since you have explicit references, you can just wait on the tasks themselves – this is what I do in some more complex cases. But creating a task for each entry has some extra overhead.
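I have not tried it on this exact problem, but a minimal sketch of that idea could look something like the following (using get_single_buffer_results from above; one worker per thread takes tasks off the channel, runs them, and merges the partial dicts):

function calculation_taskchannel(ts, κs; nworkers = Threads.nthreads())
    jobs = Channel{Task}(Inf)
    for t in ts, κ in κs
        put!(jobs, @task get_single_buffer_results(t, κ))   # created, not yet scheduled
    end
    close(jobs)   # nothing more will be added; workers stop when the channel is drained
    workers = map(1:nworkers) do _
        Threads.@spawn begin
            acc = Dict{Int,Aggregate{Float64}}()
            for task in jobs
                schedule(task)                    # run the job
                mergewith!(+, acc, fetch(task))   # wait for it and merge its partial result
            end
            acc
        end
    end
    # combine the per-worker dicts (mergewith adds the Aggregates key-wise)
    mergewith(+, Dict{Int,Aggregate{Float64}}(), fetch.(workers)...)
end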
I had trouble finding a good resource on asynchronous programming in Julia (I found the information I needed was scattered across many pages in the documentation + discourse) so I wrote up what I’d learned a while ago: https://www.lesswrong.com/posts/kPnjPfp2ZMMYfErLJ/julia-tasks-101
I recommend going for what @Satvik recommended with get_single_buffer_results. That is, create a Task per get_single_buffer_results call (that is an expensive enough job, right?), and then combine all the results.
I’d beware of the following potential issues with that approach (you need to check whether that is a problem with your real code / parameters):
Do you need memory reuse for CohortBuffer across different κ, in settings where you have many more κ than threads / cores? Then you need to do something with channels for reusing the CohortBuffers (see the sketch after this list).
Can you afford the memory of storing the separate results buffers for each κ before combining? You can maybe save some space by putting the separate results into vectors and only using a Dict when combining.
Is get_single_buffer_results expensive enough to warrant a Task? I.e., does it take e.g. 1 ms (expensive enough!) or rather 1 µs (scarily close to scheduler overhead; then you need to combine multiple calls into a single Task)?
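For the memory-reuse point, a rough sketch of what such a pool could look like (the reset_cohort_buffer! helper is hypothetical, not part of the MWE; it just re-initializes an existing buffer in place):

using Random: rand!

# hypothetical helper (not in the MWE): re-initialize an existing buffer instead of allocating a new one
function reset_cohort_buffer!(buffer::CohortBuffer, t, κ)
    buffer.age = 1
    buffer.t = t
    rand!(buffer.values)
    buffer.κ = κ
    return buffer
end

function calculation_pooled(ts, κs; ntasks = Threads.nthreads())
    pool = Channel{CohortBuffer{Float64}}(ntasks)
    foreach(_ -> put!(pool, make_cohort_buffer(0, 0.0)), 1:ntasks)   # pre-allocate ntasks buffers
    combined = Dict{Int,Aggregate{Float64}}()
    for t in ts
        partials = map(κs) do κ
            Threads.@spawn begin
                buffer = take!(pool)   # borrow a buffer; also limits concurrency to ntasks
                try
                    reset_cohort_buffer!(buffer, t, κ)
                    result = Dict{Int,Aggregate{Float64}}()
                    while buffer.age ≤ MAX_AGE
                        step_cohort_buffer!(buffer)
                        aggr = aggregate_cohort_buffer(buffer)
                        result[buffer.t] = haskey(result, buffer.t) ? result[buffer.t] + aggr : aggr
                    end
                    result
                finally
                    put!(pool, buffer)   # return it so another task can reuse the allocation
                end
            end
        end
        mergewith!(+, combined, fetch.(partials)...)
    end
    return combined
end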
Generally, I'd recommend that approach (separate per-task results, combined afterwards), especially for debuggability / repeatability.
It is not absolutely required, but it is pretty useful to have binary identical results, independent of scheduler decisions / race outcomes. If you aggregate concurrently, then you have the following additional sources of non-reproducibility:
Floating point addition is not associative (see the one-liner below). But the order / associativity of your additions in result can depend on race outcomes, even if you use channels / locks!
Dict binary layout depends on the order in which values have been added. Say you add both key a and key b to a dict, and both try to go into the same slot. Which one gets the slot and which one gets displaced depends on the outcome of races (whoever grabs the lock of the dict / channel first). This implies that the resulting iteration order over the dict depends on races, as does naive display code.
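For the first point, a quick REPL check:

julia> (0.1 + 0.2) + 0.3 == 0.1 + (0.2 + 0.3)
false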
Non-reproducibility sucks, even if all possible outcomes are correct.
When debugging, it is super important to be able to add a print somewhere, and then compare between two versions to pinpoint where the divergence has been introduced.
Not sure if this is useful or already known, but the documentation written for the OhMyThreads.jl and ChunkSplitters.jl packages is very helpful. Maybe this particular section applies to your case:
(if it does not apply, just ignore it - I admittedly didn't follow the thread very carefully).
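If it does apply, a rough sketch of how the map-and-combine could look with OhMyThreads (untested here, assuming its tmapreduce and the get_single_buffer_results function from above):

using OhMyThreads: tmapreduce

function calculation_omt(ts, κs)
    # run get_single_buffer_results on each (t, κ) pair in parallel and reduce the
    # partial Dicts with mergewith(+), which adds the Aggregates key-wise
    tmapreduce(mergewith(+), collect(Iterators.product(ts, κs))) do (t, κ)
        get_single_buffer_results(t, κ)
    end
end

result = calculation_omt(0:10, [0.1, 0.2])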