How to wait on a condition for tasks

I am practicing with tasks, channels, and conditions.

I would like to wait on a specific condition with a timeout. If the timeout expires, the task should just put! a default value and continue; otherwise it should put! a specific value into the channel.

Instead of using timedwait, I decided to start simple by using wait.

The following code blocks, specifically at wait(cond) in the definition of compute. notify reports that all tasks were notified.

What am I doing wrong here?

function compute(id, chan, cond)
    # status = timedwait(() -> wait(cond), 5)
    # status == :timed_out ? put!(chan, (id,1)) : put!(chan, (id, 1 + 1))
    wait(cond)
    println("Am I blocked $i")
    put!(chan, (id, 1))
    println("Nope.")
end


function main()
    doit = Condition()
    chan = Channel{Tuple{Int,Int}}(10)

    for i in 1:10
        @async compute(i, chan, doit)
    end
    println("Let's notify the tasks.")
    cnt = notify(doit)
    println("Okay that happened for $cnt tasks.")
    [take!(chan) for _ in 1:10]
end

julia> t
Task (failed) @0x000000013bfe58d0
UndefVarError: i not defined
compute(::Int64, ::Channel{Tuple{Int64,Int64}}, ::Base.GenericCondition{Base.AlwaysLockedST}) at /Users/paul/.julia/dev/tmp/wait.jl:5
(::var"#13#14"{Base.GenericCondition{Base.AlwaysLockedST},Channel{Tuple{Int64,Int64}}})() at ./task.jl:356

It should read println("Am I blocked $id"): the argument of compute is named id, not i.

Whoops. The error was suppressed when calling it from main. Thanks for the help. Should I add a try...catch statement to catch this kind of error?

Let a task fail like any other program, but then make sure you capture the stacktrace. This is important when working with tasks; otherwise you are lost.

I did it like this:

t = Ref{Task}()
...

function main()
    doit = Condition()
    chan = Channel{Tuple{Int,Int}}(10)

    global t = @async compute(1, chan, doit)
    println("Let's notify the tasks.")
    cnt = notify(doit)
    println("Okay that happened for $cnt tasks.")
    take!(chan)
end

then julia> main(),
then ^C,
then julia> t

In this case, that is sufficient to get to the error.
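
As an alternative to keeping a global reference and interrupting with ^C, the failure can also be surfaced by calling fetch on the task. The following is a minimal sketch, assuming Julia 1.3 or later, where fetch on a failed task throws a TaskFailedException. The function failing_compute is a hypothetical stand-in for a buggy compute.

```julia
# Hypothetical stand-in for a task body that contains a bug.
function failing_compute(id)
    error("simulated failure in task $id")
end

# Start the task; the error is not printed at this point.
t = @async failing_compute(1)

# fetch waits for the task and rethrows its failure,
# so the error can be caught and inspected here.
caught = try
    fetch(t)
    nothing
catch err
    err
end

println("task failed: ", istaskfailed(t))
showerror(stdout, caught)   # prints the wrapped error
println()
```

This keeps the "let it fail" approach: the task itself has no try...catch, and the caller decides what to do with the failure.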

I see, thank you for the advice. Do you happen to know a way to use timedwait together with a Condition or Event, so that a Task will wait for the event unless time runs out, in which case it does some default action?

I tried using

status = timedwait(() -> wait(event), 1)

but it does not seem to work.
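
A likely reason: timedwait expects a test callback that returns true or false and polls it repeatedly, whereas wait(event) blocks until the event is notified and does not return a Bool. One possible workaround, sketched below, is to wait on the event in a separate task and let timedwait poll whether that task is done. The helper name wait_with_timeout is hypothetical, and the sketch assumes a Base.Event as used elsewhere in this thread.

```julia
# Hypothetical helper: wait for `ev`, but give up after `secs` seconds.
# Returns :ok if the event was notified in time, :timed_out otherwise.
function wait_with_timeout(ev::Base.Event, secs::Real)
    # The blocking wait happens in its own task ...
    waiter = @async wait(ev)
    # ... and timedwait only polls a non-blocking Bool test.
    return timedwait(() -> istaskdone(waiter), float(secs))
end

ev = Base.Event()
@async (sleep(0.2); notify(ev))      # something notifies the event later
status = wait_with_timeout(ev, 2.0)
println(status == :timed_out ? "default action" : "event arrived")
```

Note that on a timeout the waiter task stays blocked until the event is eventually notified.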

For me the following prototype worked:

function receive(ch::Channel, timeout::Real=5.0)
    ev = Base.Event()
    cb(timer) = notify(ev)
    Timer(cb, timeout)
    @async begin
        if isready(ch)
            notify(ev)
        else
            wait(ch)
            notify(ev)
        end
    end
    wait(ev)
    isready(ch) ? take!(ch) : -9999
end

What I do here is create a condition or an event and then start two tasks:

  1. a timer and
  2. a task doing some work (in this case it receives).

Whichever finishes first notifies the event.

Then you wait on the event, and after that you finish your work.
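
The same "timer versus worker, first one notifies" pattern can be sketched for arbitrary work, not only for receiving from a channel. This is only an illustration under assumptions: run_with_timeout and its default keyword are hypothetical names, and the worker is not cancelled when the timer wins.

```julia
# Hypothetical helper: run `work()` in a task and return its result,
# or `default` if it has not finished after `timeout` seconds.
function run_with_timeout(work::Function, timeout::Real; default=nothing)
    ev = Base.Event()
    result = Ref{Any}(default)

    # Task 1: the timer notifies the event when time runs out.
    timer = Timer(_ -> notify(ev), timeout)

    # Task 2: the worker stores its result, then notifies the event.
    @async begin
        result[] = work()
        notify(ev)
    end

    wait(ev)        # returns as soon as either one has notified
    close(timer)    # stop the timer if the worker finished first
    return result[]
end

fast = run_with_timeout(() -> (sleep(0.05); 42), 2.0; default=-9999)
slow = run_with_timeout(() -> (sleep(1.0); 42), 0.1; default=-9999)
println((fast, slow))
```

Because nothing polls here, the caller resumes as soon as the event is notified.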

By cb(timer) you mean ch(timer), the channel?

No, cb(timer) = notify(ev) is a short-form function definition, as described in the documentation.

You can also skip cb and simply write:

Timer(x->notify(ev), timeout)
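
For comparison, here is a small sketch of three equivalent ways to give a Timer its callback: a named short-form function, an anonymous function, and a do block. The event names ev1, ev2 and ev3 are made up for this example.

```julia
ev1, ev2, ev3 = Base.Event(), Base.Event(), Base.Event()

# 1. Named short-form function, passed as the first argument.
cb(timer) = notify(ev1)
Timer(cb, 0.1)

# 2. Anonymous function.
Timer(timer -> notify(ev2), 0.1)

# 3. do block; it becomes the first argument of Timer.
Timer(0.1) do timer
    notify(ev3)
end

# Each wait returns only after the corresponding timer has fired.
wait(ev1); wait(ev2); wait(ev3)
println("all three timers fired")
```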

Sorry, after looking at it again, it doesn’t seem to work. If there is already something in the channel, it does not return immediately. I will look at it later and then post a working version.

I shouldn’t post prototypes. I’m sorry.

I have now posted a working version.

I took another approach that does not use a condition or event.

function compute(id, chan)
    t = Task(() -> sleep(rand(1:10)))
    schedule(t)
    status = timedwait(() -> istaskdone(t), 5)
    (status == :timed_out) ? put!(chan, (id,1)) : put!(chan, (id, 1 + 1))
end


function main()
    chan = Channel{Tuple{Int, Int}}(10)

    for i in 1:10
        @async compute(i, chan)
    end
    [take!(chan) for _ in 1:10]
end

OK, it works, and it is the conventional solution.

I remember that I did not go for timedwait() because it polls, which adds a delay of up to one polling interval. Since I wanted to be more responsive, I came up with something else.
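
If polling is acceptable, the delay can be reduced with the pollint keyword of timedwait, which defaults to 0.1 seconds. A rough sketch follows; the helper name poll_done and the chosen intervals are assumptions for illustration only.

```julia
# Hypothetical helper: poll a task for completion with a chosen interval.
function poll_done(t::Task, timeout::Real; pollint::Real=0.1)
    return timedwait(() -> istaskdone(t), float(timeout); pollint=float(pollint))
end

worker = @async sleep(0.25)          # stand-in for real work

t0 = time()
status = poll_done(worker, 5.0; pollint=0.01)   # check every 10 ms
elapsed = time() - t0

println("status = ", status, ", waited about ", round(elapsed, digits=2), " s")
```

A shorter interval means more frequent wake-ups, so an event-based approach may still be preferable when responsiveness matters.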
