Combining distributed computing / multithreading

Hello everyone,

I’m trying to use multithreading in Julia 1.0 on an HPC cluster.
The cluster is made up of several nodes (separate machines), and each node has several cores.

I want to run parallel Julia code on this kind of architecture by adding one worker per node and then multithreading over the cores of each node.
I have already written code that adds one worker per core. It works, but if I request 10 nodes of 24 cores from the cluster, I create 240 processes, and that takes time (for example, loading heavy packages on, synchronizing, and distributing data across 240 processes…).

So instead I want to create just 10 processes, one per node, and do multithreading inside each of them. In my mind this will save time (maybe that’s not even true and my reasoning is wrong?). But I am having trouble setting the number of threads on the workers.

The HPC cluster is managed by PBS. As an example, here is a PBS script that I can submit:

#!/bin/bash

#PBS [options] ..

module load julia/1.0.1
julia --startup-file=yes -L env_threads.jl --machine-file=$PBS_NODEFILE test_parallel.jl
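
(Note: on some PBS setups $PBS_NODEFILE lists each host once per allocated core; if so, one way to get a single worker per node is to deduplicate the file first. A sketch under that assumption:)

# keep one line per node instead of one line per core
sort -u $PBS_NODEFILE > nodes_unique.txt
julia --startup-file=yes -L env_threads.jl --machine-file=nodes_unique.txt test_parallel.jl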

A small file called env_threads.jl sets the number of threads in the environment and is loaded by each worker:

ENV["JULIA_NUM_THREADS"] = 24

And the main Julia file, test_parallel.jl:

using Distributed
@show ENV["JULIA_NUM_THREADS"]
@show Threads.nthreads()

@everywhere NCORES = 24

@everywhere function task()
	@show Threads.nthreads()
	@show ENV["JULIA_NUM_THREADS"]
	u = zeros(NCORES)
	# record which thread handles each iteration
	Threads.@threads for i = 1:NCORES
		u[i] = Threads.threadid()
	end
	return u
end

# one result vector per worker
a = [zeros(NCORES) for k = 1:nworkers()]
@sync for id_w in workers()
	# map worker ids (which start at 2) to indices 1:nworkers()
	i = id_w - workers()[1] + 1
	@async a[i] = remotecall_fetch(task, id_w)
end

for i = 1:nworkers()
	@show a[i]
end

And the output of this script on 3 nodes of 24 cores is:

Your julia package directory : /workdir/bentrioum/.julia
ENV["JULIA_NUM_THREADS"] = "24"
Threads.nthreads() = 1
From worker 2: Threads.nthreads() = 1
From worker 2: ENV["JULIA_NUM_THREADS"] = "24"
From worker 4: Threads.nthreads() = 1
From worker 4: ENV["JULIA_NUM_THREADS"] = "24"
From worker 3: Threads.nthreads() = 1
From worker 3: ENV["JULIA_NUM_THREADS"] = "24"
a[i] = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
a[i] = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
a[i] = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]

So we can see that the environment variable JULIA_NUM_THREADS is set correctly, but the number of active threads is still 1.

What can I do to fix this? Is there anything in Julia that can solve it, or do I have to do something at the HPC cluster level (like finding a way to set the environment variable on the nodes before Julia starts)?
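
For instance, I was imagining a small wrapper script around the julia binary (untested sketch; the filename is made up), so that the variable is exported before the process starts:

#!/bin/bash
# julia_threads.sh -- export the variable *before* julia starts,
# then replace this shell with the real julia process
export JULIA_NUM_THREADS=24
exec julia "$@"

The workers could then be pointed at this wrapper instead of the plain julia binary, for example through the exename keyword of addprocs.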

Thank you for your time :slight_smile:


Hello @moudbis. Do you necessarily have to use threads on the remote worker machines?
Have a look at Distributed Computing · The Julia Language

Machines is a vector of machine specifications. Workers are started for each specification.

A machine specification is either a string machine_spec or a tuple (machine_spec, count).

count is the number of workers to be launched on the specified host. If specified as :auto it will launch as many workers as the number of CPU threads on the specific host.
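
For example (hostnames made up), one worker per node can be started with:

using Distributed

# (host, 1) starts exactly one worker process on each listed node
addprocs([("node01", 1), ("node02", 1), ("node03", 1)])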

Whoops. My bad - I see you have already done this.

Could be that the threads package decides to run the loop in a single thread because the range is so small.

Thanks for your response.

I don’t think that’s the reason, because I print the value of Threads.nthreads() before the loop and it is already equal to 1. Maybe it’s because Julia starts before the value of JULIA_NUM_THREADS is set on the nodes.
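
As far as I understand, JULIA_NUM_THREADS is only read once, at process startup, so setting it from inside a running session has no effect. A quick REPL illustration of that behaviour:

julia> Threads.nthreads()
1

julia> ENV["JULIA_NUM_THREADS"] = 24;  # too late: the thread count was fixed at startup

julia> Threads.nthreads()
1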

Hey thanks for your response.

Yes, I’ve already done this and it works (it properly creates 3×24 workers, and I checked via an ssh connection that all of the nodes are used).

But I want to compare its computational cost with the other approach I proposed in my post: create only one worker per node and do multithreading inside each worker. My intuition is that this will speed up my code a bit, because I only have to create 10 workers, load the packages on these 10 workers, distribute data to these 10 workers, and so on… Do you think I’m right?

My experience was that distributing a significant amount of data across many workers incurred a non-negligible overhead. Threads are, in my opinion, better at managing the movement of data.
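
As a rough illustration of the difference (a sketch, not a benchmark): with Distributed the data is serialized and shipped to the worker over the network, while threads all operate on the same memory:

using Distributed

data = rand(10^6)

# Distributed: `data` is serialized and sent to worker 2,
# and the result travels back the same way
s = remotecall_fetch(sum, 2, data)

# Threads: every task reads the same array in shared memory; nothing is copied
doubled = similar(data)
Threads.@threads for i in eachindex(data)
    doubled[i] = 2 * data[i]
end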

I have a similar problem. I have compute nodes with 32 cores each. However, the CPUs are hyperthreading-enabled and I am wondering how to leverage this capability. In fact, the “number of CPUs” in our cluster’s web interface shows 64 CPUs per node (but I know there are 32 physical cores).

I usually run one large simulation per core. Sometimes the simulation does input/output, and I would like to use the corresponding hyperthread of that CPU for the input/output. Sometimes I would also like to run a real-time analysis while the simulations continue. I am thinking threads could be used for this purpose as well.
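
Something like the following sketch is what I have in mind (it assumes Julia 1.3+ for Threads.@spawn, and run_simulation is a made-up stand-in): the main task writes out each finished result while the remaining simulations keep running on the other threads.

# made-up stand-in for one per-core simulation
run_simulation(i) = (sleep(0.1); rand(1000) .+ i)

# launch one task per simulation; the scheduler spreads them over the available threads
tasks = [Threads.@spawn run_simulation(i) for i in 1:32]

for (i, t) in enumerate(tasks)
    r = fetch(t)  # wait for simulation i to finish
    # do the I/O on the main task while the other simulations keep computing
    open("result_$i.txt", "w") do io
        println(io, sum(r))
    end
end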

https://github.com/JuliaLang/julia/issues/35037
