Fault-tolerant `pmap` when a worker goes down

Is there a way to make pmap fault tolerant to procs going down? This example shows that the on_error option does not capture this:

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3
julia> pmap(x->(sleep(10); 0), 1:3, on_error=x->-1)
      From worker 2:    signal (15): Terminated
      From worker 2:    in expression starting at no file:0
      From worker 2:    epoll_pwait at /usr/lib/libc.so.6 (unknown line)
Worker 2 terminated.
ERROR: ProcessExitedException()

The `ProcessExitedException` is triggered when one of the worker processes is killed during the `pmap`.

This closed issue https://github.com/JuliaLang/julia/issues/217 suggests to me that it should work. Am I missing something?

Answering myself: use `retry_delays`, as in `pmap(x->(sleep(10); 0), 1:3, on_error=x->-1, retry_delays=zeros(3))`. Then killing one worker makes another take over its work, and apparently killing all worker processes makes process 1 do it.


According to the documentation of pmap:

Note that if both on_error and retry_delays are specified, the on_error hook is called before retrying. If on_error does not throw (or rethrow) an exception, the element will not be retried.

I think this contradicts your answer and would mean that, in that call, there would never be a retry attempt.
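Following that documented behaviour, for an element to be retried the `on_error` hook itself would have to (re)throw for the exceptions you want retried. A minimal sketch of such a hook (my own illustration, not from the docs; the name `handle_error` is hypothetical):

```julia
using Distributed

# Hypothetical on_error hook: map ordinary errors to -1 (no retry),
# but throw worker-death exceptions again so that retry_delays
# triggers a retry for exactly those elements.
handle_error(e) = e isa ProcessExitedException ? throw(e) : -1

# pmap(x -> (sleep(10); 0), 1:3, on_error = handle_error, retry_delays = zeros(3))
```

Whether this is needed when the exception never reaches the hook (as speculated below for a dead worker) is exactly the open question in this thread.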

I think `on_error` is executed on the worker. Thus, if the worker is down, it gets ignored. For a case like this, there is a difference:

julia> pmap(x->(rand()>0.5 && error(); myid()), 1:3, on_error=x->error("dsfa"), retry_delays = zeros(3))
3-element Array{Int64,1}:

julia> pmap(x->(rand()>0.5 && error(); myid()), 1:3, on_error=x->error("dsfa"), retry_delays = zeros(3))
ERROR: dsfa
 [1] (::Base.var"#770#772")(::Task) at ./asyncmap.jl:178
 [2] foreach(::Base.var"#770#772", ::Array{Any,1}) at ./abstractarray.jl:2009
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::UnitRange{Int64}) at ./asyncmap.jl:178
 [4] wrap_n_exec_twice(::Channel{Any}, ::Array{Any,1}, ::Distributed.var"#206#209"{WorkerPool}, ::Function, ::UnitRange{Int64}) at ./asyncmap.jl:154
 [5] async_usemap(::Base.var"#56#58"{Base.var"#56#57#59"{Array{Float64,1},Nothing,Distributed.var"#222#223"{Distributed.var"#190#192"{Distributed.var"#190#191#193"{WorkerPool,Distributed.var"#220#221"{Bool,var"#189#191",var"#190#192"}}}}}}, ::UnitRange{Int64}; ntasks::Function, batch_size::Nothing) at ./asyncmap.jl:103
 [6] #asyncmap#754 at ./asyncmap.jl:81 [inlined]
 [7] pmap(::Function, ::WorkerPool, ::UnitRange{Int64}; distributed::Bool, batch_size::Int64, on_error::Function, retry_delays::Array{Float64,1}, retry_check::Nothing) at /home/mauro/julia/julia-1.5/usr/share/julia/stdlib/v1.5/Distributed/src/pmap.jl:126
 [8] pmap(::Function, ::UnitRange{Int64}; kwargs::Base.Iterators.Pairs{Symbol,Any,Tuple{Symbol,Symbol},NamedTuple{(:on_error, :retry_delays),Tuple{var"#190#192",Array{Float64,1}}}}) at /home/mauro/julia/julia-1.5/usr/share/julia/stdlib/v1.5/Distributed/src/pmap.jl:156
 [9] top-level scope at REPL[37]:1

julia> pmap(x->(rand()>0.5 && error(); myid()), 1:3, on_error=x->-1, retry_delays = zeros(3))
3-element Array{Int64,1}:

Clearly in the last run, the function was not executed again. But for my example it is:

julia> pmap(x->(sleep(10); myid()), 1:3, on_error=x->-1, retry_delays = zeros(3))
      From worker 17:   signal (15): Terminated
      From worker 17:   in expression starting at none:0
      From worker 17:   Allocations: 2560833 (Pool: 2560049; Big: 784); GC: 3
Worker 17 terminated.
3-element Array{Int64,1}:

Parallelism.jl has `robust_pmap`, which is basically a pre-canned version of `pmap` with `on_error` set for various common errors, including worker death.
We use it a lot to achieve optimal scaling for memory-limited problems:
just start one worker per core and let the OOM killer kill the rest.
It works really well and gets about 90% memory utilization.
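Assuming `robust_pmap` keeps `pmap`'s calling convention (I haven't checked its exact keyword set, and `heavy_computation` is just a placeholder for the real memory-hungry work), the setup described above would look roughly like:

```julia
using Distributed, Parallelism

# One worker per core; under memory pressure the OS OOM killer may take
# some of them down, and robust_pmap retries those elements elsewhere.
addprocs(Sys.CPU_THREADS)

heavy_computation(x) = x^2  # placeholder for the real per-element work

results = robust_pmap(heavy_computation, 1:1000)
```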

There is this issue https://github.com/JuliaLang/julia/issues/36709 that I haven't yet tracked down; it occurs on occasion. I have only seen it when 60+ out of 90 workers need to be killed.