Struggling to figure out how I should use shared arrays on a slurm cluster using remote workers

Hi so I am using a large distributed map function like so:

using Distributed
using ClusterManagers
using SharedArrays, ProgressMeter, CSV

addprocs_slurm(parse(Int, ENV["SLURM_NTASKS"]))

results = SharedArray{Float64, 2}((total_steps, 10))
bayes_factors = SharedArray{Float64, 3}((4, 4, total_steps))
p = Progress(total_steps)
progress_pmap(1:total_steps, progress=p) do i
    # ... per-step computation elided ...
    results[i, :] = [real_α, synth_α, auc_β, auc_weighted, auc_naive, auc_no_synth, ll_β, ll_weighted, ll_naive, ll_no_synth]
    bayes_factors[:, :, i] = bf_matrix
    CSV.write("src/creditcard/outputs/results___$(t).csv", create_results_df(results))
end

And I end up getting the following error:

ERROR: LoadError: On worker 2:
BoundsError: attempt to access 0×0 Array{Float64,2} at index [1]
setindex! at ./array.jl:826 [inlined]
setindex! at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/SharedArrays/src/SharedArrays.jl:510 [inlined]
_setindex! at ./abstractarray.jl:1096 [inlined]
setindex! at ./abstractarray.jl:1073 [inlined]
macro expansion at ./multidimensional.jl:786 [inlined]
macro expansion at ./cartesian.jl:64 [inlined]
macro expansion at ./multidimensional.jl:781 [inlined]
_unsafe_setindex! at ./multidimensional.jl:774
_setindex! at ./multidimensional.jl:769 [inlined]
setindex! at ./abstractarray.jl:1073 [inlined]
#71 at /gpfs/home/dcs/csrxgb/julia_stuff/src/creditcard/run.jl:204
#49 at /home/dcs/csrxgb/.julia/packages/ProgressMeter/g1lse/src/ProgressMeter.jl:795
#104 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:294
run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:79
macro expansion at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:294 [inlined]
#103 at ./task.jl:358
 [1] (::Base.var"#726#728")(::Task) at ./asyncmap.jl:178
 [2] foreach(::Base.var"#726#728", ::Array{Any,1}) at ./abstractarray.jl:1919
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::UnitRange{Int64}) at ./asyncmap.jl:178
 [4] wrap_n_exec_twice(::Channel{Any}, ::Array{Any,1}, ::Distributed.var"#204#207"{WorkerPool}, ::Function, ::UnitRange{Int64}) at ./asyncmap.jl:154
 [5] async_usemap(::Distributed.var"#188#190"{Distributed.var"#188#189#191"{WorkerPool,ProgressMeter.var"#49#52"{var"#71#72"{Int64,Bool,Array{Symbol,1},DataFrame,DataFrame,Int64,Float64,Float64,Float64,Float64,Float64,Array{Tuple{Float64,Float64},1},SharedArray{Float64,2},SharedArray{Float64,3},Int64,Int64,Bool},RemoteChannel{Channel{Bool}}}}}, ::UnitRange{Int64}; ntasks::Function, batch_size::Nothing) at ./asyncmap.jl:103
 [6] #asyncmap#710 at ./asyncmap.jl:81 [inlined]
 [7] pmap(::Function, ::WorkerPool, ::UnitRange{Int64}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Array{Any,1}, retry_check::Nothing) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/pmap.jl:126
 [8] pmap(::Function, ::WorkerPool, ::UnitRange{Int64}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/pmap.jl:101
 [9] pmap(::Function, ::UnitRange{Int64}; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/pmap.jl:156
 [10] pmap(::Function, ::UnitRange{Int64}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/pmap.jl:156
 [11] macro expansion at /home/dcs/csrxgb/.julia/packages/ProgressMeter/g1lse/src/ProgressMeter.jl:794 [inlined]
 [12] macro expansion at ./task.jl:334 [inlined]
 [13] macro expansion at /home/dcs/csrxgb/.julia/packages/ProgressMeter/g1lse/src/ProgressMeter.jl:793 [inlined]
 [14] macro expansion at ./task.jl:334 [inlined]
 [15] progress_map(::Function, ::Vararg{Any,N} where N; mapfun::Function, progress::Progress, channel_bufflen::Int64, kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /home/dcs/csrxgb/.julia/packages/ProgressMeter/g1lse/src/ProgressMeter.jl:786
 [16] #progress_pmap#53 at /home/dcs/csrxgb/.julia/packages/ProgressMeter/g1lse/src/ProgressMeter.jl:811 [inlined]
 [17] main() at /gpfs/home/dcs/csrxgb/julia_stuff/src/creditcard/run.jl:117
 [18] top-level scope at /gpfs/home/dcs/csrxgb/julia_stuff/src/creditcard/run.jl:308
 [19] include(::Module, ::String) at ./Base.jl:377
 [20] exec_options(::Base.JLOptions) at ./client.jl:288
 [21] _start() at ./client.jl:484
in expression starting at /gpfs/home/dcs/csrxgb/julia_stuff/src/creditcard/run.jl:308
┌ Warning: Forcibly interrupting busy workers
│   exception = rmprocs: pids [4, 5, 7, 8, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 27, 28, 29] not terminated after 5.0 seconds.
└ @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/cluster.jl:1219
┌ Warning: rmprocs: process 1 not removed
└ @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/cluster.jl:1015

The array is definitely not 0×0, so I suspect it isn't being instantiated correctly on the workers or something?

Are you running this on the head node? My guess is that the workers aren’t being started on the same node as the main process, and so can’t have shared memory.

It may be easier to start Julia inside an srun or sbatch session and just use regular addprocs.
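For example, a minimal job script along those lines might look like this (a sketch; the resource numbers and script path are placeholders for your cluster):

```shell
#!/bin/bash
#SBATCH --nodes=1              # one node, so all workers can share memory
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16

# Start Julia with local workers (-p) instead of addprocs_slurm;
# SharedArray then works because every process is on the same host.
julia -p "$SLURM_CPUS_PER_TASK" src/creditcard/run.jl
```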

I submit a job on the login node of the cluster and then resources get salloc'd. I could try that, but our admins discourage using srun for big jobs; plus, would this work if I am using multiple nodes? Memory is allocated per core, but I am not really familiar enough with the architecture to know what is best here. Ideally I would let all of the processes somehow append rows to the same file; is this possible?

How exactly are you launching the job? Are you starting Julia on the login node or on the worker node?

I submit a batch script with sbatch; within it there is a line that runs Julia, which is executed on the worker node.

Do you actually have to use a shared array for this? Can't you just do
result = pmap(...) and then unpack the results and put them into the relevant arrays?
Or does the shared array have some advantage here?

Just to reiterate the question from above: are you sure all the tasks are allocated on the same node? The docs for SharedArray state:

Construct a SharedArray of a bits type T and size dims across the processes specified by pids - all of which have to be on the same host.

Yeah, I feel like the workers must be remote. The nice thing about a shared array is that I can safely write it to a file at the end of each iteration without overwriting previous progress, and without the risk of losing everything if there is an error or something hangs. If the tasks are all on different remote nodes, I am wondering if it might be possible to just append the outcome of each iteration to a file directly?

It's not necessarily true that the tasks are on different nodes. On some (most?) clusters you actually get allocated a full node with multiple cores, but this depends on your cluster and sbatch script. I guess you could test this by having each worker print its hostname.
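A quick way to run that check (a sketch, assuming the workers have already been added):

```julia
using Distributed

# Ask every worker for its hostname; SharedArrays require that all
# participating pids report the same host as the master process.
master = gethostname()
for p in workers()
    host = remotecall_fetch(gethostname, p)
    println("worker $p on $host", host == master ? "" : "  (remote!)")
end
```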

If the tasks are all on different remote nodes, I am wondering if it might be possible to just append the outcome of each iteration to a file directly?

How about saving the results from each pmap step to a different file and then uniting the files at the end (or doing this in addition to result = pmap(...), where the files are just used for checkpointing in case the computation crashes)?
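That per-step checkpointing idea might be sketched like this; `run_step` and the output directory are hypothetical stand-ins for the per-step computation in the original code:

```julia
using Distributed, CSV, DataFrames

outdir = "src/creditcard/outputs"

# Each pmap step writes its own file, so no two workers
# ever write to the same file at the same time.
pmap(1:total_steps) do i
    row_df = run_step(i)        # assumed to return a one-row DataFrame
    CSV.write(joinpath(outdir, "step_$(i).csv"), row_df)
    row_df
end

# After the run (or after a crash), unite whatever files exist.
files = filter(f -> startswith(basename(f), "step_"),
               readdir(outdir; join = true))
combined = reduce(vcat, [CSV.read(f, DataFrame) for f in files])
CSV.write(joinpath(outdir, "results.csv"), combined)
```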

Maybe some other people have more sophisticated solutions in mind.


One of the Slurm options, -N/--nodes, lets you specify the number of nodes. If you set it to 1, it guarantees that all cores are on the same node, though it gets weird if you ask for more cores than a node has. I think the nodes setting silently overrides the cores setting, so if you ask for 30 cores on 1 node and the cluster only has 8 cores per node, you'll get 8 cores (I think).

Yeah, I use the --nodes and --ntasks-per-node directives in my sbatch script, as I plan to eventually run this across multiple nodes of the cluster. I like your idea of uniting the files at the end. I managed to get something working appending onto a single large file if I give it an absolute path, but I think yours is probably cleaner.

I managed to get something working appending onto a single large file

Can't that lead to problems if two workers try to write to the file at the same time? Or is there a safe way to do that?

Yeah, I haven't encountered that yet, but I am wary of it, so I think I will switch to many files and unite them at the end in case of a crash, as you suggest.
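A variant of the many-files approach (a sketch, not from the thread) is one file per worker rather than per step, keyed by the worker's pid so concurrent writes never collide; the path and function name here are hypothetical:

```julia
using Distributed, CSV, DataFrames

# Each worker appends only to its own file, named by its process id,
# so no locking is needed and no two processes touch the same file.
function checkpoint_row(row_df::DataFrame)
    path = "outputs/partial_$(myid()).csv"
    # Write the header on the first call, append thereafter.
    CSV.write(path, row_df; append = isfile(path))
end
```

The per-worker files can then be united at the end the same way as the per-step files.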