I am practicing with tasks, channels, and conditions.
I would like to wait on a specific condition with a timeout. If the timeout expires, the task should just put! a default value and continue; otherwise it should put! a specific value in the channel.
Instead of using timedwait, I decided to start simple by using wait.
The following code blocks, specifically at wait(cond) in the compute definition, even though notify reports that all tasks were notified.
What am I doing wrong here?
function compute(id, chan, cond)
    # status = timedwait(() -> wait(cond), 5)
    # status == :timed_out ? put!(chan, (id,1)) : put!(chan, (id, 1 + 1))
    wait(cond)
    println("Am I blocked $i")
    put!(chan, (id, 1))
    println("Nope.")
end
function main()
    doit = Condition()
    chan = Channel{Tuple{Int,Int}}(10)
    for i in 1:10
        @async compute(i, chan, doit)
    end
    println("Let's notify the tasks.")
    cnt = notify(doit)
    println("Okay that happened for $cnt tasks.")
    [take!(chan) for _ in 1:10]
end
julia> t
Task (failed) @0x000000013bfe58d0
UndefVarError: i not defined
compute(::Int64, ::Channel{Tuple{Int64,Int64}}, ::Base.GenericCondition{Base.AlwaysLockedST}) at /Users/paul/.julia/dev/tmp/wait.jl:5
(::var"#13#14"{Base.GenericCondition{Base.AlwaysLockedST},Channel{Tuple{Int64,Int64}}})() at ./task.jl:356
A task can fail like any other program, but then you must capture its stacktrace yourself. This is important when working with tasks.
Otherwise you are lost, because the failure is not reported and the caller simply keeps waiting.
I did it like that:
t = Ref{Task}()
...
function main()
    doit = Condition()
    chan = Channel{Tuple{Int,Int}}(10)
    global t = @async compute(1, chan, doit)
    println("Let's notify the tasks.")
    cnt = notify(doit)
    println("Okay that happened for $cnt tasks.")
    take!(chan)
end
then julia> main(),
then ^C,
then julia> t
This is sufficient in that case to get to the error.
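As an alternative to interrupting with ^C, the failure can also be surfaced programmatically. The following is a minimal sketch, assuming a hypothetical `buggy` function that stands in for `compute` and refers to an undefined variable in the same way. Calling `fetch` (or `wait`) on a failed task rethrows the error wrapped in a `TaskFailedException`, which carries the original stacktrace.

```julia
# Hypothetical stand-in for a task body with the same kind of bug
# (it interpolates a variable that was never defined).
function buggy(id)
    println("Am I blocked $(undefined_name)")  # throws UndefVarError at run time
end

t = @async buggy(1)

# fetch on a failed task rethrows the failure as a TaskFailedException.
err = try
    fetch(t)
    nothing
catch e
    e
end

println(istaskfailed(t))              # the task ended with an exception
println(err isa TaskFailedException)  # the wrapper raised by fetch
showerror(stdout, err)                # prints the nested error and its stacktrace
println()
```

On Julia 1.7 and later, wrapping the task as `errormonitor(@async ...)` should also log a failure to stderr without an explicit `fetch`.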
I see, thank you for the advice. Do you happen to know a way to use timedwait together with a Condition or Event, so that a Task will wait for the event unless time runs out, in which case it does some default action?
function receive(ch::Channel, timeout::Real=5.0)
    ev = Base.Event()
    cb(timer) = notify(ev)
    Timer(cb, timeout)
    @async begin
        if isready(ch)
            notify(ev)
        else
            wait(ch)
            notify(ev)
        end
    end
    wait(ev)
    isready(ch) ? take!(ch) : -9999
end
What I do here is create a condition or an event and then start two tasks: a timer, and a task doing some work (in this case it receives from the channel).
Whichever finishes first notifies the event.
Then you wait on the event, and after that you finish your work.
Sorry, after looking at it again, it doesn't seem to work: if there is already something in the channel, it does not return immediately. I will look at it later and then post a working version.
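For reference, here is a minimal sketch of the same idea (a timer and a waiting task racing to report first), written so that the two racers report through a small buffered channel instead of a shared event. It is not the author's promised version. `wait_with_timeout` is a hypothetical helper name, not a Base function, and the 0.5 second timeout and the `(id, 1)` / `(id, 2)` values are assumptions chosen to mirror the question.

```julia
# Hypothetical helper: wait for `ev` to be notified, but give up after `timeout` seconds.
# Returns :ok if the event fired first, :timed_out if the timer fired first.
function wait_with_timeout(ev::Base.Event, timeout::Real)
    result = Channel{Symbol}(2)   # room for both racers, so neither put! blocks
    timer = Timer(_ -> put!(result, :timed_out), timeout)
    @async begin
        wait(ev)                  # returns at once if the event is already set
        put!(result, :ok)
    end
    status = take!(result)        # whichever racer reports first wins
    close(timer)                  # a closed timer never runs its callback
    # Note: after a timeout the waiting task stays parked until `ev` is notified.
    return status
end

# Mirrors the question: default value on timeout, specific value otherwise.
function compute(id, chan, ev)
    status = wait_with_timeout(ev, 0.5)
    put!(chan, status == :timed_out ? (id, 1) : (id, 1 + 1))
end

chan = Channel{Tuple{Int,Int}}(2)

ev = Base.Event()
@async compute(1, chan, ev)
notify(ev)                        # event is set, so the task takes the :ok branch
first_result = take!(chan)

late = Base.Event()               # never notified, so the timer wins
@async compute(2, chan, late)
second_result = take!(chan)

println(first_result, " ", second_result)
```

An `Event` is used rather than a `Condition` because it stays set once notified, so a notification that arrives before the waiting task starts is not lost.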
I took another option that does not use a condition or event.
function compute(id, chan)
    t = Task(() -> sleep(rand(1:10)))
    schedule(t)
    status = timedwait(() -> istaskdone(t), 5)
    (status == :timed_out) ? put!(chan, (id,1)) : put!(chan, (id, 1 + 1))
end
function main()
    chan = Channel{Tuple{Int, Int}}(10)
    for i in 1:10
        @async compute(i, chan)
    end
    [take!(chan) for _ in 1:10]
end
I remember that I did not go for timedwait() because it polls, so it only notices a change after the polling delay. Since I wanted to be more responsive, I figured out something else.
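If polling is acceptable, the delay can at least be reduced with the `pollint` keyword of `timedwait`, which sets the interval between checks (0.1 seconds by default). A small sketch, with sleep durations and timeouts picked only for illustration:

```julia
# A task that finishes well within the timeout.
fast = @async sleep(0.2)
# Check every 10 ms instead of the default 100 ms.
fast_status = timedwait(() -> istaskdone(fast), 2.0; pollint=0.01)

# A task that outlives the timeout.
slow = @async sleep(1.0)
slow_status = timedwait(() -> istaskdone(slow), 0.1; pollint=0.01)

println(fast_status, " ", slow_status)
```

A shorter interval trades a little extra CPU work for lower latency; it is still polling, so the reaction time is bounded by `pollint` rather than being immediate.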