So, a Base-only pattern to do this may be:

```julia
ntasks = Threads.nthreads()  # use a larger value if `do_stuff!` contains I/O
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)
    # Do this after the `Threads.@spawn`s below if computing the work items themselves is CPU-intensive
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue)  # signal the end of work
    for _ in 1:ntasks
        Threads.@spawn begin
            local matrix = zeros(2, 2)  # allocate a per-task buffer; do not share it across tasks
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end
```
- If `do_stuff!` contains I/O, like writing to a file or fetching something over the internet, it makes sense to make `ntasks` larger than the number of CPUs. For example, Python's `concurrent.futures.ThreadPoolExecutor` creates four more threads than the number of CPUs (bounded by 32), on the assumption that the work is I/O-dominated (see the first sketch after this list).
- If computing each element for `workqueue` also takes some time, it makes sense to move the `for (i, x) in enumerate(xs)` loop after the `for _ in 1:ntasks` loop, so the consumers start working while the producer is still filling the queue (second sketch below). However, this increases contention on `workqueue`.
- If each `do_stuff!(matrix, x)` call is fast but `xs` is long, it's better to chunk the work and use a `Channel{Vector{Tuple{Int,Float64}}}` (third sketch below).
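For the I/O-dominated case, a minimal sketch of a Python-like sizing heuristic; the constants 4 and 32 just mirror `ThreadPoolExecutor`'s default and are not special:

```julia
# Assumes the work is I/O-dominated; mirrors Python's `min(32, os.cpu_count() + 4)`
ntasks = min(32, Sys.CPU_THREADS + 4)
```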
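A sketch of the second variant, where the producer loop runs after (and therefore concurrently with) the spawned consumers; `xs`, `out`, and `do_stuff!` are the same as above:

```julia
ntasks = Threads.nthreads()
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)
    # Spawn the consumers first so they can start as soon as items arrive
    for _ in 1:ntasks
        Threads.@spawn begin
            local matrix = zeros(2, 2)  # per-task buffer
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
    # The producer now overlaps with the consumers
    # (at the cost of more contention on `workqueue`)
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue)  # signal the end of work
end
```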
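And a sketch of the chunked variant; the chunk size of 256 is an arbitrary tuning parameter, not a recommendation:

```julia
ntasks = Threads.nthreads()
chunksize = 256  # arbitrary; tune for your workload
@sync begin
    workqueue = Channel{Vector{Tuple{Int,Float64}}}(Inf)
    for chunk in Iterators.partition(collect(enumerate(xs)), chunksize)
        put!(workqueue, collect(chunk))  # one `put!`/`take!` pair per chunk, not per element
    end
    close(workqueue)  # signal the end of work
    for _ in 1:ntasks
        Threads.@spawn begin
            local matrix = zeros(2, 2)  # per-task buffer
            for chunk in workqueue
                for (i, x) in chunk
                    out[i] = do_stuff!(matrix, x)
                end
            end
        end
    end
end
```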