Depending on the run-time of the computation, I think it's cleaner to distribute the items in `data` using a channel:
```julia
provider = Channel() do provider
    for x in data
        put!(provider, x)
    end
end
```
```julia
using Base.Threads: @spawn

ntasks = Threads.nthreads()  # or some other positive integer
@sync for _ in 1:ntasks
    buffer = zeros(UInt8, 10^6)  # one scratch buffer per task
    @spawn for x in provider
        f!(x, buffer)
    end
end
```
This makes the locality of `buffer` very apparent.
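(For anyone who wants to run it: here's the same pattern end-to-end, with a dummy `f!` and `data` that are purely hypothetical stand-ins.)

```julia
using Base.Threads: @spawn

f!(x, buffer) = (buffer[mod1(x, end)] += 0x01)  # dummy work touching the scratch buffer
data = 1:10_000

provider = Channel() do provider
    for x in data
        put!(provider, x)
    end
end

ntasks = Threads.nthreads()
@sync for _ in 1:ntasks
    buffer = zeros(UInt8, 10^6)
    @spawn for x in provider
        f!(x, buffer)
    end
end
```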
If the `provider` channel becomes a bottleneck, you can send a chunk rather than an item:
```julia
provider = Channel() do provider
    basesize = 1024  # some number; tune to amortize the channel overhead
    for xs in Iterators.partition(data, basesize)
        put!(provider, xs)
    end
end
```
with `for x in Iterators.flatten(provider)` in the child task.
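That is, the consumer side becomes something like this (a sketch, reusing `ntasks` and the `f!` placeholder from above):

```julia
@sync for _ in 1:ntasks
    buffer = zeros(UInt8, 10^6)
    # Each task takes whole chunks off the channel and iterates the items within:
    @spawn for x in Iterators.flatten(provider)
        f!(x, buffer)
    end
end
```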
Alternatively, you can avoid the bottleneck that comes with a channel by using my package FLoops.jl and using an accumulator as a local storage… kind of:
```julia
using FLoops

basesize = 1024  # some positive number
@floop ThreadedEx(basesize = basesize) for x in data
    y = Some(x)
    @reduce() do (buffer = zeros(UInt8, 10^6); y)
        if y isa Some
            f!(something(y), buffer)
        end
    end
end
```
It would call `zeros(UInt8, 10^6)` only up to `cld(length(data), basesize)` times. (The `Some` wrapper is there so the body can tell an actual element apart from another task's `buffer` when two accumulators are merged.) But this is super ugly/tricky and I should provide a better syntax…
Let me nitpick that this would delay the `@spawn` of new tasks until `available_ids` is "refilled". It doesn't matter if the task itself takes a long time compared to the overhead of `take!` and `@spawn`, though. Also, I'd put the buffers directly in the channel for simplicity, if I were to go with the strategy in the OP.
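For completeness, that variant might look like the following sketch (the pool size `nbuffers` and the `f!`/`data` placeholders are assumptions on my part):

```julia
using Base.Threads: @spawn

nbuffers = Threads.nthreads()  # how many buffers (and in-flight tasks) to allow
buffers = Channel{Vector{UInt8}}(nbuffers)
for _ in 1:nbuffers
    put!(buffers, zeros(UInt8, 10^6))
end

@sync for x in data
    buffer = take!(buffers)  # blocks until some task returns a buffer
    @spawn begin
        f!(x, buffer)
        put!(buffers, buffer)  # return the buffer to the pool
    end
end
```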