LIFO Channels

I have a parallel computation workflow that uses RemoteChannel->Channel to communicate results between master and worker nodes. Channels by default work like a queue (FIFO):

using Distributed

julia> shared_job_q = RemoteChannel(() -> Channel{Tuple}(1000))
julia> put!(shared_job_q, (1,0.1))
julia> put!(shared_job_q, (2,0.2))
julia> println(take!(shared_job_q))
(1, 0.1)
julia> println(take!(shared_job_q))
(2, 0.2)

However, my application would benefit from a stack (LIFO) behavior:

julia> putfirst!(shared_job_q, (1,0.1))
julia> putfirst!(shared_job_q, (2,0.2))
julia> println(take!(shared_job_q))
(2, 0.2)
julia> println(take!(shared_job_q))
(1, 0.1)

I can’t find a putfirst! method (or similar) in the official Channels API and making my own seems to require relying on internals (below is a quick prototype, based on Base/channels.jl and Distributed.jl), which I would really like to avoid. Would there be a better solution to this?

function check_channel_state(c::Channel)
    if !isopen(c)
        excp = c.excp
        excp !== nothing && throw(excp)
        throw(closed_exception())
    end
end

isbuffered(c::Channel) = c.sz_max==0 ? false : true

putfirst!(rr::RemoteChannel, args...) = (Distributed.call_on_owner(putfirst_ref, rr, args...); rr)
putfirst_ref(rid, args...) = (putfirst!(Distributed.lookup_ref(rid), args...); nothing)

function putfirst!(c::Channel{T}, v) where T
    check_channel_state(c)
    v = convert(T, v)
    return isbuffered(c) ? putfirst_buffered(c, v) : throw("putfirst! not implemented for unbuffered Channels")
end

function putfirst_buffered(c::Channel, v)
    lock(c)
    try
        while length(c.data) == c.sz_max
            check_channel_state(c)
            wait(c.cond_put)
        end
        pushfirst!(c.data, v)
        # notify all, since some of the waiters may be on a "fetch" call.
        notify(c.cond_take, nothing, true, false)
    finally
        unlock(c)
    end
    return v
end
1 Like

I’m facing the same issue. What was your solution in the end? Right now, I think this could be most elegantly achieved by making the underlying data structure of a Channel exchangeable. Ironically the current underlying data structure is a Vector – which for push!() and pop!() is LIFO – it’s just begin used FIFO because take!() calls popfirst!() instead of pop!().

No solution thus far, I still have a hacky implementation that relies on internals. Making the underlying data structure exchangable sounds like a reasonable plan to me, but perhaps people more knowledgeable than me could chime in on potential pitfalls?

For reference, here’s my workaround


# not ideal but Base doesn't leave us much choice
function lifo_take!(c::Channel)
    lock(c)
    try
        while isempty(c.data)
            check_channel_state(c)
            wait(c.cond_take)
        end
        v = pop!(c.data) # only line changed from Base.take!()
        notify(c.cond_put, nothing, false, false)
        return v
    finally
        unlock(c)
    end
end

struct LIFOChannel{T} <: AbstractChannel{T}
    channel::Channel{T}
end

LIFOChannel{T}(n::Int) where T = LIFOChannel(Channel{T}(n))

put!(lc::LIFOChannel, x) = put!(lc.channel, x)
take!(lc::LIFOChannel) = lifo_take!(lc.channel)

close(lc::LIFOChannel) = close(lc.channel)
isopen(lc::LIFOChannel) = isopen(lc.channel)

isready(lc::LIFOChannel) = isready(lc.channel)

lock(lc::LIFOChannel) = lock(lc.channel)
unlock(lc::LIFOChannel) = unlock(lc.channel)
2 Likes