I have a helper function heartbeat
in Julia that is responsible for pinging a remote process. Originally, it used @fetchfrom id myid()
to perform this action. However, I encountered an issue where if a worker disconnects, the @fetchfrom
call never finishes, causing the program to hang indefinitely.
To address this issue, someone suggested that I use Base.throwto
to kill the asynchronous task. I made modifications to the code to replace @fetchfrom id myid()
with a blocking I/O function called blocking_io()
for testing purposes.
Here’s the modified heartbeat function:
function heartbeat(id, onexit::Channel{Any}, delay=1, timeout=5)
result_channel = Channel{Any}(1)
running = true
handle = nothing
try
while running
handle = @async begin
# @fetchfrom id myid()
blocking_io()
put!(result_channel, id)
end
sleep(timeout) # wait for 5 sec
if isready(result_channel)
println("process $id is available")
take!(result_channel)
sleep(delay)
println("task done: " * string(istaskdone(handle))) # -> should be done
else
println("process $id is not available")
running = false
put!(onexit, 0)
Base.throwto(handle, InterruptException())
end
end
catch e
if e isa InterruptException
if !isnothing(handle)
println("InterruptException. task done: " * string(istaskdone(handle))) # -> should be done
end
end
finally
println("...end...")
end
end
Usage:
function test(id, delay=1, timeout=5)
onexit = Channel{Any}(1)
heartbeat(id, onexit, delay, timeout) # sync call
take!(onexit)
println("process $id has been removed")
end
When I call heartbeat
synchronously, the following lines are printed as expected:
process 1 is not available
InterruptException. task done: true
...end...
process 1 has been removed
However, if I call heartbeat
asynchronously:
function test(id, delay=1, timeout=5)
onexit = Channel{Any}(1)
handle = @async heartbeat(id, onexit, delay, timeout) # async call
take!(onexit)
wait(handle)
println("process $id has been removed")
end
only the following lines are printed:
process 1 is not available
and the program hangs forever.
Nothing is printed from the catch
and finally
blocks. Does this mean that the async task in heartbeat
is not killed and keeps running? Why the program is hanging if I call heartbeat
asynchronously?