I have a program that makes use of the parallelism offered by the `pmap()` function from the Distributed standard library. I can run this on a cluster using one node and multiple cores. Now I would like to extend this to using multiple nodes.
The architecture of my program is
- I have a pmap loop that takes a few small arrays as read-only inputs and writes its results to a SharedArray that is only ever written to.
- I have a timeout mechanism that, once the time limit is reached, terminates all processes and returns the partially filled array.
MWE of my setup:
Module MyMod.jl
module MyMod

using Distributed, Dates, SharedArrays

"""
    distr_loop(input_array; start_time=now(), time_limit=Second(10))

Apply `basic_task` to every index `(i, j)` of `input_array` and return the
`SharedArray` that the tasks write into.

Rows are processed one after another; within a row the column indices are
handed out with `pmap`. Once `now() - start_time` reaches `time_limit`, no
further task is started and the array is returned as filled so far: entries
whose task never ran keep the value they were initialised with by
`zero(input_array)`. Tasks that are already running are not interrupted.

# Keywords
- `start_time`: instant from which the time limit is measured.
- `time_limit`: period after `start_time` at which new work stops being started.

NOTE(review): the SharedArrays documentation requires all processes that map a
`SharedArray` to be on the same host, so workers on other nodes presumably
cannot write into the output array -- confirm before using several nodes.
"""
function distr_loop(input_array; start_time=now(), time_limit=Second(10))
    output_array = SharedArray(zero(input_array))
    timedout = Ref(false)
    finished = Ref(false)

    # Watchdog: raises the timeout flag once the deadline has passed. It also
    # stops polling as soon as the work is done, so no task lingers after the
    # function has returned.
    @async begin
        while !finished[] && now() - start_time < time_limit
            sleep(0.1)
        end
        finished[] || (timedout[] = true)
    end

    try
        for i in axes(input_array, 1)
            # After the deadline every remaining call would be a no-op, so do
            # not dispatch the remaining rows at all.
            timedout[] && break
            # `pmap` blocks until the whole row has been processed.
            pmap(axes(input_array, 2)) do j
                run_with_timeout(basic_task, timedout, input_array, output_array, i, j)
            end
        end
    finally
        finished[] = true
    end
    return output_array
end

"""
    basic_task(src, dst, i, j)

Sleep for one second, then copy `src[i, j]` into `dst[i, j]` and return the
copied value. This is the placeholder workload of the example.
"""
function basic_task(src, dst, i, j)
    sleep(1)
    return dst[i, j] = src[i, j]
end

"""
    run_with_timeout(f, timedout, args...; kwargs...)

Return `f(args...; kwargs...)` unless `timedout[]` is `true`, in which case `f`
is not called and `nothing` is returned.
"""
function run_with_timeout(f, timedout, args...; kwargs...)
    timedout[] && return nothing
    return f(args...; kwargs...)
end

end
Julia script julia_test.jl:
using Distributed, JLD2, Dates
# Start one worker per CPU listed in SLURM_CPUS_PER_TASK; each worker is
# launched with the project that is active on the master process.
worker_count = parse(Int64, ENV["SLURM_CPUS_PER_TASK"])
addprocs(
    worker_count;
    topology=:master_worker,
    exeflags="--project=$(Base.active_project())"
)

# Load the module on the master and on every worker.
@everywhere home_dir = ""
@everywhere include(home_dir * "MyMod.jl")
@everywhere using .MyMod

input_array = randn(5, 5)

# The time limit is measured from the timestamp stored in the DATE
# environment variable.
start = DateTime(ENV["DATE"])
out = MyMod.distr_loop(input_array; start_time=start, time_limit=Second(10))

# Persist the (possibly partially filled) result.
@save home_dir * "out.jld2" out
SLURM script
#!/bin/bash
# --- Job identity, wall time and notifications ---
#SBATCH --time=48:00:00
#SBATCH --job-name="testrun"
# --- Resources: one task on one node with 48 CPUs, 4000 MB per CPU ---
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=48
#SBATCH --mem-per-cpu=4000
# --- Output files ---
#SBATCH --output="test.out"
#SBATCH --error="test.err.out"
#SBATCH --mail-type=ALL
#SBATCH --mail-user=myemail@mydomain.com

# Toolchain set-up.
module use $HOME/modulefiles
module load julia/1.8.5
module load intel/2019.3.199-GCC-8.3.0-2.32
module load GCC/10.2.0 OpenMPI/4.0.5
export LD_LIBRARY_PATH="julia-1.8.5/lib/julia:$LD_LIBRARY_PATH"

# Job start timestamp, read by julia_test.jl through ENV["DATE"].
export DATE=$(date +%Y-%m-%dT%H:%M:%S)

julia julia_test.jl
Now I would like to extend this setup to use more than one node.
What is the simplest / most efficient way to do this?