I use tasks to decouple subscribing to data streams from publishing downstream data streams. I also use atomic Bool references to coordinate when all tasks should exit.
Schematically, it looks like this:
# main thread
run_flag = Threads.Atomic{Bool}(true)

# task1, subscribe
try
    while run_flag[]
        for msg in producerthing
            push!(channel, msg)
        end
    end
catch # if error, set flag to end all other tasks
    run_flag[] = false
end

# task2, worker
while run_flag[]
    msg = take!(channel)
    # do stuff
end
This kind of works.
However, it has at least one failure mode: if task1 dies and sets run_flag[] = false while channel happens to be empty, then task2 will block in take!(channel) forever and never get a chance to check that run_flag has been set to false.
Are thread-safe boolean flags the correct way to coordinate this work? If yes, how would you implement this idea correctly? If not, how would you do it instead?
When I did something similar a few years ago, I put a termination message (or rather one message per task) on the channel. Since the normal messages contained a positive integer, I could conveniently encode the termination messages with a negative value.
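A minimal sketch of that termination-message idea (the channel name, the worker count, and the -1 sentinel are purely illustrative):

```julia
const NWORKERS = 4                         # illustrative worker count
work_channel = Channel{Int}(32)            # normal messages are positive integers

# producer: push the real work, then one termination message per worker
producer = Threads.@spawn begin
    for msg in 1:100
        put!(work_channel, msg)
    end
    for _ in 1:NWORKERS
        put!(work_channel, -1)             # negative value means "please exit"
    end
end

# workers: exit when they see a termination message
workers = map(1:NWORKERS) do _
    Threads.@spawn while true
        msg = take!(work_channel)
        msg < 0 && break                   # termination message received
        # do stuff with msg
    end
end

wait(producer)
wait.(workers)
```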
Imagine 5 tasks push!ing into the channel and one take!ing from it. If one of the pushing tasks crashes, how does it tell the other pushing tasks to exit? Or if I hit Ctrl-C, how do I coordinate that all tasks should exit? My design handles these cases (but has the same problematic edge case I described).
My case was certainly easier with a tree structure on the tasks and a dedicated channel for each group of workers.
What about using both an atomic Bool and sending a message to all downstream tasks when shutting down? It doesn’t even have to be a special message, just something to wake them up.
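As a rough sketch of that combination (the dummy value 0 is arbitrary; its only job is to unblock a waiting take!):

```julia
run_flag = Threads.Atomic{Bool}(true)
channel = Channel{Int}(32)

worker = Threads.@spawn while run_flag[]
    msg = take!(channel)
    run_flag[] || break          # woken by the shutdown message: exit
    # do stuff with msg
end

# shutdown, e.g. from a catch block in the subscriber task
run_flag[] = false
put!(channel, 0)                 # dummy message, only there to unblock take!
wait(worker)
```

With several consumers you would push one wake-up message per consumer, since each take! removes exactly one message.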
Yes, that’s what I do, as you can see from my example. It has a problem, which I described above and haven’t been able to solve yet. EDIT: I think the person I was responding to removed their message?
I also have a non-elegant solution, which is to close(channel). Then I need a try/catch, because take!ing from a closed channel throws an InvalidStateException. It feels like a hack though.
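Roughly, that close(channel) variant looks like this (a sketch; the worker treats the InvalidStateException from a closed channel as the shutdown signal):

```julia
channel = Channel{Int}(32)

worker = Threads.@spawn try
    while true
        msg = take!(channel)     # throws InvalidStateException once the channel is closed
        # do stuff with msg
    end
catch err
    err isa InvalidStateException || rethrow()
    # channel was closed: treat it as a normal shutdown request
end

# from the main thread / a failing producer:
close(channel)                   # wakes up any task blocked in take!
wait(worker)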
I’m just an amateur at multi-task programming, so all I can say is that my approach worked very nicely for my use case. But I never had to catch any exceptions and all shutdown was initiated due to the work being completed.
I think this is likely the cleanest it can be with the current API. Unfortunately, take! doesn’t return a Union{T, ChannelClosed} or some other useful sum type that can communicate that the channel is closed intentionally.
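One way to approximate that missing sum type today is a small wrapper around take! (the helper name try_take! is hypothetical) that returns nothing once the channel has been closed:

```julia
# hypothetical helper: next message, or `nothing` once the channel is closed
function try_take!(ch::Channel)
    try
        return take!(ch)
    catch err
        err isa InvalidStateException && return nothing
        rethrow()
    end
end

while (msg = try_take!(channel)) !== nothing
    # do stuff with msg
end
```

This conflates nothing with "closed", so it only works if nothing is never a legitimate message. Alternatively, iterating the channel directly (for msg in channel) terminates once the channel is closed and drained, which avoids spelling out the exception handling.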