Async Task Not Killed When Using Base.throwto to Interrupt Blocking I/O

I have a helper function heartbeat in Julia that is responsible for pinging a remote process. Originally, it used @fetchfrom id myid() to perform this action. However, I encountered an issue where if a worker disconnects, the @fetchfrom call never finishes, causing the program to hang indefinitely.

To address this issue, someone suggested that I use Base.throwto to kill the asynchronous task. For testing purposes, I replaced @fetchfrom id myid() with a blocking I/O function, blocking_io().

Here’s the modified heartbeat function:

"""
    heartbeat(id, onexit::Channel{Any}, delay=1, timeout=5)

Probe worker `id` repeatedly until a probe fails to answer within `timeout` seconds.

Each round starts an `@async` probe task. The probe calls `blocking_io()` (standing in
for `@fetchfrom id myid()`) and then puts `id` into a private one-slot result channel.
After sleeping `timeout` seconds, the result channel is checked:
- result ready: consume it, sleep `delay` seconds, and start the next round;
- no result: put `0` into `onexit`, stop the loop, and raise an `InterruptException`
  in the probe task with `Base.throwto`.
Progress is printed to stdout, and the `finally` clause always prints `...end...`.

NOTE(review): `Base.throwto` is a low-level task switch. Nothing in this function
reschedules the calling task afterwards. That is a likely reason the call hangs when
`heartbeat` itself runs inside `@async` — confirm against the `Base.throwto` /
`yieldto` documentation. Closing the resource the probe is blocked on is the safer
way to stop it.
"""
function heartbeat(id, onexit::Channel{Any}, delay=1, timeout=5)
    result_channel = Channel{Any}(1)
    running = true          # cleared once a probe times out
    handle = nothing        # last probe task; declared outside `try` so `catch` can see it
    try
        while running
            handle = @async begin
                # @fetchfrom id myid()
                blocking_io()
                put!(result_channel, id)
            end
            sleep(timeout) # wait `timeout` seconds (5 by default) for the probe to answer
            if isready(result_channel)
                println("process $id is available")
                take!(result_channel)
                sleep(delay)
                println("task done: " * string(istaskdone(handle))) # -> should be done
            else
                println("process $id is not available")
                running = false
                # Signal the waiter before throwto, which may not hand control back here.
                put!(onexit, 0)
                Base.throwto(handle, InterruptException())
            end
        end
    catch e
        # Only InterruptException is reported; any other exception is swallowed here.
        if e isa InterruptException
            if !isnothing(handle)
                println("InterruptException. task done: " * string(istaskdone(handle))) # -> should be done
            end
        end
    finally
        println("...end...")
    end
end

Usage:

"""
    test(id, delay=1, timeout=5)

Run `heartbeat` for worker `id` on the current task, then consume the exit signal
it leaves in the one-slot channel and report the removal.
"""
function test(id, delay=1, timeout=5)
    exit_signal = Channel{Any}(1)
    # Synchronous: heartbeat runs on this task and returns once its loop exits.
    heartbeat(id, exit_signal, delay, timeout)
    take!(exit_signal)
    println("process $id has been removed")
    return nothing
end

When I call heartbeat synchronously, the following lines are printed as expected:

process 1 is not available
InterruptException. task done: true
...end...
process 1 has been removed

However, if I call heartbeat asynchronously:

"""
    test(id, delay=1, timeout=5)

Run `heartbeat` for worker `id` in a separate `@async` task. Wait for its exit
signal, then wait for the heartbeat task itself to finish before reporting the removal.
"""
function test(id, delay=1, timeout=5)
    exit_signal = Channel{Any}(1)
    hb_task = @async heartbeat(id, exit_signal, delay, timeout)
    take!(exit_signal)   # heartbeat signals here when the worker stops answering
    wait(hb_task)        # then block until the heartbeat task has fully finished
    println("process $id has been removed")
    return nothing
end

only the following lines are printed:

process 1 is not available

and the program hangs forever.

Nothing is printed from the catch and finally blocks. Does this mean that the async task in heartbeat is not killed and keeps running? Why does the program hang when I call heartbeat asynchronously?

1 Like

I’ve found a workaround that passes the I/O task through a channel: the _heartbeat function puts a reference to its I/O task into the onexit channel, and the heartbeat wrapper then cancels that task:

"""
    blocking_io()

Stand-in for a call that never completes, such as `@fetchfrom` against a disconnected
worker. It loops forever, sleeping one second per iteration. Only an exception
delivered to the running task ends it.
"""
function blocking_io()
    for _ in Iterators.repeated(nothing)   # endless iterator: no exit condition
        sleep(1)
    end
end

"""
    canceltask(task::Union{Task,Nothing})

Best-effort request that `task` stop: deliver an `InterruptException` to it and give
it a chance to unwind. `nothing` and already-finished tasks are ignored. Always
returns `nothing`.

The previous version used `Base.throwto`, a low-level switch that does not reschedule
the caller. Outside the REPL, the caller could therefore be left suspended forever.
`schedule(task, exc; error=true)` instead only queues the exception on `task` and
leaves the caller runnable. The `yield()` then lets `task` run and unwind before
this function returns.

Intended for tasks that are blocked waiting (sleep, channel, I/O), as in the heartbeat
probe. When you have access to the object the task is blocked on (a `Channel`, socket,
file, `Timer`, ...), closing it is the preferred way to stop the task.
"""
function canceltask(task::Union{Task,Nothing})
    # `nothing`: no task was started. A finished task cannot be scheduled again.
    (task === nothing || istaskdone(task)) && return nothing
    schedule(task, InterruptException(); error=true)
    yield()  # let the interrupted task run so it can unwind before we return
    return nothing
end

"""
    _heartbeat(id, onexit::Channel{Task}, delay, timeout)

Probe worker `id` repeatedly until a probe fails to answer within `timeout` seconds.
Then hand the last probe task to `onexit` so the caller can cancel it.

Each round starts an `@async` probe. The probe calls `blocking_io()` (a stand-in for
`@fetchfrom id myid()`) and puts `id` into a private one-slot result channel. After
sleeping `timeout` seconds:
- if a result is ready, it is consumed, and the next round starts after a further
  `delay` seconds;
- otherwise the loop stops.
The `finally` clause always puts the most recent probe task into `onexit`.
"""
function _heartbeat(id, onexit::Channel{Task}, delay, timeout)
    result_channel = Channel{Any}(1)
    running = true
    # Declared outside `try` so the `finally` clause can see the last probe task.
    iotask = nothing
    try
        while running
            iotask = @async begin
                # @fetchfrom id myid()
                blocking_io()
                put!(result_channel, id)
            end
            sleep(timeout)
            if isready(result_channel)
                println("process $id is available")
                take!(result_channel)
                sleep(delay)
                # Was `ping_task` (undefined -> UndefVarError). The probe has already
                # delivered its result, so it is expected to be done by now.
                println("ping task is finished: " * string(istaskdone(iotask)))
            else
                println("process $id is not available")
                running = false
            end
        end
    finally
        # Hand the (possibly still blocked) probe to the caller for cancellation.
        put!(onexit, iotask)
    end
end


"""
    heartbeat(id, delay=1, timeout=5)

Monitor worker `id` with `_heartbeat`, then cancel the probe task it hands back.

When its loop ends, `_heartbeat` puts its last probe task into a one-slot
`Channel{Task}`. That task is taken and passed to `canceltask`. Progress is printed
to stdout.
"""
function heartbeat(id, delay=1, timeout=5)
    task_box = Channel{Task}(1)
    # Called synchronously here. Wrapping the call in `@async` also works, because
    # `take!` below waits until the probe task has been delivered.
    _heartbeat(id, task_box, delay, timeout)
    println("before cancel")
    probe = take!(task_box)
    println(probe)
    canceltask(probe)
    println("heartbeat task is finished: ", istaskdone(probe))
    println("process $id has been removed")
    return nothing
end

I’m not sure this approach is optimal, but it works with both synchronous and asynchronous calls without hanging.

1 Like

Using throwto introduces a data race into your code, which can cause significant corruption of the state of your program. It is best avoided under (almost) all circumstances (except those where you want to crash the code because things are going very badly already and don’t care if they get worse). If you can, call close instead on the object the task is blocked on (be it a Channel, a Socket, a file, a Timer, etc.).