Interrupts in @async code

I’m trying to wrap my head around how Julia handles interrupts in @async code. I have three different scripts that I’m trying to interrupt using ctrl+c, with various degrees of success.

Script 1

julia> @async begin
           for _ = 1:10
               println("Hey apple!")
               sleep(0.5)
           end
       end

This cannot be interrupted; instead, it prints “Hey apple!” ten times regardless of how many times I hit ctrl+c.

Script 2

julia> @sync begin
           @async begin
               for _ = 1:10
                   println("Hey apple!")
                   sleep(0.5)
               end
           end
       end

This terminates the first time I hit ctrl+c.

Script 3

julia> @sync begin
           @async begin
               for _ = 1:10
                   println("Hey apple!")
                   sleep(0.5)
               end
           end
           @async begin
               for _ = 1:10
                   println("What?")
                   sleep(1.0)
               end
           end
       end

Hitting ctrl+c on this terminates the “Hey apple!” task, but doesn’t stop the “What?” task.


The behaviour of scripts 1 and 2 seems somewhat sensible, but the behaviour of script 3 feels like a bug. Am I missing something?

In this particular case I don’t see any issue related to the handling of the interrupt generated by Ctrl-C.

The thing to keep in mind is that the InterruptException is delivered to only one concurrent task; it is not broadcast to all coroutines.

In Script 1, Ctrl-C delivers the InterruptException to the REPL interpreter: no computation is active in the REPL, so nothing happens.
The @async task continues until normal termination.

In Script 2, Ctrl-C delivers an InterruptException to the @sync block, and more precisely to the only @async
task that it wraps: this terminates the @sync block because there are no other @async tasks to wait for.

In Script 3, Ctrl-C delivers an InterruptException to the @sync computation, which passes the exception to the first @async task, but the second @async task continues until normal termination.

In the latter case, the @sync block ends only once all @async tasks have completed. If any @async task was terminated by an exception,
the @sync block reports a composite exception (a list of all exceptions thrown by the failed async tasks) when it terminates.
In this case the composite exception is a list with a single entry, which wraps the InterruptException.

julia> @sync begin
                  @async begin
                      for _ = 1:10
                          println("Hey apple!")
                          sleep(0.5)
                      end
                  end
                  @async begin
                      for _ = 1:10
                          println("What?")
                          sleep(1.0)
                      end
                  end
              end
Hey apple!
What?
Hey apple!
What?
Hey apple!
^CWhat?
What?
What?
What?
What?
What?
What?
What?
ERROR: TaskFailedException

    nested task error: InterruptException:
    Stacktrace:
     [1] poptask(W::Base.InvasiveLinkedListSynchronized{Task})
       @ Base ./task.jl:921
     [2] wait()
       @ Base ./task.jl:930
     [3] wait(c::Base.GenericCondition{Base.Threads.SpinLock})
       @ Base ./condition.jl:124
     [4] _trywait(t::Timer)
       @ Base ./asyncevent.jl:138
     [5] wait
       @ ./asyncevent.jl:155 [inlined]
     [6] sleep(sec::Float64)
       @ Base ./asyncevent.jl:240
     [7] macro expansion
       @ ./REPL[3]:5 [inlined]
     [8] (::var"#3#5")()
       @ Main ./task.jl:484
Stacktrace:
 [1] sync_end(c::Channel{Any})
   @ Base ./task.jl:436
 [2] top-level scope
   @ task.jl:455
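
To make the "only one task receives the exception" point concrete without relying on an actual Ctrl-C, here is a minimal sketch that throws an InterruptException into one of two sibling tasks by hand, using schedule(task, exc; error=true). It only mimics what happens on Ctrl-C, and as far as I know the Julia docs caution against calling schedule on tasks that have already started, so treat it as an illustration, not a recommended pattern. The `finished` channel is a hypothetical helper added only to record which task ran to completion.

```julia
# Records which tasks ran to completion (hypothetical helper for this sketch).
finished = Channel{Symbol}(2)

caught = try
    @sync begin
        t1 = @async begin
            sleep(1.0)
            put!(finished, :first)
        end
        t2 = @async begin
            sleep(0.3)
            put!(finished, :second)
        end
        sleep(0.1)  # give both tasks time to start and block in sleep
        # Raise InterruptException inside t1 only; t2 is left untouched.
        schedule(t1, InterruptException(); error=true)
    end
    nothing
catch err
    err
end

println(typeof(caught))
println(isready(finished) ? fetch(finished) : "no task finished")
```

On the Julia versions I have in mind, the @sync block still waits for the second task, so only :second should end up in `finished`, and `caught` should hold the composite exception thrown at the end of the block.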

That explains the observations, but I still think the behavior of script 3 is somewhat peculiar. I can see that sometimes you may want to have some sort of daemon task that shouldn’t be interrupted by normal user interaction, so I’m fine with script 1 behaving the way it does. I can’t imagine a use case where you’d want to interrupt only one out of many seemingly equivalent tasks, though. I therefore still feel like interrupting an @sync should interrupt all tasks contained within it (or else not interrupt any task, but then it would be nice if there was another macro for this purpose).

I therefore still feel like interrupting an @sync should interrupt all tasks contained within it (or else not interrupt any task, but then it would be nice if there was another macro for this purpose).

:+1:

Is there a proper way to do this currently? (i.e. have a bunch of async tasks which should all be interrupted simultaneously upon pressing CTRL+C once)

Please be aware that the current recommendations are to avoid @async in favour of @spawn:

It is strongly encouraged to favor Threads.@spawn over @async always even when no parallelism is required especially in publicly distributed libraries. This is because a use of @async disables the migration of the parent task across worker threads in the current implementation of Julia. Thus, seemingly innocent use of @async in a library function can have a large impact on the performance of very different parts of user applications.

https://docs.julialang.org/en/v1/base/parallel/#Base.@async

Thanks. I still have the same question, though, for a bunch of @spawned tasks that should be interrupted simultaneously.

My post was intended to address the @async usage in general (OP-related).

To answer your question: if you are not in a context where data races can happen, you could pass an ison = Ref{Bool}(true) to all your tasks (at spawn time), have each task check ison[] periodically, and pause or stop all the tasks at once by setting ison[] = false.

Interesting! I guess the intention behind this is that @async should be “just” concurrent and therefore race-condition-free while Threads.@spawn should be truly parallel but in return requires synchronisation?

And if data races can happen, you can use Threads.Atomic{Bool} in the same way. It’s just a little annoying, because it means that if you want to be truly sure that one task exiting kills all the others, then you have to manually wrap all your @async bodies in try / finally blocks and do all the low-level book-keeping yourself. I suspect there is scope for some nice higher-level primitives here.
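
As a rough idea of what such a primitive could look like, here is a minimal sketch. `linked` is a hypothetical helper (it does not exist in Base): it runs a task body and clears the shared flag in a finally block, so that any task exiting, normally or through an exception, tells its siblings to stop. The simulated failure stands in for an interrupted task, and `ticks` is only there to show that the second task did some work.

```julia
using Base.Threads: @spawn, Atomic, atomic_add!

# Hypothetical helper: run `f(ison)` and clear the flag when `f` returns or throws.
function linked(f, ison::Atomic{Bool})
    try
        f(ison)
    finally
        ison[] = false
    end
end

ison = Atomic{Bool}(true)
ticks = Atomic{Int}(0)

result = try
    @sync begin
        # Fails after a short while, standing in for a task that got interrupted.
        @spawn linked(flag -> (sleep(0.2); error("simulated failure")), ison)
        # Polls the flag and stops once another task has cleared it.
        @spawn linked(ison) do flag
            while flag[]
                atomic_add!(ticks, 1)
                sleep(0.05)
            end
        end
    end
    nothing
catch err
    err
end

println("flag is now ", ison[], " after ", ticks[], " ticks")
```

The polling task never sees the exception itself; it only notices that the flag changed on its next iteration, which is the same book-keeping as writing the try / finally by hand in every task body.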

Happy to move this to another issue if this is too far from OP but (and I may be a bit dense) I haven’t managed to get code working to interrupt a bunch of tasks simultaneously upon pressing CTRL+C. Forgetting about data races for now:

@sync begin
  ison = Ref(true)
  @async begin
    try
      while ison[]
        sleep(0.1)
      end
    finally
      # we don't seem to ever get here with CTRL+C
      println("interrupting everything?")
      ison[] = false
    end
  end
  @async begin
    for _ in 1:50
      println("a")
      ison[] || break
      sleep(0.1)
    end
  end
  @async begin
    for _ in 1:50
      println("b")
      ison[] || break
      sleep(0.1)
    end
  end
end

Running this code and hitting CTRL+C won’t interrupt the tasks (same story with @spawn).

For me, ctrl+C seems to consistently interrupt the “b” task. So I guess which task gets interrupted is not deterministic, and to be sure you’re catching the interrupt, you have to wrap everything in try / finally clauses. Technically even the “main” task (the one which spawns all the @async), but you can’t do that because in your example the main task hangs inside the @sync, and you can’t add a try / finally there. You could move the code of one sub-task into the main task, though, and then you’d most likely be fine again.

Thanks, indeed this (which adds in other recommendations from above) seems to work:

using Base.Threads: @spawn
using Base.Threads: Atomic, atomic_xchg!

@sync begin
    ison = Atomic{Bool}(true)
    @spawn begin
        for _ in 1:50
            println("a")
            ison[] || (println("a interrupted"); break)
            sleep(0.1)
        end
    end
    @spawn begin
        for _ in 1:50
            println("b")
            ison[] || (println("b interrupted"); break)
            sleep(0.1)
        end
    end
    try
        while true
            sleep(0.1)
        end
    catch
        atomic_xchg!(ison, false)
    end
end
a
b
a
b
a
b
a
b
a
b
a
^C
b
b interrupted
a
a interrupted

If you only modify your switch from the main thread (on/off all your tasks using it) then there is no danger - regardless of the tasks running on multiple threads (e.g., you only perform read operations from inside the tasks).

However, a safer (and also thread-safe) way to communicate with your tasks is by using the Channel approach. You can consume multiple channels from one task or have multiple tasks using a single channel.

Now - specifically related to CTRL+C - that shortcut has a specific functionality defined in the REPL context.

However - catching the specific keystroke combination and reacting to it would be a subject better fit for another topic (I suggest first doing a search to see whether this has already been discussed).

One important thing: I am pretty confident that the listener for CTRL+C will have to run on its own task. So, at this point, it is pretty clear that the best solution for your use case will include the usage of Channel. More specifically - you’ll need to write on the channel from your “keystroke listener” task and use fetch inside the tasks you want to pause/stop (take! would remove the value from the channel and only the first task that consumed the value would be paused). There are many ways to design this - but this is the gist.
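
A minimal sketch of that gist, under a few assumptions: the names `stop` and `counts` are made up for the example, and a plain sleep followed by put! stands in for the "keystroke listener", since catching the keystroke itself is out of scope here. The channel is buffered with capacity 1, because as far as I know fetch is not supported on unbuffered channels. Also note that fetch blocks until a value is available, so tasks that should keep working until told to stop poll with isready instead; fetch would be the right call for a task that should wait for the signal.

```julia
using Base.Threads: @spawn

stop = Channel{Bool}(1)   # buffered so that the value can be peeked at without removing it
counts = zeros(Int, 2)    # each worker only writes to its own slot

@sync begin
    for id in 1:2
        @spawn begin
            # Work until a stop value shows up; isready does not consume it,
            # so every worker sees the same signal.
            while !isready(stop)
                counts[id] += 1
                sleep(0.05)
            end
        end
    end
    sleep(0.3)            # stand-in for the listener waiting on user input
    put!(stop, true)      # one write stops all workers
end

println("iterations per worker: ", counts)
```

Because nothing calls take!, the value stays in the channel and both workers observe it, which is the behaviour described above for fetch versus take!.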

Thanks @algunion, when I read answers like yours, though I think I understand the gist, I look forward to the good soul who’ll write an entry-level blog post on how to use co-routines / threads effectively in Julia as I personally find the Julia doc to be insufficient to get how to do the kind of stuff you’re talking about short of doing a lot of trial and error.

This looks good to me except for two issues:

  • I’m not sure whether putting a try / catch only in the main task is enough. AFAIK, it is unspecified which task receives the interrupt, so the safest thing to do is to put try / catch around all task bodies.
  • atomic_xchg! is intended for situations where you want to set the value of an atomic variable while at the same time reading out the old value. In this case, you don’t need this. All we want to do here is update the variable in a way which is safe even when another thread might be reading it. You can do this using ison[] = false. This one is just a tiny nitpick, though. You can of course use atomic_xchg!() wherever you use var[] = new_val; it’s just potentially confusing to people reading your code, and you might make your machine work harder than necessary.
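
To illustrate the difference in the second point, a small sketch (the variable names are only for the example):

```julia
using Base.Threads: Atomic, atomic_xchg!

flag = Atomic{Bool}(true)

# Exchange: stores the new value and hands back the one it replaced.
old = atomic_xchg!(flag, false)
println("previous value: ", old, ", current value: ", flag[])

# Plain store: also safe with concurrent readers, but nothing is read back.
flag[] = true
println("current value: ", flag[])
```

If the previous value is never used, the plain store says more directly what the code intends.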

Thanks,

I tried to do what I think you suggested and observed that it hangs upon CTRL+C:

using Base.Threads: @spawn
using Base.Threads: Atomic

@sync begin
    ison = Atomic{Bool}(true)
    @spawn begin
        try
          for _ in 1:50
              println("a")
              ison[] || (println("a interrupted"); break)
              sleep(0.1)
          end
        catch
          ison[] = false
        end
    end
    @spawn begin
        try
          for _ in 1:50
              println("b")
              ison[] || (println("b interrupted"); break)
              sleep(0.1)
          end
        catch
          ison[] = false
        end
    end
    try
        while true
            sleep(0.1)
        end
    catch
        ison[] = false
    end
end

I get (note the double CTRL+C):

a
b
a
b
a
b
^Ca
a interrupted  # here it hangs, I have to hit CTRL+C again to get out
^Cfalse

If someone wants to help me get the pull request “[RFC] signal handling: User-defined interrupt handlers” (JuliaLang/julia #49541, by jpsamaroo) finished, this would provide for deterministic and reliable interrupt behavior.

This is because of the infinite loop in @sync (i.e. the loops in the tasks correctly terminate when !ison[], but this is not the case for the loop at the top level in @sync, which never checks the flag).

thanks @ffevotte; I must admit I don’t fully get why the infinite loop does not get broken but indeed replacing while true by while ison[] removes the hanging and interrupts everything properly.
Curiously though now the "b interrupted" does not get printed (there’s probably a good reason for it but I find this stuff hard to reason about…).

using Base.Threads: @spawn
using Base.Threads: Atomic

@sync begin
    ison = Atomic{Bool}(true)
    @spawn begin
        try
          for _ in 1:50
              println("a")
              ison[] || (println("a interrupted"); break)
              sleep(0.1)
          end
        catch
          ison[] = false
        end
    end
    @spawn begin
        try
          for _ in 1:50
              println("b")
              ison[] || (println("b interrupted"); break)
              sleep(0.1)
          end
        catch
          ison[] = false
        end
    end
    try
        while ison[]
            sleep(0.1)
        end
    catch
        ison[] = false
    end
end

a
b
a
b
a
b
a
b
^Ca
a interrupted

Something like this might explain more clearly how things unfold:

using Base.Threads: @spawn, Atomic

@sync begin
    ison = Atomic{Bool}(true)
    @spawn begin
        try
            for _ in 1:50
                println("a")
                ison[] || (println("a stops"); break)
                sleep(0.1)
            end
        catch
            println("a got interrupted")
            ison[] = false
        end
    end
    @spawn begin
        try
            for _ in 1:50
                println("b")
                ison[] || (println("b stops"); break)
                sleep(0.1)
            end
        catch
            println("b got interrupted")
            ison[] = false
        end
    end
    try
        while true
            ison[] || (println("main loop stops"); break)
            sleep(0.1)
        end
    catch
        println("top-level loop got interrupted")
        ison[] = false
    end
end
  1. “b” catches the interruption:
    • it exits its loop immediately via an error path
    • it sets ison[] to false
    • it terminates
  2. at its next iteration, the loop in “a” sees that !ison[]
    • it breaks out of its loop via a standard code path
    • it terminates
  3. same thing for the top level loop in @sync

(2. & 3. happen “at the same time”)
