Multi-threaded code with :dynamic scheduler and mutable data

I have a loop which I am currently multi-threading using the :static scheduler:

threadcache = [create_cache() for i in 1:Threads.nthreads()]
Threads.@threads :static for i in 1:...
    data = threadcache[Threads.threadid()]
    result = compute_stuff(..., data) #data will be mutated in this function
    store_results[i] = result #Maybe save the result in a vector
end

where create_cache() creates a struct with some pre-allocated arrays which will be mutated in compute_stuff().

The function compute_stuff does not always take the same amount of time to run, so I think I could benefit a bit from using the :dynamic scheduler (or perhaps @sync ... @spawn). The docs also seem to discourage the use of :static scheduling.

The problem is that I don’t understand how to multi-thread this with the dynamic scheduler when I have the mutable data threadcache[Threads.threadid()], since dynamic scheduling does not guarantee that Threads.threadid() stays constant during an iteration.

Can anyone help me multi-thread this with either the :dynamic scheduler or @sync ... @spawn?

Something like this?

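threadcache = [create_cache() for i in 1:Threads.nthreads()]
idx = Threads.Atomic{Int}(N) # shared counter of remaining iterations
@sync for thr in eachindex(threadcache)
    Threads.@spawn begin
        data = threadcache[$thr] # each task owns exactly one cache
        # atomic_sub! returns the old value, so i runs over N, N-1, ..., 1
        while (i = Threads.atomic_sub!(idx, 1)) > 0
            result = mutate(i, data)
            store_result[i] = result
        end
    end
end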

Thanks! But it feels like this is more complicated than it should need to be? :sweat_smile:

Refining my Google search a bit, I found this post which uses Channels instead. I have never used channels before, but it looks a bit simpler. I will try it out.

It feels like my problem/questions would be a pretty common thing people would want to do, so I am a bit surprised that it is difficult to find information about it.

Regarding the use of Channels, let me link to this post, which has a nice code example that pretty much directly applies to what you are trying to do here.
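For reference, here is a minimal sketch of the Channel idea, not the code from the linked post: a buffered Channel holds a pool of caches, and each task takes one out, uses it, and puts it back. The create_cache and compute_stuff definitions below are hypothetical stand-ins for your own functions, and run_with_channel and ncaches are names I made up for illustration.

```julia
# Hypothetical stand-ins for the poster's create_cache() and compute_stuff().
create_cache() = zeros(Float64, 8)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the pre-allocated buffer
    return sum(cache)    # equals 8 * i for this toy cache
end

function run_with_channel(N; ncaches = Threads.nthreads())
    store_results = zeros(Float64, N)

    # The buffered Channel acts as a pool: a cache is either inside the
    # channel (free) or held by exactly one task (in use).
    pool = Channel{Vector{Float64}}(ncaches)
    for _ in 1:ncaches
        put!(pool, create_cache())
    end

    # One task per iteration; at most ncaches of them do work at a time.
    @sync for i in 1:N
        Threads.@spawn begin
            cache = take!(pool)    # blocks until some cache is free
            try
                store_results[i] = compute_stuff(i, cache)
            finally
                put!(pool, cache)  # return the cache even if an error occurs
            end
        end
    end
    return store_results
end
```

Nothing here depends on Threads.threadid(), so it does not matter if a task migrates between threads. Spawning one task per iteration only pays off when each iteration is expensive compared to the cost of creating a task.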

See: Home · ChunkSplitters.jl

And here there is a discussion exactly about load balancing in this context: Parallel load balancing · JuliaNotes.jl

But basically, you can do this:

using ChunkSplitters
function run(; nchunks=Threads.nthreads())
    threadcache = [create_cache() for i in 1:nchunks]
    # Call form as in the original post; the chunks signature differs between ChunkSplitters versions
    @sync for (i_range, i_chunk) in chunks(store_results, nchunks)
        Threads.@spawn begin
            data = threadcache[i_chunk] # one cache per chunk, not per thread
            for i in i_range
                result = compute_stuff(..., data) #data will be mutated in this function
                store_results[i] = result #Maybe save the result in a vector
            end
        end
    end
    return store_results
end

You can increase nchunks to any value, for instance 10 times nthreads(), to take advantage of dynamic scheduling while staying thread-safe, because the cache is indexed by chunk rather than by threadid().
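If you want to see the same pattern without any package, here is a rough sketch using only Base, with Iterators.partition doing the splitting instead of ChunkSplitters. Again, create_cache and compute_stuff are hypothetical stand-ins for your functions, and run_chunked is just an illustrative name. The difference from the version above is that each task creates its own cache instead of indexing into a pre-allocated vector.

```julia
# Hypothetical stand-ins for the poster's create_cache() and compute_stuff().
create_cache() = zeros(Float64, 8)
compute_stuff(i, cache) = (fill!(cache, i); sum(cache))  # mutates cache

function run_chunked(N; nchunks = 10 * Threads.nthreads())
    store_results = zeros(Float64, N)

    # Split 1:N into at most nchunks contiguous ranges of near-equal length.
    chunk_size = max(1, cld(N, nchunks))

    @sync for i_range in Iterators.partition(1:N, chunk_size)
        Threads.@spawn begin
            cache = create_cache()   # owned by this task only
            for i in i_range
                store_results[i] = compute_stuff(i, cache)
            end
        end
    end
    return store_results
end
```

With more chunks than threads, a thread that finishes a cheap chunk early can pick up another one, which is where the load balancing comes from. The price is one cache allocation per chunk, so nchunks should stay small compared to the number of iterations.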

This is great, thank you. I will try ChunkSplitters.jl and see how it works in my code!