I have a loop which I am currently multi-threading using the :static scheduler:
```julia
threadcache = [create_cache() for i in 1:Threads.nthreads()]
Threads.@threads :static for i in 1:...
    data = threadcache[Threads.threadid()]
    result = compute_stuff(..., data) # data will be mutated in this function
    store_results[i] = result         # maybe save the result in a vector
end
```
where create_cache() creates a struct with some pre-allocated arrays which will be mutated in compute_stuff().
The function compute_stuff does not always take the same amount of time to run, so I think I could benefit a bit from using the :dynamic scheduler (or perhaps @sync/@spawn). The docs also seem to discourage the use of :static scheduling.
The problem is that I don't understand how to multi-thread this with the dynamic scheduler when I have the mutable data threadcache[Threads.threadid()], since dynamic scheduling does not guarantee that Threads.threadid() stays constant during an iteration: a task can migrate to another thread when it yields, so two iterations could end up mutating the same cache.
Can anyone help me multi-thread this with either the :dynamic scheduler or @sync/@spawn?
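```julia
threadcache = [create_cache() for i in 1:Threads.nthreads()]
idx = Threads.Atomic{Int}(N)  # N = total number of iterations
@sync for thr in eachindex(threadcache)
    Threads.@spawn begin
        data = threadcache[$thr]  # each spawned task owns exactly one cache
        # atomic_sub! returns the value *before* subtracting,
        # so the tasks share out i = N, N-1, ..., 1 between them
        while (i = Threads.atomic_sub!(idx, 1)) > 0
            result = compute_stuff(..., data)  # data will be mutated here
            store_results[i] = result
        end
    end
end
```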
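To check the atomic-counter approach above end to end, here is a self-contained sketch that runs as is. create_cache and compute_stuff are hypothetical stand-ins (a 4-element scratch vector, and a function that fills it with i and sums it); swap in the real ones. The key point is that each spawned task holds on to one cache for its whole lifetime, so threadid() is never consulted.

```julia
# Hypothetical stand-ins for the real create_cache/compute_stuff.
create_cache() = zeros(Float64, 4)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the scratch buffer
    return sum(cache)    # equals 4i
end

function run_atomic(N)
    threadcache = [create_cache() for _ in 1:Threads.nthreads()]
    store_results = Vector{Float64}(undef, N)
    idx = Threads.Atomic{Int}(N)   # highest iteration not yet claimed
    @sync for thr in eachindex(threadcache)
        Threads.@spawn begin
            data = threadcache[$thr]   # this task is the only user of this cache
            # atomic_sub! returns the old value, so every i in N:-1:1 is claimed exactly once
            while (i = Threads.atomic_sub!(idx, 1)) > 0
                store_results[i] = compute_stuff(i, data)
            end
        end
    end
    return store_results
end

results = run_atomic(100)
```

Load balancing comes from the shared counter: a task that finishes a cheap iteration simply claims the next index.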
Thanks! But this feels more complicated than it should need to be.
Refining my Google search a bit, I found this post, which uses Channels instead. I have never used Channels before, but it looks a bit simpler. I will try it out.
My problem seems like a pretty common thing people would want to do, so I am a bit surprised that it is so hard to find information about it.
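For reference, one common Channel-based pattern (not necessarily the one in the linked post) treats a Channel as a thread-safe pool of caches: each iteration borrows a cache with take!, uses it, and returns it with put!. Since a cache is never tied to a thread, this works with `Threads.@threads :dynamic` (available since Julia 1.8). create_cache and compute_stuff below are hypothetical stand-ins.

```julia
# Hypothetical stand-ins for the real create_cache/compute_stuff.
create_cache() = zeros(Float64, 4)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the scratch buffer
    return sum(cache)    # equals 4i
end

function run_channel_pool(N; ncaches = Threads.nthreads())
    # Buffered Channel used as a pool; put!/take! are thread-safe
    pool = Channel{Vector{Float64}}(ncaches)
    for _ in 1:ncaches
        put!(pool, create_cache())
    end
    store_results = Vector{Float64}(undef, N)
    Threads.@threads :dynamic for i in 1:N
        cache = take!(pool)        # borrow a cache; blocks until one is free
        try
            store_results[i] = compute_stuff(i, cache)
        finally
            put!(pool, cache)      # always give it back, even if compute_stuff throws
        end
    end
    return store_results
end

results = run_channel_pool(50)
```

The cost is one take!/put! pair per iteration, which is negligible when compute_stuff is expensive but may matter for very cheap iterations.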
Regarding the use of Channels, let me link to this post, which has a nice code example that applies pretty much directly to what you are trying to do here.
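One way to apply Channels here, which may differ from the linked example, is to use a Channel as a work queue: fill it with all iteration indices, close it, and let a fixed number of spawned tasks pull indices until it is empty. Each worker task creates and owns its own cache, so threadid() never appears. create_cache and compute_stuff are hypothetical stand-ins.

```julia
# Hypothetical stand-ins for the real create_cache/compute_stuff.
create_cache() = zeros(Float64, 4)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the scratch buffer
    return sum(cache)    # equals 4i
end

function run_channel_queue(N; ntasks = Threads.nthreads())
    jobs = Channel{Int}(N)          # buffered, so filling it never blocks
    foreach(i -> put!(jobs, i), 1:N)
    close(jobs)                     # iteration stops once the Channel is closed and empty
    store_results = Vector{Float64}(undef, N)
    @sync for _ in 1:ntasks
        Threads.@spawn begin
            cache = create_cache()  # private to this worker task
            for i in jobs           # several tasks can safely consume the same Channel
                store_results[i] = compute_stuff(i, cache)
            end
        end
    end
    return store_results
end

results = run_channel_queue(30)
```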
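```julia
using ChunkSplitters

# chunks(data, nchunks) yielding (range, chunk_index) follows an older
# ChunkSplitters API; check the docs for the version you have installed.
function run(data; nchunks=Threads.nthreads())
    threadcache = [create_cache() for _ in 1:nchunks]  # one cache per chunk, not per thread
    store_results = Vector{Any}(undef, length(data))    # placeholder; use a concrete eltype in practice
    @sync for (i_range, i_chunk) in chunks(data, nchunks)
        Threads.@spawn for i in i_range
            cache = threadcache[i_chunk]        # only the task handling this chunk uses it
            result = compute_stuff(..., cache)  # cache will be mutated in this function
            store_results[i] = result
        end
    end
    return store_results
end
```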
You can increase nchunks to any size, for instance 10 times nthreads(), to get better load balancing from the dynamic scheduler while staying thread-safe, because each cache belongs to a chunk rather than to a threadid(). The trade-off is that one cache is allocated per chunk.
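If you would rather avoid the extra dependency, the same one-cache-per-chunk idea can be sketched with Base's Iterators.partition. This is only a sketch: create_cache and compute_stuff are hypothetical stand-ins, and the index set is assumed to be 1:N.

```julia
# Hypothetical stand-ins for the real create_cache/compute_stuff.
create_cache() = zeros(Float64, 4)

function compute_stuff(i, cache)
    fill!(cache, i)      # mutate the scratch buffer
    return sum(cache)    # equals 4i
end

function run_chunked(N; nchunks = 10 * Threads.nthreads())
    chunk_len = max(1, cld(N, nchunks))
    # Contiguous index ranges of length chunk_len (the last may be shorter),
    # which gives at most nchunks ranges
    ranges = collect(Iterators.partition(1:N, chunk_len))
    threadcache = [create_cache() for _ in eachindex(ranges)]  # one cache per chunk
    store_results = Vector{Float64}(undef, N)
    @sync for (i_chunk, i_range) in enumerate(ranges)
        Threads.@spawn for i in i_range
            cache = threadcache[i_chunk]   # only this task ever touches this cache
            store_results[i] = compute_stuff(i, cache)
        end
    end
    return store_results
end

results = run_chunked(97)
```

Because each spawned task handles exactly one chunk, the scheduler can place the many small tasks on whichever threads are free, which is where the load balancing comes from.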