Hi all,
I have a situation where I receive a message via ZMQ and would like to post-process it on a separate thread, then send a reply from the main thread once the post-processing is complete. I am having trouble waiting for the post-processing to finish before replying. Here is a simple example:
using Base.Threads: @spawn, threadid

function post_process(message, cond)
    lock(cond)
    try
        println("post processing $message on thread $(threadid())")
        # simulated processing
        sleep(2)
        result = :outgoing_message
        notify(cond, result)
        println("post processing complete on thread $(threadid())")
    finally
        unlock(cond)
    end
end

function reply(c)
    lock(c)
    try
        while true
            result = wait(c)
            println("reply $(result) on thread $(threadid())")
        end
    finally
        unlock(c)
    end
end

cond = Threads.Condition()

# run this once to wait for post_process
@async reply(cond)

message1 = " my received message 1"
# process message
@spawn post_process(message1, cond)

message2 = " my received message 2"
# process message
@spawn post_process(message2, cond)
When I post process the messages manually one at a time, reply responds correctly. However, if I run them together, reply only responds once. Is there some sort of buffering I need to handle? If so, how would I do that? Or is there a different problem?
I think you’d be better off using a Channel (rather than a Condition) for this:
using Base.Threads: @spawn, threadid

function post_process(message, chan)
    println("post processing $message on thread $(threadid())")
    # simulated processing
    sleep(2)
    result = :outgoing_message
    put!(chan, result)
    println("post processing complete on thread $(threadid())")
end

function reply(chan)
    while true
        result = take!(chan)
        println("reply $(result) on thread $(threadid())")
    end
end

chan = Channel{Symbol}(32)

# run this once to wait for post_process
task = @async reply(chan)

message1 = " my received message 1"
# process message
@spawn post_process(message1, chan)

message2 = " my received message 2"
# process message
@spawn post_process(message2, chan)

wait(task)
$ julia -t 4 foo.jl
post processing my received message 1 on thread 2
post processing my received message 2 on thread 3
reply outgoing_message on thread 1
reply outgoing_message on thread 1
post processing complete on thread 2
post processing complete on thread 3
^C
Thank you!
I actually came to the same conclusion. I do have two related questions, if you or anyone else is willing to answer. First, in my solution for reply, why does the for loop remain active?
function reply(results)
    for r in results
        println(r)
    end
end
The while true loop makes more sense in your solution.
Second, is there any general advice for knowing when to use Channel versus Condition?
Iterating over a channel blocks while the channel is open and empty, and stops only once the channel is closed. That is why your for loop stays active. It is also a great way to ask the reply task to terminate gracefully: you simply close the channel when the whole process is done, and your asynchronous task will finish.
I think the Condition documentation is particularly relevant here (emphasis mine):
Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel and Threads.Event types do this, and can be used for level-triggered events.
IIUC, the problem you had in your Condition-based version was related to this: your reply task was initially waiting, and was woken up by the first event (when the first post-processing was done). Then, while the reply task was awake, the second event would be sent, which would have no effect since you can’t wake up a task that’s already active.
So I’d say that if you need the waiting task to be woken up exactly as many times as there were events sent, then you should favor Channel over Condition.
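If you did want to stay with a Condition, the "extra state" the documentation mentions could look roughly like this. It is only a sketch under my own assumptions: the pending vector and the next_result helper are not part of your code, and I shortened the simulated processing and moved it outside the lock.

```julia
using Base.Threads: @spawn

cond = Threads.Condition()
pending = Symbol[]               # extra state: results that have not been replied to yet

function post_process(message, cond, pending)
    sleep(0.1)                   # simulated processing, done without holding the lock
    lock(cond)
    try
        push!(pending, :outgoing_message)
        notify(cond)             # wakes the waiter, if there is one
    finally
        unlock(cond)
    end
end

# Hypothetical helper: returns the next result, waiting only if none is queued.
function next_result(cond, pending)
    lock(cond)
    try
        while isempty(pending)   # check the state first, so an earlier notify is not missed
            wait(cond)
        end
        return popfirst!(pending)
    finally
        unlock(cond)
    end
end

workers = map(1:2) do i
    @spawn post_process(" my received message $i", cond, pending)
end
results = [next_result(cond, pending) for _ in 1:2]
foreach(wait, workers)
println(results)
```

This is essentially a hand-rolled queue, which is a good hint that Channel is the simpler choice here.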