Frustrating distributed task exception

Progress:   0%|▏                                        |  ETA: 1:48:49Worker 41 terminated.
Progress:  99%|████████████████████████████████████████▍|  ETA: 0:00:01ERROR: LoadError: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:322 [inlined]
 [2] fetch
   @ ./task.jl:337 [inlined]
 [3] preduce(reducer::Function, f::Function, R::UnitRange{Int64})
   @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:278
 [4] macro expansion
   @ ~/.julia/packages/ProgressMeter/Vf8un/src/ProgressMeter.jl:810 [inlined]
 [5] for_each_nmνQq(FBZdf::DataFrame, momentum_coupling_df::DataFrame, n0::Int64, m0::Int64, ν::Int64, Q::Int64, q::Int64, Qplusq::Int64, nValence0::Int64, nConduction0::Int64, skipped_band0::Int64, prefix::String)
   @ Main ~/src/exph.jl:77
 [6] ExPh(::String, ::String, ::String, ::String, ::String, ::Int64, ::Int64, ::String, ::Int64, ::Int64, ::Int64, ::Float64, ::String, ::Vararg{String, N} where N)
   @ Main ~/src/exph.jl:324
 [7] top-level scope
   @ ~/src/adphonon.jl:106

    nested task error: ProcessExitedException(41)
    Stacktrace:
      [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
        @ Base ./task.jl:710
      [2] wait
        @ ./task.jl:769 [inlined]
      [3] wait(c::Base.GenericCondition{ReentrantLock})
        @ Base ./condition.jl:106
      [4] take_buffered(c::Channel{Any})
        @ Base ./channels.jl:389
      [5] take!(c::Channel{Any})
        @ Base ./channels.jl:383
      [6] take!(::Distributed.RemoteValue)
        @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:599
      [7] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
        @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:390
      [8] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any, N} where N)
        @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
      [9] #remotecall_fetch#146
        @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [10] remotecall_fetch
        @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [11] (::Distributed.var"#157#158"{typeof(+), var"#88#123"{DataFrame, DataFrame, Int64, String, RemoteChannel{Channel{Bool}}}, UnitRange{Int64}, Vector{UnitRange{Int64}}, Int64, Int64})()
        @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:274
in expression starting at /public/home/mrkong/src/adphonon.jl:104
Progress:  99%|████████████████████████████████████████▌|  ETA: 0:00:01┌ Warning: Forcibly interrupting busy workers
│   exception = rmprocs: pids [4, 8, 9, 11, 14, 22, 24, 26, 27, 32, 36, 39, 42, 45, 47, 59, 63, 64, 67, 71, 78, 79, 81] not terminated after 5.0 seconds.
└ @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/cluster.jl:1242
┌ Warning: rmprocs: process 1 not removed
└ @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/cluster.jl:1038

Can you please post a Minimal Working Example (MWE) so that we can try to figure out what's going wrong?

Thanks for your willingness to help. However, we've found it difficult to minimize our script, so we'll try to debug it ourselves for now. Thanks again!

It seems I'm in very similar shoes here: I got the same error, yet it's hard to produce an MWE. I'd like to hear more about your experience and what helped. Thank you.

– update
I just read from @jpsamaroo that "There's no explicit distribution of RAM with Julia or Distributed; memory stays on the node that allocates it unless you explicitly move it around." What would be a good pattern to adopt for writing an @distributed for loop when memory runs out?

You could use Dagger.jl to write the inner part of the loop. If a worker dies while running a task, the task is re-scheduled onto another (live) worker.
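
A minimal sketch of what that can look like, assuming Dagger's eager API (the names simulate and the worker count are illustrative, not from the original post):

using Distributed
addprocs(4)                # start some worker processes
@everywhere using Dagger

# Hypothetical stand-in for the expensive inner part of the loop.
@everywhere simulate(i) = i^2

# Each call becomes a Dagger task. If the worker running a task dies,
# Dagger reschedules that task onto another live worker.
tasks = [Dagger.@spawn simulate(i) for i in 1:100]
results = fetch.(tasks)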


Thank you @jpsamaroo, it worked out quite well: when a worker died, the task was rescheduled. Much appreciated.


I found using @distributed with an aggregation (reducer) function very helpful. For example:

using Distributed   # @distributed lives in the Distributed stdlib

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

After a careful read of the Dagger.jl docs, I didn't find a direct equivalent, and it seems the way to do it is

nheads_array = fetch.([Dagger.spawn(i) do i
    Int(rand(Bool))
end for i in 1:200000000])
nheads = reduce(+, nheads_array)

Would doing so result in Dagger launching 200000000 tasks in parallel? I remember that @distributed is smart about only dispatching work as workers become available; I'm not sure whether Dagger does the same.
Please advise on a better way to do this. Thank you.

You could do this, but it would have huge overhead and horrible performance, because Dagger.spawn is intended to be used with fewer, larger tasks. The cost of Int(rand(Bool)) is absolutely tiny, so you really want to combine these into a larger task.

At the moment, Dagger doesn’t have anything like this, so you’ll have to manually partition the loop into sufficiently large pieces (maybe try partitioning such that you have one partition per available CPU thread).
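
For reference, a sketch of that manual partitioning (illustrative only; count_heads and the chunk size are my own names, not a Dagger API): split the range into roughly one chunk per CPU thread, spawn one Dagger task per chunk, and sum the partial results.

using Distributed
@everywhere using Dagger

# Each task flips coins for a whole subrange instead of a single index,
# so the per-task work is large enough to amortize scheduling overhead.
@everywhere count_heads(r) = sum(Int(rand(Bool)) for _ in r)

n = 200_000_000
chunks = Iterators.partition(1:n, cld(n, Threads.nthreads()))
partials = [Dagger.@spawn count_heads(r) for r in chunks]
nheads = sum(fetch.(partials))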