I searched for a function in Julia that creates a “compute farm” based on a channel: workers take tasks from a channel and put back 0 or more tasks to the same channel. (Many mathematical problems seem to be approachable in this way, e.g. exploring a graph which is accessible neighbour-by-neighbour.)
I couldn’t find one, so I wrote it myself. It seems to perform reasonably well, but since I’m far from an expert on multithreading, I’d be very happy if someone could review my code and give useful criticism.
The function itself is fairly short, the idea is to create an object ChannelFarm{T}
similar to Channel{T}
and use it as
ch = ChannelFarm{T}() do x
# do something with x::T
return a tuple or vector of 0 or more objects of type T
end
put!(ch,initial object(s))
and it will then process every object of type T, whether put initially or produced by the function itself.
Here goes my code:
module Farm
using Base.Threads, Test
import Base: put!, take!, wait, close
"""
    ChannelFarm{T}(f, size=16384; nthreads=nthreads())

A pool of `nthreads` worker tasks fed by a buffered `Channel{T}` of capacity `size`.

Each worker repeatedly takes an item `x` from the channel, calls `f(x)`, and puts
every element of the returned iterable back on the same channel.

# Fields
- `busy`: number of workers that are not currently idle (starts at `nthreads`
  and drops as workers begin waiting for input).
- `done`: condition notified when the last busy worker goes idle while the
  channel is empty.
- `ch`: the underlying channel; every worker task is bound to it with `bind`.

# Throws (inside the worker task)
- `ErrorException("ChannelFarm: channel is full")` when a worker is about to put
  a result on a channel that already holds `size` items.
"""
struct ChannelFarm{T} <: AbstractChannel{T}
    busy::Atomic{UInt}
    done::Threads.Condition
    ch::Channel{T}
    function ChannelFarm{T}(f::Function, size=16384; nthreads=nthreads()) where T
        busy = Atomic{UInt}(nthreads)
        done = Threads.Condition()
        ch = Channel{T}(size)
        for _ in 1:nthreads
            task = Threads.@spawn _farm_worker(f, ch, busy, done, size)
            bind(ch, task)
        end
        return new(busy, done, ch)
    end
end

# Worker loop of a ChannelFarm: go idle, take one item, process it, requeue results.
function _farm_worker(f, ch::Channel, busy::Atomic{UInt}, done, size)
    while true
        # All busy-count transitions happen while holding the channel's lock, so an
        # item is always either still in the buffer or accounted for in `busy`.
        # `atomic_sub!` returns the previous value, which tells us race-free whether
        # this worker was the last busy one.
        lock(ch)
        drained = try
            atomic_sub!(busy, UInt(1)) == 1 && !isready(ch)
        finally
            unlock(ch)
        end
        if drained # last one turns the lights off
            # Notify outside the channel lock so the two locks are never nested here.
            lock(done)
            try
                notify(done)
            finally
                unlock(done)
            end
        end
        # Take the item and mark this worker busy in one critical section.
        # NOTE(review): relies on the channel lock being reentrant and released
        # while `take!` blocks, as in Base's Channel implementation.
        lock(ch)
        x = try
            item = take!(ch)
            atomic_add!(busy, UInt(1))
            item
        finally
            unlock(ch)
        end
        for y in f(x)
            # Fail loudly instead of blocking in `put!`; throw a real exception
            # object rather than a bare String.
            Base.n_avail(ch) < size || error("ChannelFarm: channel is full")
            put!(ch, y)
        end
    end
end
"""
    put!(chf::ChannelFarm{T}, x::T)

Submit `x` to the farm by forwarding it to the wrapped channel `chf.ch`.
"""
function put!(chf::ChannelFarm{T}, x::T) where {T}
    return put!(chf.ch, x)
end
"""
    wait(chf::ChannelFarm)

Block until the farm is idle, i.e. `chf.busy` is zero and `chf.ch` holds no items.

Returns immediately when that is already the case, so a notification that fired
before this call cannot be missed. The condition is re-checked after every
wake-up, so a stray notification does not end the wait early.
"""
function wait(chf::ChannelFarm)
    lock(chf.done)
    try
        while true
            # Read the counter and the buffer together under the channel's lock.
            # Lock order here is `done` then `ch`; nothing should take them in the
            # opposite order while holding both.
            lock(chf.ch)
            idle = try
                chf.busy[] == 0 && !isready(chf.ch)
            finally
                unlock(chf.ch)
            end
            idle && break
            # Releases `chf.done` while sleeping and re-acquires it on wake-up.
            wait(chf.done)
        end
    finally
        unlock(chf.done)
    end
    return nothing
end
"""
    close(chf::ChannelFarm)

Close the farm by closing its wrapped channel `chf.ch`.
"""
function close(chf::ChannelFarm)
    return close(chf.ch)
end
# End-to-end check: every value handed to the farm (by the driver loop or generated
# by the worker function) must also show up as a value the worker function processed.
@testset "ChannelFarm" begin
    λ = 0.9
    # One shared log per direction, guarded by a lock. Indexing per-thread buffers
    # with `threadid()` is fragile: ids are not limited to `nthreads()` and a task
    # may change thread across `sleep`.
    loglock = ReentrantLock()
    inputs = Float64[]
    outputs = Float64[]
    ch = ChannelFarm{Float64}() do x
        sleep(x/10)
        lock(loglock) do
            push!(outputs, x) # record it
        end
        t = rand()
        next = Float64[]
        for k in 0:20 # simulate Poisson distribution
            t -= λ^k * exp(-λ) / factorial(k)
            t < 0 && break
            push!(next, rand())
        end
        lock(loglock) do
            append!(inputs, next)
        end
        return next
    end
    for _ in 1:10
        y = rand()
        sleep(y) # give some time to the queues to start
        lock(loglock) do
            push!(inputs, y)
        end
        put!(ch, y)
    end
    wait(ch)
    close(ch)
    # Compare inputs against OUTPUTS (the original compared inputs with itself),
    # as sorted vectors so multiplicities are checked too.
    allin, allout = lock(loglock) do
        sort(inputs), sort(outputs)
    end
    @test allin == allout
end
end