I had a problem where I wanted to gracefully stop the execution of long-running distributed jobs and return the data collected so far, and I ran into some behaviour of pmap that I found a bit odd.
Here is a small example of a distributed job running 8 times over 2 workers; each run takes 10 seconds and pushes a value to a shared output vector (held in a RemoteChannel) when done.
using Distributed

rmprocs(workers())
addprocs(2)

function runtest(n)
    # Shared output vector, held in a RemoteChannel owned by the main process
    output_channel = RemoteChannel(() -> Channel{Vector{Int}}(1), 1)
    put!(output_channel, Int[])
    try
        pmap(1:n) do i
            @info "Starting i=$i"
            sleep(10)
            @info "Finished i=$i"
            # The take!/put! pair acts as a lock around the shared vector
            output = take!(output_channel)
            push!(output, i)
            put!(output_channel, output)
        end
    catch e
        interrupt()
        if e isa InterruptException
            @info "Aborting run"
        else
            rethrow()
        end
    end
    output = take!(output_channel)
    close(output_channel)
    output
end

runtest(8)
When run and interrupted halfway through, the output looks something like this.
julia> include("test.jl")
[ Info: Starting i=2
[ Info: Starting i=1
[ Info: Finished i=2
[ Info: Finished i=1
[ Info: Starting i=3
[ Info: Starting i=4
^C[ Info: Aborting run
2-element Vector{Int64}:
2
1
julia> [ Info: Starting i=5
[ Info: Starting i=6
[ Info: Finished i=5
[ Info: Finished i=6
So runs 3 and 4 are interrupted and never finish, but 5 and 6 still start and finish, while 7 and 8 never start.
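To make it easier to see which worker picks up which item, the same pmap body can also log myid(); this is just a small diagnostic sketch (only the myid() calls are new, and the output_channel bookkeeping is left out to keep it short), not something from the original run.

using Distributed
addprocs(2)

# Same shape of job as above, but each log line also reports the worker id,
# which makes the post-interrupt scheduling easier to follow.
pmap(1:8) do i
    @info "Starting i=$i on worker $(myid())"
    sleep(10)
    @info "Finished i=$i on worker $(myid())"
    i
end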
I can fix the interruption problem itself by adding another channel holding a done flag, something like this:
using Distributed

rmprocs(workers())
addprocs(2)

function runtest(n)
    output_channel = RemoteChannel(() -> Channel{Vector{Int}}(1), 1)
    put!(output_channel, Int[])
    # Extra channel holding a done flag that each job checks before starting
    done_channel = RemoteChannel(() -> Channel{Bool}(1), 1)
    put!(done_channel, false)
    try
        pmap(1:n) do i
            isdone = take!(done_channel)
            put!(done_channel, isdone)
            isdone && return
            @info "Starting i=$i"
            sleep(10)
            @info "Finished i=$i"
            output = take!(output_channel)
            push!(output, i)
            put!(output_channel, output)
        end
    catch e
        # Flip the flag so any job spawned after this point returns early
        take!(done_channel)
        put!(done_channel, true)
        interrupt()
        if e isa InterruptException
            @info "Aborting run"
        else
            rethrow()
        end
    end
    output = take!(output_channel)
    close(output_channel)
    close(done_channel)
    output
end

runtest(8)
which seems to do what I want.
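As an aside, the repeated take!/put! pairs around the RemoteChannels could be pulled into two tiny helpers; this is just a sketch, and peek_flag/set_flag! are names I made up here, not anything from Distributed:

# Read the current value without removing it (the take!/put! pair acts as a lock)
peek_flag(ch) = (v = take!(ch); put!(ch, v); v)

# Replace the current value with a new one
function set_flag!(ch, v)
    take!(ch)
    put!(ch, v)
    return v
end

With these, the worker body becomes isdone = peek_flag(done_channel) and the catch clause becomes set_flag!(done_channel, true).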
I’m just a bit curious whether the current behaviour is expected/intended, since I would expect pmap to stop spawning new jobs after the main process has reached the catch clause. It does seem to stop for 7 and 8, but 5 and 6 are somehow still started?