I’ve created a small package, PriorityChannels.jl, that implements a PriorityChannel that mimics Base.Channel, but where each element is associated with a priority. take! always returns the highest priority element. Internally, a heap is used to keep track of priorities. There is a usage example in the readme.
Differences between Channel and PriorityChannel
- put!(pc, element, priority::Real): a lower number indicates a higher priority (default = 0).
- PriorityChannel cannot be unbuffered (of length 0) and must have a positive length.
- take!(pc) returns the highest priority item; PriorityChannel thus acts like a priority queue instead of a FIFO queue like Channel does.
- Pretty much all other functionality should be the same, including all constructors.
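To make the ordering rule in the list above concrete, here is a minimal sketch of "lower number means higher priority, default 0". It does not use PriorityChannels.jl: TinyPriorityQueue, enqueue!, and dequeue! are hypothetical names, the sketch keeps a sorted vector rather than a heap, and the FIFO tie-breaking among equal priorities is an assumption of this sketch, not something stated about the package.

```julia
# Illustration only; hypothetical names, not PriorityChannels.jl code.
mutable struct TinyPriorityQueue{T}
    items::Vector{Tuple{Float64,Int,T}}  # (priority, insertion order, value)
    counter::Int
end
TinyPriorityQueue{T}() where {T} = TinyPriorityQueue{T}(Tuple{Float64,Int,T}[], 0)

# Insert while keeping `items` sorted by (priority, insertion order).
function enqueue!(q::TinyPriorityQueue{T}, value, priority::Real=0) where {T}
    q.counter += 1
    entry = (Float64(priority), q.counter, convert(T, value))
    idx = searchsortedfirst(q.items, entry; by = e -> (e[1], e[2]))
    insert!(q.items, idx, entry)
    return q
end

# The first entry always has the lowest priority number, i.e. the highest priority.
dequeue!(q::TinyPriorityQueue) = popfirst!(q.items)[3]

q = TinyPriorityQueue{String}()
enqueue!(q, "low", 10)
enqueue!(q, "default")       # priority defaults to 0
enqueue!(q, "urgent", -5)
order = [dequeue!(q) for _ in 1:3]
println(order)               # ["urgent", "default", "low"]
```

A sorted vector makes insertion O(n); a heap, as the package uses internally, would bring that down to O(log n).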
Installation
using Pkg
pkg"add PriorityChannels"
Happy New Year, may it be full of wisely prioritized and rewarding tasks!
Thank you Fredrik for this package! I’ve been looking for this functionality for some time (RemoteChannel with priority) but did not find anything. Do you think this priority functionality can easily be extended to work for Distributed.RemoteChannel as well? That would be very handy.
Hello Jesper! This package used some Julia internals that have since changed, so it requires some updates to work even with the standard Channel type. I haven’t looked deeply into RemoteChannel, but if it’s backed by a vector like the regular channel, it should be straightforward.
Thanks Fredrik. It seems to be as easy as, for example, jobs = RemoteChannel(() -> PriorityChannel(32)). I slightly modified the MWE for Distributed.RemoteChannel and it seems to work in Julia 1.7.0-rc1:
using Distributed
addprocs(4); # add worker processes
@everywhere using PriorityChannels
const jobs = RemoteChannel(() -> PriorityChannel(32));
const results = RemoteChannel(() -> Channel{Tuple}(32));
@everywhere function do_work(jobs, results) # define work function everywhere
    while true
        job_id = take!(jobs)
        println("get $job_id with priority $job_id from jobs")
        exec_time = rand()
        sleep(exec_time) # simulates elapsed time doing actual work
        put!(results, (job_id, exec_time, myid()))
    end
end
function make_jobs(n)
    for i in 1:n
        e = rand(1:500)
        println("put job $e with priority $e in jobs")
        put!(jobs, e, e)
    end
end
n = 12;
# Need to sync before doing work to keep the order
@sync make_jobs(n); # feed the jobs channel with "n" jobs
for p in workers() # start tasks on the workers to process requests in parallel
    sleep(0.1) # needed a small sleep to sync the println in remote_do()
    remote_do(do_work, p, jobs, results)
end
# Let them finish
sleep(3)
@elapsed while n > 0 # print out results
    job_id, exec_time, where = take!(results)
    println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
    global n = n - 1
end
rmprocs(workers()) # remove only the workers; process 1 (the master) cannot be removed
You are correct! Something is a bit fishy and not functioning properly. It seems that put!(jobs,e,e) hangs if jobs is completely emptied by the workers and I later try to add more jobs. If jobs is not emptied completely, then adding more works as expected. If I replace PriorityChannel with a Channel (i.e. the line const jobs = RemoteChannel(() -> Channel(Inf))), then the workers can empty it completely and I can add more jobs with put!(jobs,e,e) as we go.
The functionality promised by this package is very useful. Does anybody know how to update it to work with the latest version of Julia?
Alternatively, can somebody who knows the Julia internals propose a workaround for Base.register_taskdone_hook, which seems to be the point of incompatibility?
Alternatively, is there any other package that provides this functionality: a channel where the elements are associated with a priority value and take! always returns the highest priority element?
P.S. The original author @baggepinnen should update the package dependencies to restrict to the latest version of Julia that supports it.
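For anyone who only needs the priority behaviour within a single process, one possible direction is a small blocking queue built on Base alone, so it does not depend on Base.register_taskdone_hook or other internals. The following is a rough sketch under stated assumptions, not a fix for PriorityChannels.jl: SimplePriorityChannel is a hypothetical type, it keeps a sorted vector instead of a heap, it is not a subtype of AbstractChannel, and it has not been tried with RemoteChannel.

```julia
# Hypothetical type for illustration; uses only Base (Threads.Condition).
mutable struct SimplePriorityChannel{T}
    cond::Threads.Condition
    items::Vector{Tuple{Float64,Int,T}}  # (priority, insertion order, value)
    capacity::Int
    counter::Int
end

function SimplePriorityChannel{T}(capacity::Integer) where {T}
    capacity > 0 || throw(ArgumentError("capacity must be positive"))
    return SimplePriorityChannel{T}(Threads.Condition(), Tuple{Float64,Int,T}[], capacity, 0)
end

# Blocks while the buffer is full; a lower number means a higher priority.
function Base.put!(c::SimplePriorityChannel{T}, value, priority::Real=0) where {T}
    lock(c.cond)
    try
        while length(c.items) >= c.capacity
            wait(c.cond)
        end
        c.counter += 1
        entry = (Float64(priority), c.counter, convert(T, value))
        idx = searchsortedfirst(c.items, entry; by = e -> (e[1], e[2]))
        insert!(c.items, idx, entry)
        notify(c.cond)  # wake any task waiting in take!
    finally
        unlock(c.cond)
    end
    return value
end

# Blocks while the buffer is empty; returns the highest priority value.
function Base.take!(c::SimplePriorityChannel)
    lock(c.cond)
    try
        while isempty(c.items)
            wait(c.cond)
        end
        entry = popfirst!(c.items)
        notify(c.cond)  # wake any task waiting in put!
        return entry[3]
    finally
        unlock(c.cond)
    end
end

ch = SimplePriorityChannel{Int}(32)
for p in (42, 7, 19)
    put!(ch, p, p)
end
taken = [take!(ch) for _ in 1:3]  # drains the buffer completely
put!(ch, 1, 1)                    # refilling after a full drain
println(taken)
```

Because both put! and take! re-check their condition in a loop after every wake-up, a single condition variable is enough here, at the cost of some spurious wake-ups.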