Extend distributed pmap to multiple nodes using SLURM

I have a program that makes use of the parallelism offered by Distributed's pmap() function. I can run this on a cluster using one node and multiple cores. Now I would like to extend this to multiple nodes.

The architecture of my program is:

  • a pmap loop that takes a few small input arrays, which are only read from, and writes its results to a SharedArray, which is only written to
  • a timeout mechanism that, once the time limit is reached, terminates all running tasks and returns the partially filled array

MWE of my setup:

Module MyMod.jl:

module MyMod 
using Distributed, Dates, SharedArrays 

function distr_loop(input_array; start_time = now(), time_limit = Second(10)) 
    
    output_array = SharedArray(zero(input_array))
    
    timedout = Ref(false)
    running_tasks = Dict{Tuple{Int, Int}, Task}()

    @async begin
        while Dates.now() - start_time < time_limit
            sleep(0.1)
        end
        timedout[] = true
        for ((i, j), task) in running_tasks
            Base.throwto(task, InterruptException())
        end
    end
    
    @sync begin
        for i in axes(input_array,1)
            j_indices = axes(input_array,2)
            pmap(j_indices) do j
                run_with_timeout(basic_task, timedout, input_array, output_array, i, j)
            end 
        end
    end

    # Interrupt any remaining unfinished tasks
    for ((i, j), task) in running_tasks
        if !istaskdone(task)
            Base.throwto(task, InterruptException())
        end
    end

    return output_array
end

# `in` is a reserved word in Julia and cannot be used as an argument name
function basic_task(input, output, i, j)
    sleep(1)
    output[i, j] = input[i, j]
end 

function run_with_timeout(f, timedout, args...;kwargs...)
    if !timedout[]
        return f(args...;kwargs...)
    else
        return 
    end
end

end 

Julia script julia_test.jl:

using Distributed, JLD2, Dates

addprocs(Base.parse(Int64,ENV["SLURM_CPUS_PER_TASK"]); topology=:master_worker, 
	exeflags="--project=$(Base.active_project())")

@everywhere home_dir = ""

@everywhere include(home_dir * "MyMod.jl") 
@everywhere using .MyMod

input_array = randn(5,5) 

start = DateTime(ENV["DATE"]) 
out = Main.MyMod.distr_loop(input_array; start_time = start, time_limit = Second(10)) 

@save home_dir * "out.jld2" out 

SLURM script:

#!/bin/bash

#SBATCH --time=48:00:00
#SBATCH --job-name="testrun"
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=48
#SBATCH --mem-per-cpu=4000
#SBATCH --output="test.out"
#SBATCH --error="test.err.out"
#SBATCH --mail-type=ALL
#SBATCH --mail-user=myemail@mydomain.com 


module use $HOME/modulefiles
module load julia/1.8.5
module load intel/2019.3.199-GCC-8.3.0-2.32
module load  GCC/10.2.0  OpenMPI/4.0.5 

export LD_LIBRARY_PATH="julia-1.8.5/lib/julia:$LD_LIBRARY_PATH"
DATE=$(date +%Y-%m-%dT%H:%M:%S)  # no trailing ";" in the format, or DateTime parsing fails
export DATE

julia julia_test.jl

Now I would like to extend this setup to use more than one node.

What is the simplest / most efficient way to do this?

Try using SlurmClusterManager.jl (GitHub: kleinhenz/SlurmClusterManager.jl), a Julia package for running code on Slurm clusters, or the older ClusterManagers.jl (GitHub: JuliaParallel/ClusterManagers.jl).
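
To give a rough idea of the intended usage, a minimal sketch (assuming SlurmClusterManager's documented interface, where `SlurmManager` reads the Slurm environment itself rather than taking a worker count) would be:

```julia
using Distributed, SlurmClusterManager

# SlurmManager reads SLURM_NTASKS from the environment and launches one
# worker per Slurm task via srun, so workers land on all allocated nodes.
# Note: no explicit worker count is passed to addprocs here.
addprocs(SlurmManager(); exeflags = "--project=$(Base.active_project())")

# Sanity check: each worker reports which host it is running on.
@everywhere println("worker $(myid()) on $(gethostname())")
```

This only runs inside a Slurm allocation (it shells out to srun), so it cannot be tested on a login node.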

Thanks for the pointer! So using SlurmClusterManager, it seems I need to set the number of tasks to what I have thus far set as CPUs per task. Hence, I would change the preamble in my SLURM script to

#SBATCH --time=48:00:00
#SBATCH --job-name="testrun"
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=48
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=4000
#SBATCH --output="test.out"
#SBATCH --error="test.err.out"
#SBATCH --mail-type=ALL
#SBATCH --mail-user=myemail@mydomain.com  

## Rest is same 

and in my julia script I simply adjust the environment variable which determines the number of workers:

using Distributed, JLD2, Dates, SlurmClusterManager 

addprocs(Base.parse(Int64,ENV["SLURM_NTASKS_PER_NODE"]); topology=:master_worker, 
	exeflags="--project=$(Base.active_project())")
## Rest is same 

Does that look okay? Trial and error is quite costly on my cluster.