I have a parallel computation workflow that uses a RemoteChannel (backed by a Channel) to communicate results between the master and the worker processes. By default, Channels behave like a queue (FIFO):
julia> using Distributed
julia> shared_job_q = RemoteChannel(() -> Channel{Tuple}(1000))
julia> put!(shared_job_q, (1,0.1))
julia> put!(shared_job_q, (2,0.2))
julia> println(take!(shared_job_q))
(1, 0.1)
julia> println(take!(shared_job_q))
(2, 0.2)
However, my application would benefit from a stack (LIFO) behavior:
julia> putfirst!(shared_job_q, (1,0.1))
julia> putfirst!(shared_job_q, (2,0.2))
julia> println(take!(shared_job_q))
(2, 0.2)
julia> println(take!(shared_job_q))
(1, 0.1)
I can’t find a putfirst! method (or anything similar) in the official Channel API, and making my own seems to require relying on internals (below is a quick prototype, based on Base/channels.jl and Distributed.jl), which I would really like to avoid. Is there a better solution to this?
# Mirrors the unexported Base.check_channel_state: error out if the channel is closed.
function check_channel_state(c::Channel)
    if !isopen(c)
        excp = c.excp
        excp !== nothing && throw(excp)
        throw(InvalidStateException("Channel is closed.", :closed))
    end
end
# A Channel is buffered when its maximum size is non-zero.
isbuffered(c::Channel) = c.sz_max > 0
# Modelled on Distributed's put!/put_ref pair: run putfirst! on the process
# that owns the underlying Channel.
putfirst!(rr::RemoteChannel, args...) = (Distributed.call_on_owner(putfirst_ref, rr, args...); rr)
putfirst_ref(rid, args...) = (putfirst!(Distributed.lookup_ref(rid), args...); nothing)
function putfirst!(c::Channel{T}, v) where T
    check_channel_state(c)
    v = convert(T, v)
    isbuffered(c) || error("putfirst! is only implemented for buffered Channels")
    return putfirst_buffered(c, v)
end
# Mirrors Base's put_buffered, except that the element is inserted at the front
# of the buffer, so it becomes the next value returned by take!.
function putfirst_buffered(c::Channel, v)
    lock(c)
    try
        while length(c.data) == c.sz_max
            check_channel_state(c)
            wait(c.cond_put)
        end
        pushfirst!(c.data, v)
        # notify all, since some of the waiters may be on a "fetch" call.
        notify(c.cond_take, nothing, true, false)
    finally
        unlock(c)
    end
    return v
end