Multithreaded compute farm

I’m curious where you picked up the term “compute farm” from. In the parallel skeleton literature, the (task) farm skeleton refers to something different (FYI, my Concurrency patterns for controlled parallelisms tutorial includes a simple implementation of a task farm in Julia). I think what you are referring to is closer to the feedback skeleton (although I don’t know how standard this nomenclature is):

— Castro, David, Kevin Hammond, and Susmit Sarkar. 2016. “Farms, Pipes, Streams and Reforestation: Reasoning about Structured Parallel Processes Using Types and Hylomorphisms.” ACM SIGPLAN Notices 51 (9): 4–17.
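For concreteness, a task farm in the sense used in that literature just distributes independent inputs to workers and collects their outputs; there is no feedback edge putting new work back on the queue. A minimal sketch (the function name `task_farm` is mine, not from the paper or my tutorial):

```julia
# Minimal task farm sketch: workers pull independent inputs from `input`
# and push results to `output`; no work is ever fed back into the queue.
function task_farm(f, inputs; ntasks = Threads.nthreads())
    input = Channel{eltype(inputs)}(Inf)
    foreach(x -> put!(input, x), inputs)
    close(input)  # no feedback: the input set is fixed up front
    output = Channel{Any}(Inf)
    @sync for _ in 1:ntasks
        Threads.@spawn for x in input
            put!(output, f(x))
        end
    end
    close(output)
    return collect(output)
end

# Usage: results come back in nondeterministic order.
results = task_farm(x -> x^2, 1:10)
```

The key structural difference from your code is that `close(input)` can be called eagerly, because workers never produce new work items.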

I agree with @jpsamaroo’s comment on the bug in using atomic operations on busy. To avoid a bug like this, it’d be better to use a pre-existing abstraction. This is what C++ calls a latch and Java calls a CountDownLatch (although I’ve seen some dispute on the naming). Anyway, I think you can wrap it into an abstraction called latch or barrier, which makes testing and reviewing easier. FYI, my SyncBarriers.jl library has barrier implementations that can be used for this. A fuzzy barrier is useful for this purpose since (presumably) the workers don’t have to wait for other workers.
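For illustration, such a countdown latch can be hand-rolled in a few lines on top of Threads.Condition (this is a sketch of the general idea, not the SyncBarriers.jl API):

```julia
# Hand-rolled countdown latch sketch (not the SyncBarriers.jl API):
# `count_down!` decrements the counter; `await` blocks until it reaches zero.
mutable struct CountDownLatch
    count::Int
    cond::Threads.Condition
    CountDownLatch(n) = new(n, Threads.Condition())
end

function count_down!(l::CountDownLatch)
    lock(l.cond) do
        l.count -= 1
        l.count <= 0 && notify(l.cond)
    end
end

function await(l::CountDownLatch)
    lock(l.cond) do
        while l.count > 0
            wait(l.cond)
        end
    end
end

# Usage: the main task blocks until all workers have checked in.
latch = CountDownLatch(4)
@sync for _ in 1:4
    Threads.@spawn count_down!(latch)
end
await(latch)
```

Having the check-in logic behind two named functions is what makes it testable in isolation, which is the point of wrapping it.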

Having said that, I also agree with:

One option may be a simpler approach like the one below (untested), which doesn’t require the barrier dance at all:

```julia
function parallel_feedback(f, inits, T = eltype(inits); ntasks = Threads.nthreads())
    worklist = Channel{T}() do worklist
        for x in inits
            put!(worklist, x)
        end
    end
    @sync for _ in 1:ntasks
        Threads.@spawn try
            f(worklist)
        finally
            close(worklist)  # first worker to finish (or throw) winds down the rest
        end
    end
end
```

It can be used as

```julia
λ = 0.9  # example value; any λ < 1 keeps the expected total workload finite
parallel_feedback(rand(10)) do worklist
    for x in worklist
        t = rand()  # inverse-CDF sampling: draw the number of children from Poisson(λ)
        for k in 0:20
            t -= λ^k * exp(-λ) / factorial(k)
            t < 0 && break
            y = rand()
            put!(worklist, y)
            # Note: omitting actual output handling
        end
    end
end
```

This has the extra benefit that you don’t need to allocate the temporary next vector (cutting GC interaction is important for scaling up threaded programs ATM). It also makes the work available to other workers right away (i.e., the loop inside f does not have to complete before the work is “submitted”). Furthermore, it is now fairly easy to swap the worklist implementation with a set of work-stealing deques, which is very appealing here since the single-worker case degenerates into the usual (single-threaded) worklist-based recursion. This makes this construct perform robustly when nested inside other parallel programs.
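To see that degenerate case concretely, with one worker the construct is equivalent to the familiar sequential worklist loop; a sketch (the name `sequential_feedback` and the halving example are mine):

```julia
# Sequential worklist recursion: what the one-worker case degenerates to.
# `step` returns zero or more follow-up items for each item it processes.
function sequential_feedback(step, inits)
    worklist = collect(inits)
    while !isempty(worklist)
        x = pop!(worklist)
        append!(worklist, step(x))
    end
end

# Usage: a tiny branching process that halves each item until it reaches 1.
visited = Int[]
sequential_feedback([8]) do x
    push!(visited, x)
    x > 1 ? [x ÷ 2, x ÷ 2] : Int[]
end
# visits the 1 + 2 + 4 + 8 = 15 nodes of the halving tree
```

A work-stealing deque behaves exactly like this `pop!`/`push!` stack as long as nobody steals, which is why the nested single-worker case costs essentially nothing extra.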

Also, if you want to bound the channel size, you can create a simple wrapper type like

```julia
struct BoundedChannel{T}
    ch::Channel{T}
    limit::Int
end

function Base.put!(ch::BoundedChannel, x)
    Base.n_avail(ch.ch) < ch.limit || error("BoundedChannel: channel is full")
    put!(ch.ch, x)
end

Base.iterate(ch::BoundedChannel, _ = nothing) = iterate(ch.ch)
```

and use it as f(BoundedChannel(worklist, limit)) instead of f(worklist), for whatever bound limit you want to enforce.
(Edit: n_avail is an internal function)

Some more random comments:
