Writing to a shared buffer in multi-threaded for loop

I have a for loop that iterates over millions of elements, each of which are processed quickly. Elements update random parts of a shared buffer. I’m trying to parallelize this operation in a performant way. I have considered the following design approaches, but they each have problems:

  1. Storing multiple copies of the shared buffer in a Channel, then reducing the buffers at the end. Unfortunately the time cost of getting and putting the buffer dominates the computation time, making this approach very slow.
  2. Using a buffer per thread, and reducing at the end. Unfortunately there is no reliable way of getting a thread index.
  3. Creating a TaskLocalValue. Notwithstanding the fact that the buffer is large, and the number of tasks could be large too (for load-balancing), using too much memory, I don’t see a way to reduce TaskLocalValues after the for loop, since they get cleaned up.

Can anyone recommend a better solution?

TaskLocalValues.jl are just a thin wrapper around Base.task_local_storage, which provides more flexibility. Using the latter, when you create the local buffer for the first time in a given task, you could store it in a thread-global array (protected by a lock) to later reduce over, i.e. something like:

buf = get!(task_local_storage(), :MY_BUFFER_SYMBOL) do
    # create new buffer if it doesn't exist yet for this task
    lock(my_buffers_lock) do
        newbuf = create_new_buffer()
        push!(my_buffers, newbuf)
        newbuf
    end
end::SomeBufferType

Alternatively, you can replace your threaded loop with a loop over equal-sized chunks of the array, via ChunkSplitters.jl, and allocate one buffer per chunk, as described here: No more threadid indexing? [thread-local storage] - #13 by lmiq — the tradeoff is that this pushes more load-balancing responsibility onto you.

I think the description is too vague. Is it really random? Or not.

If let me have a wild guess, it sounds like constructing a LP model with the GRBaddvar api.

say, you have 1million variables, the buffer is the constraint matrix.

it probably doesn’t make sense to use multithreading in the context here.

It’s a for loop to compute the Hessian of an optimization problem. Each element is a cost function that updates various sub-blocks of the Hessian. The sub-blocks are known, so not really random, but each cost function can update multiple sub-blocks, so it isn’t possible to ensure different threads access mutually exclusive parts of the buffer - unless I substantially change the program design (which has got me thinking :sweat_smile:).

Interesting guess :grinning_face_with_smiling_eyes:

In my context, multi-threading definitely makes sense.

Could it be that this zero-argument method of task_local_storage is not documented? I only get docstrings for 1, 2 or 3 arguments.

Since Hessian is used, it’s a nonlinear optimization. Is it nonconvex?

That doesn’t seem reasonable. Each cost function should only associate to a particular block.

Block decomposition for a large-scale optimization with separable structure is well studied. Personally I’m pretty familiar with this topic and I’ve been writing algorithms in julia with multithreading APIs. The situation you’re currently describing make me think that your method of decomposition is not very ideal.

From your problem description it seems that you could avoid contention by spawning one task to manage the buffer, and attach it to a Channel. Other computational tasks would then send terms and location information over the channel, preferably in groups to reduce overhead.

Thanks for trying to help, @WalterMadelim . I’ll write to you separately about this.

I’d prefer to keep this thread limited to the problem I described, since that will be of most help to others seeking solutions to that specific problem. I’m grateful for those answers that accept the premise of the question.

I agree that’s another possible approach. However, it requires a significant change to the single threaded version, and is a very bespoke solution. I’d rather have a more generic solution that’s also simpler to implement, given the single threaded implementation.

Well, actually you can, e.g. transfer this thread to the optimization category, so others can also share opinions. (Who knows if this is an XY problem, though.) I think the real helpful mindset is to use the right design and right tools to tackle the specific problems encountered in the real world.

This approach tends to sacrifice reproducibility: The order of task-local buffers stored in the global lock-protected array depends on scheduler/races, and floating point ops are not associative. That makes debugging harder (you cannot checkpoint the state after applying the updates).

I would recommend this ^

OhMyThreads.jl may be a more user-friendly way to achieve the same result. See: Thread-Safe Storage · OhMyThreads.jl

It’s easy to do it manually:

buffers = [makebuf() for _ in 1:nthreads()]
@sync for thrid in 1:nthreads()
    @spawn begin
        buf = buffers[thrid]
        < do stuff to buf >
    end
end
< reduce over buffers >

Am I right in thinking that either I need to have a way to get a threadsafe buffer every iteration, or else use an explicit chunk split across threads? The latter means no automatic load balancing, while the former means some overhead. Unless I’m mistaken, it’s impossible to avoid this trade-off.

I need to check the overhead of get!(task_local_storage(), :MY_BUFFER_SYMBOL) to see how much improvement this makes over Channel. The overhead may be sufficiently small.

One way to do it is a dynamic chunk split, i.e. take long chunks in the beginning and cutting it down. I’ve sometimes done things like this:

using Base.Threads

buffers = [makebuf() for _ in 1:nthreads()]
index = Atomic{Int}(1)
@sync for thrid in 1:nthreads()
    @spawn begin
        local buf = buffers[thrid]
        local chunksize = < some starting value, like largestindex/2nthreads() or similar >
        while (firstidx = atomic_add!(index, chunksize)) <= largestindex
            for idx in firstidx:min(firstidx+chunksize-1, largestindex)
                < process index idx, store somewhere in buf >
            end
            chunksize = <compute new chunk size,e.g. as max(10, (largestindex - index[]) / 2nthreads())>
        end
    end
end
< reduce buffers >

In this way there’s one buffer for each thread. You process a chunk of indices at a time to avoid overhead, but the chunk size can be made smaller as you approach the last index to load balance.

I’ve been doing it like this. Let M be the number of loop iterations:

nt = min(M, Threads.nthreads())
batches = [floor(Int,M/nt*(t-1))+1:floor(Int,M/nt*t) for t ∈ 1:nt]  # indexes to split simulations by CPU thread
...
Threads.@threads for i ∈ 1:nt
    @inbounds for j ∈ batches[i]
       ...
    end
end

The j index can then be used to read and write various arrays.

But one thing worth thinking about here is “false sharing”, when the CPU throws away data in a memory buffer because adjacent memory locations were modified. Though I understand the latest version of Julia has a feature to help with this, which I haven’t wrapped my head around yet.

If the per-j information that is being stored by the computation is just going to be reduced with addition, then you can increase speed by doing each batch’s reduction within the j loop, accumulating on the fly. Then you only have to save nt results, not M results.

Seems essentially equivalent to the ChunkSplitters.jl approach linked above? That is, you decide in advance how to divide your M tasks among the threads. Seems easier to use ChunkSplitters, though?