So, a Base-only pattern to do this may be:
ntasks = Threads.nthreads() # use a larger value if `do_stuff!` contains I/O
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)
    # Do this after the `@spawn`s if computing the work itself is CPU-intensive
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue) # signal the end of work
    for _ in 1:ntasks
        Threads.@spawn begin
            local matrix = zeros(2, 2) # allocate one buffer per task, and don't share it
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end
- If `do_stuff!` contains I/O like writing to a file or fetching something over the internet, it makes sense to make `ntasks` larger than the number of CPUs. For example, Python’s `concurrent.futures.ThreadPoolExecutor` defaults to four more worker threads than the number of CPUs (capped at 32), assuming that the work is I/O-dominated.
- If computing an element for `workqueue` also takes some time, it makes sense to move `for (i, x) in enumerate(xs)` after `for _ in 1:ntasks`. However, it then increases contention on `workqueue`.
- If each `do_stuff!(matrix, x)` is fast but `xs` is long, it’s better to chunk the work and use `Channel{Vector{Tuple{Int,Float64}}}`.
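
To make the last point concrete, here is a rough sketch of what the chunked variant could look like. The function name `process_chunked`, the `chunksize` keyword, and the body of `do_stuff!` are all placeholders I made up for illustration; only the `Channel{Vector{Tuple{Int,Float64}}}` element type and the per-task buffer come from the pattern above.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical stand-in for the real `do_stuff!`:
# fills the scratch buffer with `x` and returns the sum of its entries.
function do_stuff!(matrix, x)
    fill!(matrix, x)
    return sum(matrix)
end

# Assumed helper: same worker-pool pattern, but each channel item is a
# whole chunk of (index, value) pairs rather than a single pair.
function process_chunked(xs::Vector{Float64}; ntasks = nthreads(), chunksize = 64)
    out = Vector{Float64}(undef, length(xs))
    workqueue = Channel{Vector{Tuple{Int,Float64}}}(Inf)
    # One `put!` per chunk, so the channel is touched far less often
    for idxs in Iterators.partition(eachindex(xs), chunksize)
        put!(workqueue, [(i, xs[i]) for i in idxs])
    end
    close(workqueue) # signal the end of work
    @sync for _ in 1:ntasks
        @spawn begin
            local matrix = zeros(2, 2) # one buffer per task, never shared
            for chunk in workqueue
                for (i, x) in chunk
                    out[i] = do_stuff!(matrix, x)
                end
            end
        end
    end
    return out
end
```

Something like `process_chunked(rand(10_000); chunksize = 256)` would then take one item from the channel per 256 elements. The right `chunksize` depends on how cheap `do_stuff!` is: larger chunks mean less contention on `workqueue` but coarser load balancing, so it is worth measuring rather than guessing.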