Multi-threaded code with :dynamic scheduler and mutable data

I have a loop which I am currently multi-threading using the :static scheduler:

threadcache = [create_cache() for i in 1:Threads.nthreads()]
Threads.@threads :static for i in 1:...
    data = threadcache[Threads.threadid()]
    result = compute_stuff(..., data) #data will be mutated in this function
    store_results[i] = result #Maybe save the result in a vector
end

where create_cache() creates a struct with some pre-allocated arrays which will be mutated in compute_stuff().

The function compute_stuff does not always take the same amount of time to run, so I think I could benefit a bit from using the :dynamic scheduler (or perhaps @sync ... @spawn). The docs also seem to discourage the use of :static scheduling.

The problem is that I don’t understand how to multi-thread this with the dynamic scheduler when I index the mutable data as threadcache[Threads.threadid()], since dynamic scheduling does not guarantee that Threads.threadid() stays constant during an iteration.

Can anyone help me multi-thread this with either the :dynamic scheduler or @sync ... @spawn?

Something like this?

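threadcache = [create_cache() for i in 1:Threads.nthreads()]
idx = Threads.Atomic{Int}(N) # shared countdown; N is the number of iterations
@sync for thr in eachindex(threadcache)
    Threads.@spawn begin
        data = threadcache[$thr] # each task owns exactly one cache
        # atomic_sub! returns the old value, so i runs over N, N-1, ..., 1
        while (i = Threads.atomic_sub!(idx, 1)) > 0
            result = mutate(i, data)
            store_result[i] = result
        end
    end
end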

Thanks! But it feels like this is more complicated than it should need to be? :sweat_smile:

Refining my Google search a bit, I found this post, which uses Channels instead. I have never used channels before, but it looks a bit simpler. I will try it out.

It feels like my problem/questions would be a pretty common thing people would want to do, so I am a bit surprised that it is difficult to find information about it.

Regarding the use of Channels, let me link to this post, which has a nice code example that applies pretty much directly to what you are trying to do here.
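
For readers who have not used channels before, here is a minimal sketch of one common way to apply them to this problem. It is not necessarily what the linked post does. A buffered Channel serves as a pool of caches: each iteration borrows a cache, uses it, and returns it, so Threads.threadid() is never needed. The bodies of create_cache and compute_stuff below are hypothetical stand-ins, and the name run_with_channel is made up for illustration. The explicit :dynamic argument assumes Julia 1.8 or later.

```julia
# Hypothetical stand-ins for the create_cache() and compute_stuff() of the thread.
create_cache() = zeros(Float64, 8)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the pre-allocated buffer
    return sum(cache)    # 8 * i for this toy example
end

function run_with_channel(N; ncaches = Threads.nthreads())
    # A buffered Channel acts as a pool that holds the caches.
    pool = Channel{Vector{Float64}}(ncaches)
    for _ in 1:ncaches
        put!(pool, create_cache())
    end

    store_results = Vector{Float64}(undef, N)
    Threads.@threads :dynamic for i in 1:N
        cache = take!(pool)      # borrow a cache; blocks if none is free
        try
            store_results[i] = compute_stuff(i, cache)
        finally
            put!(pool, cache)    # return the cache for another iteration
        end
    end
    return store_results
end

results = run_with_channel(100)
```

The take!/put! pair adds a little synchronization per iteration, so this is likely to pay off mainly when compute_stuff is expensive compared with that overhead.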

See: Home · ChunkSplitters.jl

And here there is a discussion exactly about load balancing in this context: Parallel load balancing · JuliaNotes.jl

But basically, you can do this:

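using ChunkSplitters
function run(; nchunks=Threads.nthreads())
    threadcache = [create_cache() for i in 1:nchunks]
    # data is the collection being iterated over. As called here, chunks(...) yields
    # (index range, chunk number) pairs; the ChunkSplitters interface may differ
    # between releases, so check the docs for your version.
    @sync for (i_range, i_chunk) in chunks(data, nchunks)
        Threads.@spawn begin
            cache = threadcache[i_chunk] # one cache per chunk, independent of threadid()
            for i in i_range
                result = compute_stuff(..., cache) # cache will be mutated in this function
                store_results[i] = result # Maybe save the result in a vector
            end
        end
    end
    return store_results
end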

You can increase nchunks to any size, for instance 10 times nthreads(), to take advantage of dynamic scheduling while staying thread-safe, because each chunk has its own cache and nothing depends on threadid().
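
The same idea can be sketched with Base only, in case you want to try it before adding a dependency. This is an illustrative variant, not the ChunkSplitters implementation: it splits 1:N with Iterators.partition and lets each spawned task create its own cache instead of indexing into a pre-allocated vector. The bodies of create_cache and compute_stuff are hypothetical stand-ins, and run_chunked is a made-up name.

```julia
# Hypothetical stand-ins for the create_cache() and compute_stuff() of the thread.
create_cache() = zeros(Float64, 8)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the pre-allocated buffer
    return sum(cache)    # 8 * i for this toy example
end

function run_chunked(N; nchunks = 10 * Threads.nthreads())
    store_results = Vector{Float64}(undef, N)
    chunk_len = max(1, cld(N, nchunks))      # ceiling division, at least 1
    @sync for i_range in Iterators.partition(1:N, chunk_len)
        Threads.@spawn begin
            cache = create_cache()           # one cache per task, never shared
            for i in i_range
                store_results[i] = compute_stuff(i, cache)
            end
        end
    end
    return store_results
end

results = run_chunked(100)
```

With more chunks than threads, the scheduler can hand a free thread the next waiting chunk, which evens out uneven iteration times. The price is one cache allocation per chunk, so very large values of nchunks trade memory and allocation time for better balance.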

This is great, thank you. I will try ChunkSplitters.jl and see how it works in my code!