Interrupts in @async code

The following code does a fair amount of bookkeeping (the same kind that `Base.@sync` does internally), but it provides the feature you want without the tasks having to be aware of each other: no task needs a loop that explicitly checks some `ison[]` variable.

"""
    Interruptible

Variants of `@sync` / `@async` in which an `InterruptException` caught in one task makes
the enclosing `@sync` block ask every sibling task to stop.

The two macros communicate through two variables with gensym'd names that are injected
into the caller's scope, so `Interruptible.@async` has to appear lexically inside an
`Interruptible.@sync` block.
"""
module Interruptible
const EVENTS_VARNAME = gensym(:events)
const TASKS_VARNAME = gensym(:tasks)

"""
    Interruptible.@sync body

Run `body`, then wait until every task registered by an `Interruptible.@async` call in
`body` has finished.

Whenever a task reports `:interrupted`, every task that is not done yet is rescheduled
with the exception value `:stop`.

Returns `nothing`. Throws a `CompositeException` holding one `TaskFailedException` per
failed task, if any task failed.
"""
macro sync(body)
    events = esc(EVENTS_VARNAME)
    tasks = esc(TASKS_VARNAME)
    quote
        # Bind in the `let` header so that every `@sync` gets fresh variables: a plain
        # assignment in the body would overwrite the variables of an enclosing `@sync`.
        # The channel is unbounded so that a task never blocks in `put!`.
        let $events = Channel{Symbol}(Inf), $tasks = Task[]
            $(esc(body))

            # Check before waiting: with no pending task no event will ever arrive.
            while !all(istaskdone, $tasks)
                if take!($events) === :interrupted
                    for t in $tasks
                        # Make the pending task throw `:stop` when it next runs.
                        istaskdone(t) || schedule(t, :stop, error=true)
                    end
                end
            end

            e = CompositeException()
            for t in $tasks
                @assert istaskdone(t)
                istaskfailed(t) && push!(e, TaskFailedException(t))
            end

            isempty(e) || throw(e)
            nothing
        end
    end
end

"""
    Interruptible.@async name body

Start `body` in a new task (via `Base.@async`) and register that task with the enclosing
`Interruptible.@sync` block.

`name` is evaluated once, when the task is created, and is only used to label the
messages printed when the task is interrupted, asked to stop, or terminates. Any value
accepted by `string` works.

The task reports `:interrupted` to the `@sync` block when it catches an
`InterruptException`, and always reports `:done` just before it ends. Exceptions are
rethrown, so the task ends up failed.
"""
macro async(name, body)
    name = esc(name)
    body = esc(body)
    events = esc(EVENTS_VARNAME)
    tasks = esc(TASKS_VARNAME)
    quote
        let label = $name
            t = $Base.@async try
                $body
            catch e
                if e === :stop
                    println(string(label, " was asked to stop"))
                elseif e isa InterruptException
                    println(string(label, " was interrupted"))
                    put!($events, :interrupted)
                end
                rethrow()
            finally
                println(string(label, " terminates"))
                put!($events, :done)
            end
            push!($tasks, t)
        end
    end
end

end

Example:

julia> Interruptible.@sync begin
           Interruptible.@async "A" begin
               for _ in 1:10
                   println("A is alive")
                   sleep(1.0)
               end
           end
       
           Interruptible.@async "B" begin
               for _ in 1:10
                   println("B is alive")
                   sleep(1.003)
               end
           end
       
           Interruptible.@async "C" begin
               # not a loop, but blocks
               readline()
           end
       end
A is alive
B is alive
^CB was interrupted
B terminates
C was asked to stop
C terminates
A was asked to stop
A terminates
ERROR: TaskFailedException
    nested task error: :stop
    [...]
...and 2 more exceptions.
3 Likes