How to query Slurm Array Task ID in Julia?

I want to run a Monte Carlo study with 100 replications in parallel on 100 compute nodes. On each node, the attached Julia script should run with a unique random seed (so that it generates a unique fake dataset). Also, within each node, the Julia script should parallelize across all available threads on that node, but not across nodes.

I learned that I should use job arrays. With Slurm, there is an environment variable SLURM_ARRAY_TASK_ID, which I think should return the index of the current Monte Carlo replication. I wonder how I can query this value inside Julia. Something like:

num_replication = SLURM_ARRAY_TASK_ID
Random.seed!(num_replication)

Thank you.


Environment variables are stored in the ENV dict as strings:

num_replication = Base.parse(Int, ENV["SLURM_ARRAY_TASK_ID"])
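
For example, a minimal sketch of the seeding pattern (the fallback value "1" is just an addition for running the script outside of Slurm):

using Random

# use the array task ID as the seed; default to 1 when testing locally
num_replication = parse(Int, get(ENV, "SLURM_ARRAY_TASK_ID", "1"))
Random.seed!(num_replication)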

Thank you very much. It works.

I also use Slurm, but instead of creating a batch script and using sbatch/srun, I use ClusterManagers.jl. My workflow looks like the following:

using ClusterManagers
addprocs(SlurmManager(512), N=16, topology=:master_worker, exeflags="--project=.")
nsims = 500
res = pmap(1:nsims) do x
    run_simulation(x)
end

where run_simulation is a function that runs a full independent simulation. The function accepts the “simulation number x” which I use to set my seed.

function run_simulation(x)
    Random.seed!(x) 
    ... do work ...
end

The advantage of this is that when the pmap returns, it saves the data from each simulation in the variable res. I can process the data of my Monte Carlo simulations right away and save it. It also has the advantage that the function only needs to be compiled once (rather than every time a Julia process is launched and runs your script).
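
For instance, once the pmap returns, the whole res vector can be post-processed and written out in one go; a small sketch using the Serialization standard library (the file name is arbitrary):

using Serialization

# res holds run_simulation's return value for each of the nsims replications
serialize("montecarlo_results.jls", res)

# in a later session:
res = deserialize("montecarlo_results.jls")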


Hi @affans, thank you for your suggestion. I am very interested in learning this. I guess there is something special about my code: I need to compute the objective function itself in parallel. So I have the following parallel code, which should utilize all threads within a compute node. Therefore, I use a job array and send the Julia script to, say, 50 compute nodes for 50 Monte Carlo replications. Then, within each compute node, my code utilizes the available threads on that node.

My understanding of SlurmManager(512), N=16 is that it will add 512 workers spread over 16 nodes(?). Then you run the 500 replications in parallel on those 512 workers(?). My concern is that, with my code, it will not be able to parallelize across all threads, or there will be a conflict.

using Distributed
addprocs(Sys.CPU_THREADS)
@everywhere using JuMP, Gurobi, SparseArrays
@everywhere const GRB_ENV = Gurobi.Env() # one Gurobi environment per worker, reused by the model below

@everywhere function fun_sim_optimal_assignment(w_sim::Vector{Float64}, N::Int64)
    w_sim = reshape(w_sim, N, N)
    model_sim = Model(optimizer_with_attributes(()->Gurobi.Optimizer(GRB_ENV)))
    @variable(model_sim, H_sim_temp[1:N, 1:N] >= 0)
    @constraint(model_sim, [i = 1:N], sum(H_sim_temp[i, j] for j = 1:N) == 1)
    @constraint(model_sim, [j = 1:N], sum(H_sim_temp[i, j] for i = 1:N) == 1)
    @objective(model_sim, Max, sum(w_sim[i, j] * H_sim_temp[i, j] for i = 1:N, j = 1:N))
    JuMP.optimize!(model_sim)
    if termination_status(model_sim) == MOI.OPTIMAL
        H_market_sim = value.(H_sim_temp)
        return findall(!iszero, sparse(H_market_sim))
    else
        error("The model was not solved correctly.")
    end
end

function fun_H_sim_parallel(fun_sim_optimal_assignment::Function, w_sim_column::Matrix{Vector{Float64}}, num_simulation::Int64, N::Int64, T::Int64)
    H_sim_temp = Matrix{Vector{CartesianIndex{2}}}(undef, T, num_simulation)
    np = nprocs()
    let w_sim_column_temp = w_sim_column
        for t = 1:T
            i = 1
            nextidx() = (idx=i; i+=1; idx)
            @sync begin
                for p = 1:np
                    if p != myid() || np == 1
                        @async begin
                            while true
                                idx = nextidx()
                                if idx > num_simulation
                                    break
                                end
                                H_sim_temp[t,idx] = remotecall_fetch(fun_sim_optimal_assignment, p, w_sim_column_temp[t,idx], N)
                            end
                        end
                    end
                end
            end
        end
    end
    return H_sim_temp
end

You are exactly right. I have 32 independent processors on each of 16 nodes (so 16*32 = 512 processors) that can each run my “simulation function” in parallel (given that there is sufficient memory on each node).

In our cluster, hyperthreading is actually enabled, so our cluster manager reports 64 processors per node rather than 32, but I try to avoid using that because it’s not “true” parallelism. I imagine that if I can get my function to use two threads (per processor), I can speed it up.

I don’t particularly think that your idea of using addprocs(Sys.CPU_THREADS) is ideal or even correct… it launches more workers than the actual number of independent processors and might actually be slowing things down.

In general, this is a question about mixing threading and distributed computing. As far as I know, there isn’t even an equivalent addthreads() function; if you want to use threading, you have to launch Julia with the correct command-line argument or set the environment variable JULIA_NUM_THREADS=4.

I know @tkf has implemented this sort of multi-parallelism in his packages (particularly Transducers.jl) so he may be able to provide some insight here.

I imagine the workflow has to go something like this:

# set JULIA_NUM_THREADS=4 

addprocs(SlurmManager(16), kwargs...) # launch 16 independent workers over 16 nodes using Slurm

# QUESTION: does each worker properly utilize the JULIA_NUM_THREADS env variable??

results = pmap(1:nsims) do x 
    main(x) # 16 copies of main launched on 16 processors/nodes
end

So again, the main question (as I wrote in the example above) is whether addprocs, which launches Julia workers (i.e. with the command-line argument julia --worker), also takes the number of threads into account. If it does, then you have x independent workers on independent processors, with each worker having access to y threads.

FYI, there is a PR to add a -t/--threads option to julia so that you can do something like addprocs(..., exeflags=`--threads=32`) to use 32 threads per worker process. IIUC it would also propagate the --threads flag to workers, so you wouldn’t even need to specify --threads via addprocs.

https://github.com/JuliaLang/julia/pull/35108

Until then, maybe you can use a shim script like tkf/julia-t (it runs julia with JULIA_NUM_THREADS set to the number of physical cores; it requires Python and lscpu to detect the number of physical cores) to launch workers with multiple threads.

Hi @affans, thank you so much for your detailed explanation. I will look into the sample workflow carefully. I am really new to both coding and HPC. Please bear with me if the following paragraph does not make sense.

Regarding addprocs(Sys.CPU_THREADS), I need to clarify for other people reading this post. There are two levels of parallelism:

  1. Parallelize the 100 Monte Carlo replications.
  2. Within each replication, parallelize the evaluation of the objective function.

Now, what I refer to is the second level of parallelism. You can think of the objective function in my code as the sum of many independent small functions, so I can speed it up by parallelizing the calculation of the small functions. The corresponding parallel code is in my post above. I learned it from a friend, and I also found helpful documentation in the Dynamic Scheduling section of the manual. I tested my code both locally and on the HPC cluster. In my Slurm job script below, I request 16 cores and 2 threads per core (because the cluster supports hyper-threading). So, in total, I have 32 threads, which is also the maximum number of threads allowed by my cluster.

#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16
#SBATCH --threads-per-core=2

I set the small function to be a naive function which waits for 5 seconds and then moves on. With 32 small functions, @time reports 5 seconds. With 33 small functions, i.e., one more than the number of threads, @time reports 10 seconds. Hence, I believe my code works.

Now, let me talk about the first level of parallelism. If there were no second level of parallelism, i.e., if the evaluation of the objective function were single-threaded, I believe you could parallelize the Monte Carlo replications as @affans suggested in the first post. However, for my particular workflow, the evaluation of the objective function is very computationally burdensome, so I have to have the second level of parallelism. What I am doing now is adding the following line to my Slurm job script:

#SBATCH --array=1-100

This (I believe) sends each replication to a unique compute node; within that compute node, the evaluation of the objective function then parallelizes across all 32 threads. You can query the task ID in Julia as suggested by @stillyslalom and assign unique parameters to each replication according to the task ID.
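
Putting those pieces together, the job script looks roughly like the following sketch (my_script.jl is a placeholder for the actual file name):

#!/bin/bash
#SBATCH --array=1-100
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16
#SBATCH --threads-per-core=2

# each array task gets its own allocation and runs one replication;
# the Julia script reads SLURM_ARRAY_TASK_ID to pick its seed and parameters
srun julia my_script.jl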

I believe @affans suggests another way to do this by using the package ClusterManagers. I am still learning how to do it. As in the sample workflow of @affans’s second post:

addprocs(SlurmManager(100), kwargs...)

I am not sure whether this will add 100 cores spread across many nodes, or whether it will add 100 compute nodes. Also, in this case, I do not know how to tell Julia to parallelize across all 32 threads on each node.

Unless the script is setting JULIA_NUM_THREADS, unfortunately, your program won’t be using threads. You’ll have to set this in the batch script so that the workers Julia launches (i.e. via addprocs) have threading enabled.

In your main objective function code, are you even using the threading library, i.e. are you doing using Base.Threads? If not, you are not using threads regardless. See what Threads.nthreads() returns in your code…

Ideally the way it would work is the following (here I am assuming usage of ClusterManagers):

  1. First, set JULIA_NUM_THREADS=xx. Since you’ll be launching Julia worker processes on each processor on each node, these workers need to see the correct env variable. I think this should be set to two for hyperthreading, since each processor has two threads. Maybe @tkf can clarify this.

  2. Use ClusterManagers to launch, say, 100 processors on 10 nodes, i.e. addprocs(SlurmManager(100), n = 16). This will launch 100 independent Julia processes (i.e. julia --worker).

  3. Now you have 100 Julia workers, each of which sees the JULIA_NUM_THREADS env variable. You can test this with the following code:

@everywhere function run_function()
    println(Threads.nthreads())  ## print the number of threads this worker "sees"
end

pmap(1:100) do x
    run_function() ## run_function is sent to the 100 workers to run in parallel
end
  4. Once you have verified that your workers are launched correctly, read the documentation on how to use threading. You may have to modify your code… you may not even need to use threading…

A really good blog post is here: Announcing composable multi-threaded parallelism in Julia

Again, remember: distributed programming in Julia (via Distributed) is completely different from threading via Threads.

I will try what you have suggested, thanks.

It may sound weird, but addprocs(Sys.CPU_THREADS) does work. I can give you an MWE.

using Distributed

addprocs(Sys.CPU_THREADS) # add one worker per hardware thread (use Sys.CPU_THREADS-1 if run locally).

# Load the packages used by the parallel function "fun_sim_optimal_assignment" on every worker.
@everywhere using JuMP, Gurobi, SparseArrays, Random

# Set up the Gurobi environment on every worker. This reduces memory consumption.
@everywhere const GRB_ENV = Gurobi.Env()
@everywhere setparams!(GRB_ENV, Presolve=0, Method=1, OutputFlag=0)

# Function to solve a simple linear programming problem, i.e., finding the optimal assignment in an N by N matching game.
@everywhere function fun_sim_optimal_assignment(w_sim::Vector{Float64}, N::Int64)
    sleep(5)
    # println("host is $(gethostname())") # ID of the compute node
    # println("pid is $(getpid())") # ID of the corresponding thread
    w_sim = reshape(w_sim, N, N)
    model_sim = Model(optimizer_with_attributes(()->Gurobi.Optimizer(GRB_ENV)))
    @variable(model_sim, H_sim_temp[1:N, 1:N] >= 0)
    @constraint(model_sim, [i = 1:N], sum(H_sim_temp[i, j] for j = 1:N) == 1)
    @constraint(model_sim, [j = 1:N], sum(H_sim_temp[i, j] for i = 1:N) == 1)
    @objective(model_sim, Max, sum(w_sim[i, j] * H_sim_temp[i, j] for i = 1:N, j = 1:N))
    JuMP.optimize!(model_sim)
    if termination_status(model_sim) == MOI.OPTIMAL
        H_market_sim = value.(H_sim_temp)
        return findall(!iszero, sparse(H_market_sim))
    else
        error("The model was not solved correctly.")
    end
end

# Parallel for loop, which should utilize all workers (one per hardware thread, including hyper-threading).
# It solves the linear programming problems on different workers simultaneously.
function fun_H_sim_parallel(fun_sim_optimal_assignment::Function, num_simulation::Int64, N::Int64, T::Int64)
    Random.seed!(1024)
    H_sim_temp = Matrix{Vector{CartesianIndex{2}}}(undef, T, num_simulation)
    np = nprocs()
    for t = 1:T
        i = 1
        nextidx() = (idx=i; i+=1; idx)
        @sync begin
            for p = 1:np
                if p != myid() || np == 1
                    @async begin
                        while true
                            idx = nextidx()
                            if idx > num_simulation
                                break
                            end
                            w_sim = randn(N^2)
                            H_sim_temp[t,idx] = remotecall_fetch(fun_sim_optimal_assignment, p, w_sim, N)
                        end
                    end
                end
            end
        end
    end
    return H_sim_temp
end

# My HPC cluster has 32 threads; if run locally, replace 32 with the number of threads minus 1.
# Here, 32 is the number of linear programming problems you want to solve.
# The second @time call should report about 5 seconds (the first also includes compilation).
@time fun_H_sim_parallel(fun_sim_optimal_assignment, 32, 5, 1)
@time fun_H_sim_parallel(fun_sim_optimal_assignment, 32, 5, 1)

You can check this on your own computer. For example, my laptop has 2 cores and 4 threads, so I write addprocs(Sys.CPU_THREADS-1) and @time fun_H_sim_parallel(fun_sim_optimal_assignment, 3, 5, 1) instead. If you do not have Gurobi, I believe you can comment out all the lines below sleep(5) in fun_sim_optimal_assignment.

I believe I am using Distributed rather than Threads.

Right, I am not saying addprocs(Sys.CPU_THREADS) doesn’t work… I am saying that you are not making use of the newly developed, true threading library. You are not utilizing the hyperthreading feature of your CPU. For this you need to have the environment variable set up and call using Base.Threads, and write code that is thread-safe (remember, when using threads the memory is shared).

By running addprocs(Sys.CPU_THREADS) you are simply launching Sys.CPU_THREADS workers. Each worker is inherently single-threaded. There is no memory shared between them, but you are indeed running “true” parallelism. I am also not used to using primitives like @sync and @async, but I don’t think these utilize threading either.

So again, the point is… you are launching x independent workers… each worker is separate… this is through Julia’s Distributed library. It is possible to use threads within each worker by also utilizing Julia’s Threads library.

To really wrap your head around it, simply do addprocs(4) on your local machine and play around with functions from both Distributed and Threads.
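
For example, a minimal local experiment along those lines (the printed values are just illustrative):

using Distributed
addprocs(4) # 4 independent worker processes, each with its own memory

@everywhere function whoami()
    # report which worker process we are on and how many threads it has
    return (proc = myid(), threads = Threads.nthreads())
end

println(pmap(x -> whoami(), 1:4)) # distributed: runs on the worker processes

# threading, in contrast, shares memory within a single process;
# the thread count is fixed by JULIA_NUM_THREADS at startup
counts = zeros(Int, Threads.nthreads())
Threads.@threads for i in 1:100
    counts[Threads.threadid()] += 1 # each thread writes only to its own slot
end
println(sum(counts)) # 100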

Hi, thanks, I think I know what you mean. To be honest, I am quite confused about it. I will read the documentation. I really appreciate your help.

See discussion here: Combining distributed computing / multithreading

See a test post here: https://www.reddit.com/r/Julia/comments/cynbbb/question_multiprocessing_vs_multithreading/

See benchmarks here: https://phrb.github.io/2019-02-16-intro_parallel_julia/

If you google Distributed vs Threading in Julia, you’ll get a better idea.

I’ve never tried SlurmManager, so I don’t know how it interacts with SLURM_CPUS_PER_TASK. But if it uses one process per Slurm task, I think JULIA_NUM_THREADS=$SLURM_CPUS_PER_TASK would work. I’m not sure if hyperthreading is useful, but I think you can use it by multiplying SLURM_CPUS_PER_TASK by SLURM_THREADS_PER_CORE (not sure if this environment variable exists).
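
As a sketch, that would be a line like the following in the batch script, before Julia is launched:

# give each Julia process one thread per CPU allocated to its Slurm task
export JULIA_NUM_THREADS=$SLURM_CPUS_PER_TASK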

Aren’t you overwriting the 100 with 16? Looking at the SlurmManager implementation in ClusterManagers.jl, it looks like 100 is passed to srun via -n, so the keyword argument n would overwrite it. Maybe what you want is addprocs(SlurmManager(100), cpus_per_task = 16) or something? Or perhaps addprocs(SlurmManager(100), cpus_per_task = 16, export = "ALL,JULIA_NUM_THREADS=16")?

Actually, the correct way to do this is as follows. (Please ignore my previous post.)
The Slurm job script should be

#!/bin/bash
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=20
#SBATCH --threads-per-core=1
#SBATCH --mem-per-cpu=4G
srun julia -p $(( 2 * $SLURM_CPUS_PER_TASK )) test.jl

Note that you should set #SBATCH --threads-per-core=1 to get 20 physical cores. If you use the default threads-per-core setting, which is 2, then you only get 20/2 = 10 physical cores. The last line starts Julia with 2*20 = 40 worker processes (one per hardware thread, including hyper-threading). So, if threads-per-core = 2, you will oversubscribe the cores/threads you want.

Then, in the test.jl script, you do not need to do something like this

addprocs(40)

since the workers are already added when Julia starts.

Are you sure you want to do distributed computing, though? It seems to me your solution may be better off using the newer Threads library. Parallelism via threading is a shared-memory model, whereas parallelism via Distributed basically launches n independent Julia processes connected by some sort of message passing. The general advice is that if you have a long-running function myfun that could be run in an embarrassingly parallel fashion, then you use Distributed. If you have a function that is relatively fast, or the parallelism happens inside a for loop, it’s better to use threading.

Note there is a combination of the two as well. You can request n CPUs for distributed parallelism (i.e. n independent Julia processes) and, within each process, make use of Threads’ shared-memory parallelism.
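
A rough sketch of that combination (assuming the workers were started with JULIA_NUM_THREADS set as discussed above; the function body is just a stand-in for real work):

using Distributed
# assume workers have already been added, e.g. via addprocs(SlurmManager(16), exeflags="--project=.")

@everywhere function run_simulation_threaded(x)
    # distributed level: each call to this function runs on one worker process
    partial = zeros(Threads.nthreads())
    Threads.@threads for i in 1:1000
        # threaded level: shared-memory loop inside this worker
        partial[Threads.threadid()] += sum(randn(100))
    end
    return sum(partial)
end

nsims = 500
res = pmap(run_simulation_threaded, 1:nsims)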