I’m curious where you picked up the term “compute farm” from. In the parallel skeleton literature, the (task) farm skeleton refers to something different (FYI, my Concurrency patterns for controlled parallelisms tutorial includes a simple implementation of a task farm in Julia). I think what you are referring to is close to the feedback skeleton (although I don’t know how standard this nomenclature is):
— Castro, David, Kevin Hammond, and Susmit Sarkar. 2016. “Farms, Pipes, Streams and Reforestation: Reasoning about Structured Parallel Processes Using Types and Hylomorphisms.” ACM SIGPLAN Notices 51 (9): 4–17.
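For concreteness, a (task) farm in that sense maps a fixed stream of inputs over a pool of worker tasks, with nothing fed back into the stream. A minimal sketch could look like the following (task_farm here is just an illustration, not the implementation from the tutorial or the paper):

function task_farm(f, inputs; ntasks = Threads.nthreads())
    # The input stream is fixed up front; workers never add to it.
    input = Channel{eltype(inputs)}(Inf)
    for x in inputs
        put!(input, x)
    end
    close(input)
    output = Channel{Any}(Inf)
    @sync for _ in 1:ntasks
        Threads.@spawn for x in input  # iteration ends once the closed channel drains
            put!(output, f(x))
        end
    end
    close(output)
    return collect(output)
end

The feedback pattern discussed below differs precisely in that the workers put new items into the very stream they are consuming.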
I agree with @jpsamaroo’s comment on the bug in using an atomic operation on busy. To avoid a bug like this, it’d be better to use a pre-existing abstraction. This is what C++ calls a latch and Java calls a CountDownLatch (although I’ve seen some dispute on the naming). Anyway, I think you can wrap it into an abstraction called latch or barrier, which makes testing and reviewing easier. FYI, my SyncBarriers.jl library has barrier implementations that can be used for this. A fuzzy barrier is useful for this purpose since (presumably) the workers don’t have to wait for the other workers.
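To make the intent concrete, here is a minimal countdown-latch-style sketch (the names CountDownLatch, count_down!, and await are made up for illustration; this is not SyncBarriers.jl’s API):

struct CountDownLatch
    count::Threads.Atomic{Int}
    done::Base.Event
end
CountDownLatch(n::Integer) = CountDownLatch(Threads.Atomic{Int}(n), Base.Event())

function count_down!(latch::CountDownLatch)
    # atomic_sub! returns the value *before* the decrement,
    # so the last worker to arrive sees 1 and fires the event.
    if Threads.atomic_sub!(latch.count, 1) == 1
        notify(latch.done)
    end
end

await(latch::CountDownLatch) = wait(latch.done)

Each worker calls count_down!(latch) exactly once when it finishes and the coordinator blocks in await(latch), instead of manipulating the busy counter by hand.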
Having said that, I also agree with:
One option may be to use a simpler solution like the one below (untested), which doesn’t require the barrier dance at all:
function parallel_feedback(f, inits, T = eltype(inits); ntasks = Threads.nthreads())
    # Buffered channel so that workers can put! new work without blocking.
    worklist = Channel{T}(Inf)
    for x in inits
        put!(worklist, x)
    end
    @sync for _ in 1:ntasks
        Threads.@spawn try
            f(worklist)
        finally
            # Once one worker's f returns, close the worklist so that
            # the other workers' iteration over it also terminates.
            close(worklist)
        end
    end
end
It can be used like this:
λ = 0.9  # arbitrary rate; keeping it below 1 means the work eventually dies out
parallel_feedback(rand(10)) do worklist
    for x in worklist
        t = x  # use the uniform sample to draw a Poisson-distributed child count
        for k in 0:20  # simulate Poisson distribution via the inverse CDF
            t -= λ^k * exp(-λ) / factorial(k)
            t < 0 && break
            y = rand()
            put!(worklist, y)
            # Note: omitting actual output handling
        end
    end
end
This has the extra benefit that you don’t need to allocate the temporary next vector (cutting GC interaction is important for scaling up threaded programs ATM). It also makes the work available right away for other workers (i.e., the loop inside f does not have to complete before “submitting” the work). Furthermore, it is now fairly easy to swap the worklist implementation with a list of work-stealing deques, which is very appealing here since the single-worker case degenerates into the usual (single-threaded) worklist-based recursion (see the sketch just below). This makes this construct perform robustly when nested inside other parallel programs.
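To illustrate that degenerate single-worker case, even a plain Vector can provide the same put!-and-iterate interface that f relies on (VectorWorklist is a made-up name for this sketch):

struct VectorWorklist{T}
    items::Vector{T}
end

Base.put!(wl::VectorWorklist, x) = (push!(wl.items, x); x)

# Keep yielding items (LIFO) until the vector is exhausted.
function Base.iterate(wl::VectorWorklist, _ = nothing)
    isempty(wl.items) && return nothing
    return pop!(wl.items), nothing
end

Calling f(VectorWorklist(collect(inits))) then runs the same body as the parallel version, just as an ordinary single-threaded worklist recursion.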
If you are worried about the worklist growing without bound, you can wrap the channel in a thin type that refuses new work once too many items are pending:

struct BoundedChannel{T}
    ch::Channel{T}
    size::Int  # maximum number of pending items allowed
end

function Base.put!(ch::BoundedChannel, x)
    Base.n_avail(ch.ch) < ch.size || error("BoundedChannel: channel is full")
    put!(ch.ch, x)
end

Base.iterate(ch::BoundedChannel, _ = nothing) = iterate(ch.ch)
and use it as f(BoundedChannel(worklist, limit)) instead of f(worklist), for whatever bound limit you want. (Edit: note that n_avail is an internal function.)
Some more random comments:
ChannelFarm{T}(f::Function, ...): every object in Julia is potentially callable, so I don’t think constraining the argument to ::Function is a good idea. See Add mergewith[!](combine, dicts...) by tkf · Pull Request #34296 · JuliaLang/julia · GitHub for some discussion. (A small example of what gets rejected is sketched after these comments.)

ChannelFarm{T}(...; nthreads=nthreads()): the argument nthreads does not specify the number of threads; it specifies the number of worker tasks. I’d call it ntasks (or maybe nworkers).
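For instance (Counter, takes_any_callable, and takes_only_function are made-up names for this illustration):

# A callable object that is not a subtype of Function:
struct Counter
    history::Vector{Int}
end
(c::Counter)(x) = (push!(c.history, x); length(c.history))

takes_any_callable(f, x) = f(x)             # accepts functions and callable objects
takes_only_function(f::Function, x) = f(x)  # a ::Function bound rejects Counter

c = Counter(Int[])
takes_any_callable(c, 1)     # works, returns 1
# takes_only_function(c, 2)  # MethodError: Counter is not a subtype of Function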