Multithreading, preallocation of objects, threadid() and task migration

So, a Base-only pattern for this might be:

ntasks = Threads.nthreads()  # use a larger value if `do_stuff!` contains I/O
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)

    # Do this after `@spawn`s if computing work itself is CPU-intensive
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue)  # signal the end of work

    for _ in 1:ntasks
        Threads.@spawn begin
            local matrix = zeros(2, 2)  # allocate the buffer, and don't share
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end
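For concreteness, here is a minimal setup the snippet above could run against. The names xs, out, and do_stuff! are placeholders standing in for whatever the real problem uses; this dummy do_stuff! just fills the task-local 2×2 buffer and reduces it to a scalar.

xs = rand(1_000)                          # hypothetical inputs
out = Vector{Float64}(undef, length(xs))  # preallocated output, one slot per input

# Dummy stand-in: fill the reusable buffer from `x`, then reduce it to a scalar.
function do_stuff!(matrix, x)
    matrix .= x        # reuse the task-local buffer instead of allocating a new one
    return sum(matrix)
end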
  • If do_stuff! contains I/O, like writing to a file or fetching something over the internet, it makes sense to make ntasks larger than the number of CPUs. For example, Python’s concurrent.futures.ThreadPoolExecutor defaults to four more threads than the number of CPUs (capped at 32), on the assumption that the work is I/O-dominated. A sketch of this sizing heuristic follows this list.
  • If computing an element for workqueue also takes some time, it makes sense to move the for (i, x) in enumerate(xs) loop after the for _ in 1:ntasks loop, so that producing and consuming overlap. However, this then increases contention on workqueue.
  • If each do_stuff!(matrix, x) is fast but xs is long, it’s better to chunk the work and use Channel{Vector{Tuple{Int,Float64}}}; see the chunked sketch after this list.
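As a rough translation of that ThreadPoolExecutor heuristic (just an assumption about a reasonable default, not something the pattern above requires), ntasks could be sized like this for I/O-heavy work:

# Oversubscribe relative to the thread count for I/O-bound work,
# mirroring Python's default of min(32, cpu_count + 4).
ntasks = min(32, Threads.nthreads() + 4)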
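And a sketch of the chunked variant from the last bullet. The chunk size of 256 is a made-up knob; larger chunks mean fewer channel operations but coarser load balancing.

chunksize = 256
@sync begin
    workqueue = Channel{Vector{Tuple{Int,Float64}}}(Inf)

    # Each channel item is a whole chunk of (index, value) pairs.
    for chunk in Iterators.partition(collect(enumerate(xs)), chunksize)
        put!(workqueue, collect(chunk))
    end
    close(workqueue)  # signal the end of work

    for _ in 1:ntasks
        Threads.@spawn begin
            local matrix = zeros(2, 2)  # one buffer per task, reused across chunks
            for chunk in workqueue
                for (i, x) in chunk
                    out[i] = do_stuff!(matrix, x)
                end
            end
        end
    end
end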