What makes a worker terminate on a cluster?

Hi, I am using the Julia language for calculations on a cluster. Typically, I define a few SharedArrays at the beginning of a script and then fill them with a @sync @distributed for loop over, say, 32 cores of a node (mainly using QuantumOptics.jl, which relies heavily on DifferentialEquations.jl; not sure if that is relevant).

However, it sometimes crashes, with workers terminating in a way that is rather unpredictable. Often the same code runs fine when some parameters are changed so that, e.g., the time evolution studied, and thus the computing time, is shorter. These problems occur mostly when the requested walltime is quite long, but it is hard to discern a pattern.
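Schematically, the script (started with julia -p 32 myfile.jl, as in the .e file below) looks something like this; a minimal sketch with placeholder names and a toy computation standing in for the actual QuantumOptics.jl call:

    using Distributed
    @everywhere using SharedArrays            # needed on the master and every worker

    @everywhere function fill_one(i)          # stand-in for the real time evolution
        return sum(abs2, randn(10^6)) / i
    end

    npoints = 64
    results = SharedArray{Float64}(npoints)   # one shared buffer, filled in parallel

    @sync @distributed for i in 1:npoints
        results[i] = fill_one(i)
        println("$i finished")
    end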

This is a typical example of the stacktrace in the .o file

    From worker 19:	35 finished #requested println output
      From worker 28:	45 finished
      From worker 28:	IOError: stream is closed or unusable
      From worker 28:	Stacktrace:
      From worker 28:	 [1] check_open at ./stream.jl:328 [inlined]
      From worker 28:	 [2] uv_write_async(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:959
      From worker 28:	 [3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:922
      From worker 28:	 [4] uv_write at ./stream.jl:918 [inlined]
      From worker 28:	 [5] flush(::Sockets.TCPSocket) at ./stream.jl:1014
      From worker 28:	 [6] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.RemoteDoMsg, ::Bool) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:191
      From worker 28:	 [7] send_msg(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.RemoteDoMsg) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:134
      From worker 28:	 [8] #remote_do#153 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:461 [inlined]
      From worker 28:	 [9] remote_do at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:461 [inlined]
      From worker 28:	 [10] flush_gc_msgs(::Distributed.Worker) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:155
      From worker 28:	 [11] flush_gc_msgs() at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:202
      From worker 28:	 [12] macro expansion at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:254 [inlined]
      From worker 31:	48 finished
      From worker 31:	IOError: stream is closed or unusable
      From worker 31:	Stacktrace: #the same one repeats

And in the .e file

    + julia -p 32 myfile.jl
    Worker 12 terminated.
    Worker 2 terminated.
    Worker 11 terminated.
    Worker 23 terminated.
    Worker 32 terminated.
    Worker 5 terminated.
    Worker 20 terminated.
    Worker 22 terminated.
    ERROR: LoadError: TaskFailedException:
    ProcessExitedException(2)

    ...and 7 more exception(s).

    Stacktrace:
     [1] sync_end(::Channel{Any}) at ./task.jl:314
     [2] (::Distributed.var"#159#161"{var"#11#30",UnitRange{Int64}})() at ./task.jl:333
    Stacktrace:
     [1] sync_end(::Channel{Any}) at ./task.jl:314
     [2] top-level scope at task.jl:333
     [3] include(::Function, ::Module, ::String) at ./Base.jl:380
     [4] include(::Module, ::String) at ./Base.jl:368
     [5] exec_options(::Base.JLOptions) at ./client.jl:296
     [6] _start() at ./client.jl:506
    in expression starting at /home/.../myfile.jl:146
    + date

I have tried using try/catch statements inside the @distributed for, but they don’t seem to allow continuation after a failed iteration either.
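What I tried looks roughly like this (a sketch; fill_one is the placeholder from the sketch above):

    @sync @distributed for i in 1:npoints
        try
            results[i] = fill_one(i)
        catch err
            @warn "iteration $i failed" err   # never runs if the worker process itself gets killed
        end
    end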

Are you possibly running into the memory limit of your cluster/job allocation? If the Linux OOM killer kills a Julia process, this is what you might see.

Our cluster manager does not require specifying memory, only cores, and the amount of memory available seems large; I assumed that was used by default. I have been in contact with our HPC team about something like this in the past and they didn’t see a clear problem. But yes, what you say would make sense, so I can check again with them. The memory used reported in the output is less than 1 GB, which seems very little for such a job; the vmem used seems to be slightly below 5 GB.

Even if I explicitly request something like 360 GB, only about 1 GB is used and it doesn’t seem to solve it.

To my understanding, all distributed workers have their own memory share. Will Julia automatically distribute all the available RAM evenly between the workers?

There’s no explicit distribution of RAM with Julia or Distributed; each process allocates its own memory, and data only moves between processes when you explicitly send it.
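A quick way to see this (a sketch): each process only accounts for what it has allocated itself.

    using Distributed
    addprocs(2)

    bigvec = zeros(10^8)   # ~800 MB, allocated on the master (process 1) only

    # maxrss = peak resident memory of each individual process
    for p in procs()
        rss_mb = remotecall_fetch(() -> Sys.maxrss() / 2^20, p)
        println("process $p: maxrss ≈ $(round(rss_mb, digits = 1)) MiB")
    end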

I’m not really sure what’s happening here; can you share an MWE so I can try to reproduce this locally?

I think it must be somehow hardware-dependent. By rewriting the code so that Julia starts with more workers (equal to the number of cores) than I actually iterate over, I managed to make it work again this time. But it’s not really a long-term solution.

I think it’s hard to have such a thing as an MWE, because the problem intrinsically arises for large simulations. But I can send you some code in a PM.

Wonder if there was any more progress in understanding this behaviour?

FWIW, I have had very similar experiences recently, albeit with some other packages, but still using Distributed with SharedArrays and @sync @distributed for loops.
The code works perfectly fine on my laptop, but in the cluster environment workers terminate seemingly at random (and sometimes the code runs perfectly fine).

Unfortunately, apart from the worker terminated message, I don’t see any other error messages to give any more insight.

Also found it hard to come up with a MWE…

I am also facing this problem. I have a @sync @distributed loop within my code. It runs without any errors on my computer, but I’ve moved the code to a Linux cluster (Google Cloud) and now I am getting Worker 2 terminated. ERROR: TaskFailedException errors.

For what it’s worth, I am parallelizing an objective function and calling ForwardDiff and Optim. Giving an MWE is tricky because on my computer it works fine; it only fails on the server.

I’ve been playing around with my code all day, and I figured out that the most probable cause of these terminations (in my case) was out-of-memory errors. I was puzzled because the EC2 instance had much more RAM than my own computer.

In the end, what worked was enclosing everything within a function. What I had previously was my package MyPackage, so I would just start with:

    using Pkg
    Pkg.activate(".")
    Pkg.instantiate()

    using Distributed
    addprocs(2, exeflags = "--project=$(Base.active_project())")

    @everywhere begin
        using Pkg
        Pkg.activate(dirname(@__FILE__))
        Pkg.instantiate()
        # using ... (all the packages you need)
    end

    @everywhere using MyPackage

(...bunch of operations such as creating arrays and passing them to functions from MyPackage)

After I moved everything that came after using MyPackage into a file operations.jl, put that file inside the package, and included it from the main file, I haven’t had any more worker terminations. Hopefully that helps someone.
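Schematically, the main script then reduces to something like this (simplified; run_operations is a placeholder for whatever entry point operations.jl defines inside MyPackage):

    using Pkg
    Pkg.activate(".")
    Pkg.instantiate()

    using Distributed
    addprocs(2, exeflags = "--project=$(Base.active_project())")
    @everywhere using MyPackage

    MyPackage.run_operations()   # array creation and the distributed loop now live inside the package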

Thanks for the update. You mentioned it was a memory issue for you - do you mean you were using too much memory? If so, I’m not sure why wrapping it in a function would help…

For me, my code uses very little memory and is already all wrapped in a function, so I’m not sure if this is the general solution. But I’m glad you fixed your problems anyway!

For me, if I open the REPL (julia -p 30, for example) and run my function, workers still terminate at random (again, only in the cluster environment; I have never had an issue on my laptop).

Actually, after some more testing, the problem came back. So while wrapping everything in a function seems to improve memory consumption (probably because of the globals), it doesn’t avoid the issue. The only way to avoid it so far has been to assign more memory.
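To illustrate the globals point (just a sketch of my understanding): data held in top-level globals stays reachable for the whole script, whereas locals inside a function can be garbage-collected as soon as it returns.

    # top-level global: stays reachable (and keeps its memory) until the script ends
    data = [randn(10^6) for _ in 1:100]        # ~800 MB held for the rest of the run

    # function-local: eligible for garbage collection as soon as run_all returns
    function run_all()
        local_data = [randn(10^6) for _ in 1:100]
        return sum(sum, local_data)
    end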

It does seem like a memory error, because most of the time I get a Worker 2 terminated. ERROR: TaskFailedException with an OutOfMemoryError message.

From my basic understanding, if the code uses, say, 2 GB to run serially, then when you distribute it to 2 cores each core will use 2 GB, so you will need at least 4 GB (plus 2 GB for the master process?). So if you are parallelizing across 30 cores, you probably need to scale the RAM accordingly.
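In numbers, under that assumption (the 2 GB figure is just an example):

    serial_gb  = 2.0                                 # peak memory of the serial run (example value)
    nworkers   = 30
    master_gb  = 2.0                                 # allowance for the master process (guess)
    request_gb = nworkers * serial_gb + master_gb    # ≈ 62 GB to request from the scheduler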

On my personal computer, when I parallelize the code, I have much, much less memory than the server, and I often have a bunch of Chrome tabs open, but I never get any termination; it just seems able to shuffle the memory around. Maybe hard drive space is also a factor? I don’t know why it behaves differently.

Two explanations here. You could have different ulimits when running on the cluster. You do say, though, that the cluster has a large memory allocation. Try running ulimit -a in your batch job before starting Julia.
Also ask your HPC team whether your batch job runs in a cgroup.
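If it is easier, you can also log this from inside the Julia job itself at startup (a sketch; it assumes a POSIX shell is available on the compute node):

    # print the limits and the memory the job actually sees, before any work starts
    run(`sh -c "ulimit -a"`)
    println("total memory: ", Sys.total_memory() / 2^30, " GiB")
    println("free memory:  ", Sys.free_memory() / 2^30, " GiB")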

It could also be that the memory overcommit settings on the cluster are different.

Does something similar to ulimit also exist on Windows? What puzzles me the most is that my Windows 10 machine with 16 GB RAM and 4 cores can handle the program with a bunch of Chrome tabs open, but the Windows Server with the same configuration and a clean slate crashes.

Windows 10 and macOS by default both should use compressed RAM, so you can allocate more than physical RAM and still not hit an OOM situation.

Interesting. I should double check on this. I would have assumed the total memory usage would remain more or less constant for a job of the same size, regardless of how it is distributed.

Hello everyone, I had problems like the ones described here; it seemed that the cluster’s RAM was filling up and processes were being terminated. Reading the Distributed.jl documentation, I found a feature of the pmap function (a very convenient way to implement parallelism) called retry_delays. It allows a failed call to be repeated a limited number of times, i.e. if a worker is terminated it is restarted and the calculation is redone. This way the simulation does not stop, and it is possible to “live” with this error rather than actually resolve it.
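The pattern is something like this (a sketch; simulate is a placeholder for the real per-point computation):

    using Distributed
    addprocs(4)

    @everywhere simulate(p) = sum(abs2, randn(10^6)) * p   # placeholder computation

    # if an element fails, retry it up to 3 times, waiting 1 s, 2 s, 4 s between attempts
    results = pmap(simulate, 1:100;
                   retry_delays = ExponentialBackOff(n = 3, first_delay = 1.0, factor = 2.0))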

Have you tested that retry_delays actually starts a new worker when an existing one dies? I’m pretty sure it doesn’t; it’s only intended to avoid transient errors in the executed function.