How to wait for a thread to complete

Hi all,

I have a situation where I receive a message via ZMQ and would like to perform post processing of the reply on a separate thread and then send a reply from the main thread once the post processing is complete. I am having trouble waiting for the post processing to finish before replying. Here is a simple example:

using Base.Threads: @spawn, threadid

function post_process(message, cond)
    lock(cond)
    try
        println("post processing $message on thread $(threadid())")
        # simulated processing
        sleep(2)
        result = :outgoing_message
        notify(cond, result)
        println("post processing complete on thread $(threadid())")
    finally
        unlock(cond)
    end
end

function reply(c)
    lock(c)
    try 
        while true 
            result = wait(c)
            println("reply $(result) on thread $(threadid())")
        end
    finally 
        unlock(c)
    end
end

cond = Threads.Condition()

# run this once to wait for post_process
@async reply(cond)

message1 = " my received message 1"
# process message
@spawn post_process(message1, cond)

message2 = " my received message 2"
# process message
@spawn post_process(message2, cond)

When I post process the messages manually one at a time, reply responds correctly. However, if I run them together, reply only responds once. Is there some sort of buffering I need to handle? If so, how would I do that? Or is there a different problem?

I think you’d be better off using a Channel (rather than a Condition) for this:

using Base.Threads: @spawn, threadid

function post_process(message, chan)
    println("post processing $message on thread $(threadid())")
    # simulated processing
    sleep(2)
    result = :outgoing_message
    put!(chan, result)
    println("post processing complete on thread $(threadid())")
end

function reply(chan)
    while true
        result = take!(chan)
        println("reply $(result) on thread $(threadid())")
    end
end

chan = Channel{Symbol}(32)

# run this once to wait for post_process
task = @async reply(chan)

message1 = " my received message 1"
# process message
@spawn post_process(message1, chan)

message2 = " my received message 2"
# process message
@spawn post_process(message2, chan)

wait(task)
$ julia -t 4 foo.jl 
post processing  my received message 1 on thread 2
post processing  my received message 2 on thread 3
reply outgoing_message on thread 1
reply outgoing_message on thread 1
post processing complete on thread 2
post processing complete on thread 3
^C
2 Likes

Thank you!

I actually came to the same conclusion. I do have two related questions if you or anyone else is willing to answer. First, in my solution for reply, why does the for loop remain active?

function reply(results)
    for r in results
        println(r)
    end
end

The while true loop makes more sense in your solution.

Second, is there any general advice for knowing when to use Channel versus Condition?

Iteration on the channel will stop when the channel is closed. Such a solution is actually a great way to ask the reply task to gracefully terminate: you simply close the channel when the whole process is done, and your asynchronous task will terminate.

I think the Condition documentation is particularly relevant here (emphasis mine):

Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition . Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel and Threads.Event types do this, and can be used for level-triggered events.

IIUC, the problem you had in your Condition-based version was related to this: your reply task was initially waiting, and was woken up by the first event (when the first post-processing was done). Then, while the reply task was awake, the second event would be sent, which would have no effect since you can’t wake up a task that’s already active.
So I’d say that if you need the waiting task to be woken up exactly as many times as there were events sent, then you should favor Channel over Condition.

2 Likes