How to wait on a condition for tasks

I am practicing with tasks, channels, and conditions.

I would like to wait on a specific condition with a timeout. If the timeout expires, just put! a default value and continue; otherwise, put! a specific value in the channel.

Instead of using timedwait, I decided to start simple by using wait.

The following code blocks, specifically at wait(cond) in the compute definition. notify reports that all tasks were notified.

What am I doing wrong here?

function compute(id, chan, cond)
    # status = timedwait(() -> wait(cond), 5)
    # status == :timed_out ? put!(chan, (id,1)) : put!(chan, (id, 1 + 1))
    wait(cond)
    println("Am I blocked $i")
    put!(chan, (id, 1))
    println("Nope.")
end


function main()
    doit = Condition()
    chan = Channel{Tuple{Int,Int}}(10)

    for i in 1:10
        @async compute(i, chan, doit)
    end
    println("Let's notify the tasks.")
    cnt = notify(doit)
    println("Okay that happened for $cnt tasks.")
    [take!(chan) for _ in 1:10]
end

julia> t
Task (failed) @0x000000013bfe58d0
UndefVarError: i not defined
compute(::Int64, ::Channel{Tuple{Int64,Int64}}, ::Base.GenericCondition{Base.AlwaysLockedST}) at /Users/paul/.julia/dev/tmp/wait.jl:5
(::var"#13#14"{Base.GenericCondition{Base.AlwaysLockedST},Channel{Tuple{Int64,Int64}}})() at ./task.jl:356

It should read println("Am I blocked $id")

Whoops. The error was suppressed when calling it from main. Thanks for the help. Should I add a try...catch statement to capture this kind of error?

Let a task fail like any other program, but make sure you capture the stacktrace. This is important when working with tasks; otherwise you are lost.

I did it like this:

t = Ref{Task}()
...

function main()
    doit = Condition()
    chan = Channel{Tuple{Int,Int}}(10)

    global t = @async compute(1, chan, doit)
    println("Let's notify the tasks.")
    cnt = notify(doit)
    println("Okay that happened for $cnt tasks.")
    take!(chan)
end

then julia> main(),
then ^C,
then julia> t

This is sufficient in that case to get to the error.
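
As an alternative to stashing the task in a global, one option is to wait on the task inside a try...catch. This is a minimal sketch, assuming Julia 1.3 or later, where waiting on a failed task throws a TaskFailedException. buggy_compute and run_and_report are hypothetical names used only for illustration, and not_defined_anywhere is deliberately left undefined to reproduce the UndefVarError.

```julia
# Hypothetical stand-in for the failing compute: the interpolated name is
# not defined anywhere, so the task fails with an UndefVarError when it runs.
function buggy_compute(id)
    println("Am I blocked $not_defined_anywhere")
end

# Start the task, wait for it, and report a failure instead of losing it.
function run_and_report(id)
    t = @async buggy_compute(id)
    try
        wait(t)                      # rethrows the failure in the waiting task
        return :ok
    catch err
        err isa TaskFailedException || rethrow()
        showerror(stderr, err)       # prints the nested task error
        println(stderr)
        return :failed
    end
end

status = run_and_report(1)
println(status)
```

In the original main, the same idea would mean keeping the tasks returned by @async and waiting on them (or checking istaskfailed) instead of blocking on take! with no way to see the failure.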

I see, thank you for the advice. Do you happen to know a way to use timedwait together with a Condition or Event, so that a Task will wait for the event unless time runs out, in which case it does some default action?

I tried using

status = timedwait(() -> wait(event), 1)

but it does not seem to work.
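
A likely reason, going by the timedwait documentation: its first argument is meant to be a cheap, non-blocking predicate that returns true or false, and timedwait polls it every pollint seconds. wait(event) blocks and returns nothing, so it does not fit that contract. A minimal sketch with a polled flag instead, where fired is a hypothetical flag and the durations are arbitrary:

```julia
# Hypothetical flag that another task sets when the awaited thing happens.
fired = Ref(false)
@async begin
    sleep(0.2)
    fired[] = true
end

# timedwait polls the predicate every `pollint` seconds until it returns
# true or until the timeout (in seconds) has passed.
status = timedwait(() -> fired[], 2.0; pollint=0.05)
println(status)

# A predicate that never becomes true runs into the timeout.
never = timedwait(() -> false, 0.3; pollint=0.05)
println(never)
```

Float64 literals are used for the timeout and pollint because, to my knowledge, older Julia versions only accepted Float64 there.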

For me the following prototype worked:

function receive(ch::Channel, timeout::Real=5.0)
    ev = Base.Event()
    cb(timer) = notify(ev)
    Timer(cb, timeout)
    @async begin
        if isready(ch)
            notify(ev)
        else
            wait(ch)
            notify(ev)
        end
    end
    wait(ev)
    isready(ch) ? take!(ch) : -9999
end

What I do here is set up a condition or an event, then start two tasks:

  1. a timer and
  2. a task doing some work (in this case it receives).

Whichever finishes first notifies the event.

Then you wait on the event. And after that you finish your work.
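
The same race can be sketched for a plain event rather than a channel, which is closer to the question asked above. This is only an illustration under stated assumptions: wait_or_timeout is a hypothetical helper name, and a small buffered channel is used to record which side finished first so that the caller can tell a timeout from a notification.

```julia
# Wait for `ev` to be notified, but give up after `timeout` seconds.
# Returns :ok if the event fired first, :timed_out otherwise.
function wait_or_timeout(ev::Base.Event, timeout::Real)
    done = Channel{Symbol}(2)   # buffer of 2, so neither side blocks on put!
    timer = Timer(_ -> put!(done, :timed_out), timeout)
    @async begin
        wait(ev)                # stays parked if the event is never notified
        put!(done, :ok)
    end
    status = take!(done)        # whichever side finishes first wins
    close(timer)                # stop the timer if it has not fired yet
    return status
end

ev = Base.Event()
@async begin
    sleep(0.1)
    notify(ev)
end
println(wait_or_timeout(ev, 2.0))
println(wait_or_timeout(Base.Event(), 0.2))
```

After the call, the caller can branch on the returned symbol to either use the real value or fall back to a default. Note that the waiting task stays parked if the event is never notified.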

By cb(timer) you mean ch(timer), the channel?

No, cb(timer) = notify(ev) is a function definition, like here in the documentation.

You can also skip cb and simply write:

Timer(x->notify(ev), timeout)

Sorry, after looking at it again, it doesn’t seem to work. If there is something in the channel, it does not return immediately. I will look at it later and then post a working version.

I shouldn’t post prototypes. I’m sorry.

Now I posted a working version.

I took another option that does not use a condition or event.

function compute(id, chan)
    t = Task(() -> sleep(rand(1:10)))
    schedule(t)
    status = timedwait(() -> istaskdone(t), 5)
    (status == :timed_out) ? put!(chan, (id,1)) : put!(chan, (id, 1 + 1))
end


function main()
    chan = Channel{Tuple{Int, Int}}(10)

    for i in 1:10
        @async compute(i, chan)
    end
    [take!(chan) for _ in 1:10]
end

OK, it works, and it is the conventional solution.

I remember that I did not go for timedwait() because it polls, which adds the delay of the polling interval. Since I wanted to be more responsive, I figured out something else.
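
To make the polling delay concrete, here is a rough sketch that measures how long timedwait takes to notice a flag after it was set. detection_latency is a hypothetical helper, the numbers are arbitrary, and the measured values will vary from run to run; the only expectation is that the delay is roughly bounded by pollint.

```julia
# Measure the time between a flag being set and timedwait noticing it.
function detection_latency(pollint::Float64)
    flag = Ref(false)
    set_at = Ref(0.0)
    @async begin
        sleep(0.05)
        set_at[] = time()       # record the moment just before the flag flips
        flag[] = true
    end
    timedwait(() -> flag[], 2.0; pollint=pollint)
    return time() - set_at[]
end

println("pollint = 0.5:  ", detection_latency(0.5))
println("pollint = 0.01: ", detection_latency(0.01))
```

A smaller pollint reduces the delay at the cost of more frequent wake-ups, whereas an event-based approach avoids the polling altogether.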
