[ANN] PriorityChannels.jl

I’ve created a small package PriorityChannels.jl that implements a PriorityChannel that mimics Base.Channel , but where each element is associated with a priority. take! always returns the highest priority element. Internally, a heap is used to keep track of priorities. Usage example in the readme.

Difference between Channel and PriorityChannel

  • put!(pc, element, priority::Real) lower number indicates a higher priority (default = 0).
  • PriorityChannel can not be unbuffered (of length 0) and must have a positive length.
  • take!(pc) returns the highest priority item, PriorityChannel thus acts like a priority queue instead of a FIFO queue like Channel does
  • Pretty much all other functionality should be the same, including all constructors.

Installation

using Pkg
pkg"add PriorityChannels"

Happy New Year, may it be full of wisely prioritized and rewarding tasks!

17 Likes

Thank you Fredrik for this package! I’ve been looking for this functionality for some time (RemoteChannel with priority) but did not find anything. Do you think this priority functionality can easily be extended to work for Distributed.RemoteChannel as well? That would be very handy.

Hello Jesper! This package used some Julia internals that have since changed and this package requires some updates to work even with the standard Channel type. I haven’t looked deeply into RemoteChannel, but if it’s backed by a vector like the regular channel ti should be straightforward.

Thanks Fredik. It seems to be as easy as for example: jobs = RemoteChannel(() -> PriorityChannel(32));. I modified slightly the MWE in Distributed.RemoteChannel and it seems to work in julia 1.7.0-rc1:

using Distributed
addprocs(4); # add worker processes

@everywhere using PriorityChannels


const jobs = RemoteChannel(() -> PriorityChannel(32));
const results = RemoteChannel(() -> Channel{Tuple}(32));

@everywhere function do_work(jobs, results) # define work function everywhere
    while true
        job_id = take!(jobs)
        println("get $job_id with priority $job_id from jobs")
        exec_time = rand()
        sleep(exec_time) # simulates elapsed time doing actual work
        put!(results, (job_id, exec_time, myid()))
    end
end

function make_jobs(n)
    for i in 1:n
        e = rand(1:500)
        println("put job $e with priority $e in jobs")
        put!(jobs, e, e)
    end
end

n = 12;


# Need to sync before doing work to keep the order
@sync make_jobs(n); # feed the jobs channel with "n" jobs

for p in workers() # start tasks on the workers to process requests in parallel
    sleep(0.1) # needed a small sleep to sync the println in remote_do()  
    remote_do(do_work, p, jobs, results)
end

# Let them finish
sleep(3)

@elapsed while n > 0 # print out results
    job_id, exec_time, where = take!(results)
    println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
    global n = n - 1
end

rmprocs.(procs())

You are correct! Something is a bit fishy and not functioning properly. Seems like put!(jobs,e,e) hangs if jobs is completely emptied by the workers and I later try to add more jobs. If jobs are not emptied completely then adding more functions as expected. If I replace PriorityChannel with a Channel (i.e. the line const jobs = RemoteChannel(() -> Channel(Inf))) then workers can empty it completely and I can add more jobs with put!(jobs,e,e) as we go.

The functionality promised by this package is very useful. Does anybody know how to update it to work with the latest version of Julia?

Alternatively, can somebody who knows the Julia internals propose a workaround for Base.register_taskdone_hook that seems to be the point of incompatibility?

Alternatively, is there any other package that provides this functionality, a channel where the elements are associated with a priority value and get!'s always return the highest priority?

P.S. The original author @baggepinnen should update the package dependencies to restrict to the latest version of Julia that supports it.

2 Likes