Combining results from threaded calculations

I have a calculation which

  1. uses a buffer for a subset of a cohort of agents (in the MWE below, each has different \kappa),
  2. steps this forward from a given value in time,
  3. calculates some aggregate at each point in time.

Below is a much-simplified but self-contained MWE. The key functions are step_cohort_buffer! and aggregate_cohort_buffer, which stand in for much more complex calculations (for economists: this is an OG model).

For the actual problem I have 24 subgroups for each cohort, in the MWE I have two (κs). I would like to step these in parallel.

I am at loss how to do this nicely. Eg would simply using a ConcurrentDict be sufficient? What part should I wrap with locks? Or should I use atomic constructs? (You can tell that I am clueless about atomics and threads and all that stuff).

## take this part as given, implementing primitives
using LogExpFunctions

const MAX_AGE = 10

mutable struct CohortBuffer{T}
    age::Int
    t::Int
    values::Vector{T}
    κ::T
end

function make_cohort_buffer(t, κ; N = 10)
    CohortBuffer(1, t, rand(N), κ)
end

function step_cohort_buffer!(buffer::CohortBuffer)
    (; values, age, κ) = buffer
    @assert age ≤ MAX_AGE
    for i in eachindex(values)
        # stand-in for a much more complex calculation
        buffer.values[i] = logaddexp(buffer.values[i], κ / age)
    end
    buffer.age += 1
    buffer.t += 1
    buffer
end

struct Aggregate{T}
    x::T
    c::Int
end

aggregate_cohort_buffer(buffer) = Aggregate(sum(buffer.values), 1)

Base.:+(a::Aggregate, b::Aggregate) = Aggregate(a.x + b.x, a.c + b.c)


# I need help parallelizing this part safely
function calculation(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    for t in ts
        # this I would like to parallelize
        for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                τ = buffer.t
                if haskey(result, τ)
                    result[τ] += aggr
                else
                    result[τ] = aggr
                end
            end
        end
    end
    result
end

result = calculation(0:10, [0.1, 0.2])

A clarification: for the actual calculation, step_cohort_buffer! contains the most costly part. The cost of aggregate_cohort_buffer is trivial.

Would Actors.jl work for this use case? Or is that overkill? I find the documentation nice and the concept easy to understand. But I am wondering if this avoid data races and other pitfalls.

using Actors: spawn, call
using Base.Threads: @threads

function accumulate(dict::Dict, t::Int, aggregate::Aggregate)
    if haskey(dict, t)
        dict[t] += aggregate
    else
        dict[t] = aggregate
    end
end

accumulate(dict::Dict) = copy(dict)

struct Accumulator{L}
    link::L
end

accumulate(acc::Accumulator, args...) = call(acc.link, args...)

function accumulator(::Type{T}) where T <: Real
    Accumulator(spawn(accumulate, Dict{Int,Aggregate{T}}()))
end

function calculation_actors(ts, κs)
    acc = accumulator(Float64)
    @threads for t in ts
        @threads for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                accumulate(acc, buffer.t, aggr)
            end
        end
    end
    accumulate(acc)
end

result = calculation_actors(0:10, [0.1, 0.2])

One approach would be to calculate the results dict separately for each thread, then combine them at the end, e.g.

function get_single_buffer_results(t, κ)
    buffer = make_cohort_buffer(t, κ)
    result = Dict{Int,Aggregate{Float64}}()
    while buffer.age ≤ MAX_AGE
        step_cohort_buffer!(buffer)
        aggr = aggregate_cohort_buffer(buffer)
        τ = buffer.t
        if haskey(result, τ)
            result[τ] += aggr
        else
            result[τ] = aggr
        end
    end
    return result
end

function calculation(ts, κs)
    combined_result = Dict{Int,Aggregate{Float64}}()
    for t in ts
        results = fetch.([Threads.@spawn get_single_buffer_results(t, κ) for κ in κs])
        for result in results
            for (k, v) in result
                if haskey(combined_result, k)
                    combined_result[k] += v
                else
                    combined_result[k] = v
                end
            end
        end
    end
    return combined_result
end

A more general approach is to use a Channel to communicate between threads. This is the approach used by Go and is generally pretty popular, though there’s a bit more boilerplate:

function calculation(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    channel = Channel{Tuple{Int, Aggregate{Float64}}}(Inf)

    consumer = Threads.@spawn for (τ, aggr) in channel
        if haskey(result, τ)
            result[τ] += aggr
        else
            result[τ] = aggr
        end
    end

    for t in ts
        # this I would like to parallelize
        for κ in κs
            buffer = make_cohort_buffer(t, κ)
            while buffer.age ≤ MAX_AGE
                step_cohort_buffer!(buffer)
                aggr = aggregate_cohort_buffer(buffer)
                τ = buffer.t
                put!(channel, (τ, aggr))
            end
        end
    end
    while !isempty(channel)
        sleep(.01)
    end
    result
end
4 Likes

Thanks for the detailed reply. I have a question on the part quoted above.

Could it happen that the channel is empty (isempty) because a task is in progress and it has not put a value there yet?

If yes, could it be that the above terminates early? Is there a pattern that waits on all the tasks (ie calculation proceeds when the last task terminates)?

Also, generally, is there a resource to learn about asynchronous computation (in the context of Julia)?

I haven’t found a general solution I really like – my solutions for waiting tend to be specific and problem-dependent. The pattern here is to wait for the producer(s) to finish, and then wait for the Channel to be empty. So with multiple producers, we could do this:

function process_single_buffer(channel, t, κ)
    buffer = make_cohort_buffer(t, κ)
    result = Dict{Int,Aggregate{Float64}}()
    while buffer.age ≤ MAX_AGE
        step_cohort_buffer!(buffer)
        aggr = aggregate_cohort_buffer(buffer)
        τ = buffer.t
        put!(channel, (τ, aggr))
    end
    return result
end

function calculation(ts, κs)
    result = Dict{Int,Aggregate{Float64}}()
    channel = Channel{Tuple{Int, Aggregate{Float64}}}(Inf)
    consumer = Threads.@spawn for (τ, aggr) in channel
        if haskey(result, τ)
            result[τ] += aggr
        else
            result[τ] = aggr
        end
    end

    for t in ts
        # this I would like to parallelize
        wait.([Threads.@spawn process_single_buffer(channel, t, κ) for κ in κs])
    end

    while !isempty(channel)
        sleep(.01)
    end

    result
end

If you know the number of results you expect (length(ts) * length(κs))?, you could also wait until results has the expected length.

A different approach is to use a Channel{Task}, where the channel schedules the tasks. Then since you have explicit references, you can just wait on the tasks themselves – this is what I do in some more complex cases. But creating a task for each entry has some extra overhead.

I had trouble finding a good resource on asynchronous programming in Julia (I found the information I needed was scattered across many pages in the documentation + discourse) so I wrote up what I’d learned a while ago: https://www.lesswrong.com/posts/kPnjPfp2ZMMYfErLJ/julia-tasks-101

2 Likes

I recommend going for what @Satvik recommended with get_single_buffer_results. That is, create a Task per get_single_buffer_results (that is an expensive enough job, right?), and then combine all results.

I’d beware of the following potential issues with that approach (you need to check whether that is a problem with your real code / parameters):

  1. Do you need memory reuse for CohortBuffer across different \kappa, in settings where you have many mroe \kappa than threads / cores? Then you need to do something with channels for reusing the CohortBuffers.
  2. Can you afford the memory of storing the separate results buffers for each \kappa before combining? You can maybe save some space by putting the separate results buffers into vectors and only use a Dict when combining.
  3. Is the get_single_buffer_results expensive enough to warrant a Task? I.e. does it take e.g. 1 ms (expensive enough!) or rather 1 us (scary close to scheduler overhead, need to combine multiple calls into a single Task)

Generally, I’d recommend that especially for debuggability / repeatability.

It is not absolutely required, but it is pretty useful to have binary identical results, independent of scheduler decisions / race outcomes. If you aggregate concurrently, then you have the following additional sources of non-reproducibility:

  1. Floating point addition is not associative. But the order / associativity of your additions in result can depend on race outcomes, even if you use channels / locks!
  2. Dict binary layout depends on the order in which values have been added. Say, you add both key a and key b to a dict, and both try to go into the same slot. Which one gets the slot and which gets displaced depends on the outcome of races (who could grab the lock of the dict / channel first). This implies that the resultant iterator order over the dict depends on races, and as does naive display code.

Non reproducibility sucks, even if all possible outcomes are correct.

When debugging, it is super important to be able to add a print somewhere, and then compare between two versions to pinpoint where the divergence has been introduced.

1 Like

Not sure if useful or already known, but the documentation written for the OhMyThreads.jl and ChunkSplitters.jl packages are very helpful. Maybe this particular section applies to your case:

(if it does not apply, just ignore - I admittedly didn´t follow the thread very carefully).

3 Likes