Unexpected behaviour when interrupting pmap

I had a problem where I wanted to gracefully stop the execution of long-running distributed jobs and return the data collected so far, and I ran into some behaviour of pmap that I found a bit odd.

Here is a small example of a distributed job running 8 times over 2 workers; each run takes 10 seconds and appends its index to a shared vector (held in a RemoteChannel) when done:

using Distributed
rmprocs(workers())   # start from a clean set of workers
addprocs(2)

function runtest(n)
    # Shared vector of finished run indices, held in a RemoteChannel on the master
    output_channel = RemoteChannel(() -> Channel{Vector{Int}}(1), 1)
    put!(output_channel, Int[])
    try
        pmap(1:n) do i
            @info "Starting i=$i"
            sleep(10)            # stand-in for the actual long-running work
            @info "Finished i=$i"
            # append this run's index to the shared vector
            output = take!(output_channel)
            push!(output, i)
            put!(output_channel, output)
        end
    catch e
        interrupt()              # interrupt the workers as well
        if e isa InterruptException
            @info "Aborting run"
        else
            rethrow()
        end
    end
    output = take!(output_channel)
    close(output_channel)
    output
end

runtest(8)

runtest(8)

When run and interrupted halfway through, the output looks something like this:

julia> include("test.jl")
[ Info: Starting i=2
[ Info: Starting i=1
[ Info: Finished i=2
[ Info: Finished i=1
[ Info: Starting i=3
[ Info: Starting i=4
^C[ Info: Aborting run
2-element Vector{Int64}:
 2
 1

julia> [ Info: Starting i=5
[ Info: Starting i=6
[ Info: Finished i=5
[ Info: Finished i=6

So runs 3 and 4 are interrupted and never finish, runs 5 and 6 still start and finish, but runs 7 and 8 never start.
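For reference, here is a stripped-down sketch without the output channel, in case the channel bookkeeping matters (runtest_minimal is just a throwaway name, and I have not checked this cut-down version as carefully as the example above):

using Distributed
nworkers() < 2 && addprocs(2)

# Same structure as above, minus the shared output vector: just log, sleep,
# and forward the Ctrl-C to the workers.
function runtest_minimal(n)
    try
        pmap(1:n) do i
            @info "Starting i=$i"
            sleep(10)
            @info "Finished i=$i"
            i
        end
    catch e
        interrupt()   # interrupt whatever the workers are currently running
        e isa InterruptException || rethrow()
        @info "Aborting run"
    end
end

runtest_minimal(8)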

I can fix this by adding another channel with a done value, something like this:

using Distributed
rmprocs(workers())   # start from a clean set of workers
addprocs(2)

function runtest(n)
    output_channel = RemoteChannel(() -> Channel{Vector{Int}}(1), 1)
    put!(output_channel, Int[])
    # Shared "done" flag that the workers check before starting a new run
    done_channel = RemoteChannel(() -> Channel{Bool}(1), 1)
    put!(done_channel, false)
    try
        pmap(1:n) do i
            # peek at the flag (take! and put! it back) and skip the run if we are done
            isdone = take!(done_channel)
            put!(done_channel, isdone)
            isdone && return

            @info "Starting i=$i"
            sleep(10)
            @info "Finished i=$i"
            output = take!(output_channel)
            push!(output, i)
            put!(output_channel, output)
        end
    catch e
        # set the flag so any runs spawned after this point return immediately
        take!(done_channel)
        put!(done_channel, true)
        interrupt()
        if e isa InterruptException
            @info "Aborting run"
        else
            rethrow()
        end
    end
    output = take!(output_channel)
    close(output_channel)
    close(done_channel)
    output
end

runtest(8)

which seems to do what I want.
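A possible variation on the same idea (just a sketch, not tested as thoroughly; abort_channel and runtest2 are placeholder names): leave the flag channel empty and only put! a value into it when aborting, so the workers can poll it with a non-blocking isready instead of the take!/put! round trip.

using Distributed
# assumes 2 workers are already added, as above

function runtest2(n)
    output_channel = RemoteChannel(() -> Channel{Vector{Int}}(1), 1)
    put!(output_channel, Int[])
    abort_channel = RemoteChannel(() -> Channel{Bool}(1), 1)   # starts empty
    try
        pmap(1:n) do i
            isready(abort_channel) && return   # non-blocking check of the flag
            @info "Starting i=$i"
            sleep(10)
            @info "Finished i=$i"
            output = take!(output_channel)
            push!(output, i)
            put!(output_channel, output)
        end
    catch e
        put!(abort_channel, true)   # raise the flag before interrupting the workers
        interrupt()
        if e isa InterruptException
            @info "Aborting run"
        else
            rethrow()
        end
    end
    output = take!(output_channel)
    close(output_channel)
    close(abort_channel)
    output
end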

I’m just a bit curious whether the current behaviour is expected/intended, since I would expect pmap to stop spawning new jobs once the main process has reached the catch clause. It does seem to stop for 7 and 8, but 5 and 6 are still started somehow?