Multi-threaded worker processes

My use case is running a rather large application on a cluster of multi-core machines. Using @threads is extremely convenient as one gets shared memory of “all things” (of course one needs to be careful). However, being able to distribute work between the machines is also essential. Is it possible to do both in a single environment? Specifically, is it possible to have a setup where:

  • A main Julia process starts multiple worker processes (as in addprocs)

  • Each worker processes runs on different machines (using the SshClusterManager)

  • Each such worker process uses multiple threads (as many as the number of CPUs the machine has)

Is such a scenario supported?

If this scenario is supported, is it documented anywhere?

Presumably, in such a case, @threads would use all the threads on the current machine. It is less clear what @distributed, @parallel, and pmap would do. Would it depend on which process (main or worker) they are invoked, and which thread (main or additional)?

Ideally I would like the ability to:

  • Run some light-weight tasks on all CPUs - what one would expect from @parallel.

  • Run a reduction across all threads on all machines - what one would expect from @distributed.

  • Run some heavy-weight tasks (which use @threads internally) across all machines. It would probably make the most sense to run only one or at most a few at a time on each machine; prematurely committing to running such tasks on machines would be sub-optimal when their duration varies. Perhaps what one would expect from pmap?

  • Have this all be composable - allow the use of these directives from anywhere, even from within a thread inside a worker process.

Is such a setup possible?

7 Likes

This is a very similar question to what I was going to post. If you don’t mind I’d like to add one more follow up question

  • If hyperthreading is enabled, is it possible to launch n workers for n cpus (i.e. separate memory for each process) and then also use threads within each process since hyperthreading is enabled?

  • or alternative is it better to simply launch 64 processes (32 cores * hyperthreading)?

2 Likes

In my experience you should not create “much” more active processes than CPUs. Otherwise your load average will skyrocket and the OS scheduler really doesn’t handle this well. You might be able to get away with a higher load average using the “nice” command but that would mean your processes would have lower priority than anything else on the machine. If on the other hand you restrict the total number of processes, and you want to mix distribution and multi threading, then you basically revert to my original question.

Related:

https://github.com/JuliaLang/julia/issues/17887

FYI, as Transducers.jl has thread- and Distributed.jl-based parallelisms with a common interface (ref: Thread- and process-based parallelisms in Transducers.jl (+ some news)), it should not be too hard to stitch things together to use thread-based reduction inside Distributed.jl-based reduction. This would then automatically give us a superset of pmap that supports other processings like filtering to be fused with mapping.

5 Likes

The API of Transducers.jl is very nice. However, it doesn’t explicitly address the issue of how to set up a two-level processes/threads combination, if that is even possible. From my experiments so far, addprocs always creates a single threaded worker process. I’m investigating whether some explicit additional keyword arguments can override this. Assuming this works, what would dreduce do? Would it use all threads on all processes or just use one thread per process?

1 Like

It’s certainly possible to combine shared and distributed memory parallelism (i.e. threads and julia workers) in julia. The number of threads on workers processes is determined by the JULIA_NUM_THREADS environment variable. So on Linux I can do launch two workers, each one running 4 threads, as follows:

from the shell run:

export JULIA_NUM_THREADS=4

Then launch julia from that shell session, run addprocs normally and your workers will have 4 threads. The following trivial example demonstrates this:

julia> using Distributed
julia> addprocs(2)
julia> @everywhere begin
         function print_id_2(x)
           pid = Distributed.myid()
           nth = Threads.nthreads()
           Threads.@threads for i in 1:nth
             tid = Threads.threadid()
             println("Hello from thread $tid of $nth on worker $pid. $(x[tid]) is from a vector")
           end
         end  
       end

julia> xv = [(i-1)*4+1:4*i for i in 1:3]
3-element Array{UnitRange{Int64},1}:
 1:4 
 5:8 
 9:12

julia> pmap(print_id_2, xv);
      From worker 3:    Hello from thread 1 of 4 on worker 3. 1 is from a vector
      From worker 3:    Hello from thread 4 of 4 on worker 3. 4 is from a vector
      From worker 2:    Hello from thread 1 of 4 on worker 2. 5 is from a vector
      From worker 3:    Hello from thread 2 of 4 on worker 3. 2 is from a vector
      From worker 3:    Hello from thread 3 of 4 on worker 3. 3 is from a vector
      From worker 2:    Hello from thread 2 of 4 on worker 2. 6 is from a vector
      From worker 2:    Hello from thread 4 of 4 on worker 2. 8 is from a vector
      From worker 2:    Hello from thread 3 of 4 on worker 2. 7 is from a vector
      From worker 3:    Hello from thread 2 of 4 on worker 3. 10 is from a vector
      From worker 3:    Hello from thread 3 of 4 on worker 3. 11 is from a vector
      From worker 3:    Hello from thread 1 of 4 on worker 3. 9 is from a vector
      From worker 3:    Hello from thread 4 of 4 on worker 3. 12 is from a vector

Unfortunately, as far as I can tell, it’s only possible to set the number of julia threads using environment variables. It would be nice if you could, say, pass a keyword argument to addprocs telling it how many threads to use on each new process but that does not seem to be possible.

I’m not sure exactly how you’d do this through a cluster manager like Slurm but presumably there is a way to control the environment that worker processes are launched in? Assuming that there is, you’d just have to make sure that JULIA_NUM_THREADS was set in that environment.

I’ve also never used Transducers.jl but I would guess that if your julia workers were all running multiple threads and you ran a multithreaded routine inside of a distributed computation it would just work.

6 Likes

We use Slurm in our HPC setting with 18 nodes connected by InfiniBand. I think setting export JULIA_NUM_THREADS=4 will copy that variable over to all nodes, and so when I use ClusterManger and addprocs to launch workers in separate nodes, they should all see the environment variable. I will test this out later.

1 Like

Thanks for the reply!

Presumably if I have machines with different CPU counts, then I should set JULIA_NUM_THREADS to some very high number (say, 1000); according to the documentation in such a case the actual number of threads will be the number of CPUs.

It seems pmap will run one task per worker process (not per thread)?
I guess I’ll start doing some experiments on pmap, @parallel and @distributed on such a setup.

Thanks!

I haven’t tested running julia with a very large number of threads. It would be interesting to see if the scheduler is smart enough to only a number that is reasonable for the current machine. Having said that, if you are launching workers on different machines you might be able to set JULIA_NUM_THREADS differently on each machine. Not sure if that’ll work but would be good to try.

Yes, pmap is a distributed memory thing, separate from threading.

If julia had a CLI option equivalent to JULIA_NUM_THREADS then you could set it via exeflags. Unfortunately that’s not the case yet:

https://github.com/JuliaLang/julia/issues/26889

Maybe create an executable (say) julia-mt like this

#!/bin/bash
export JULIA_NUM_THREADS="$(nproc)"
exec julia "$@"

and call addprocs with exename="PATH/TO/julia-mt"?

OK so hooking threaded reduction into distributed reduction in Transducers.jl was super easy:

https://github.com/tkf/Transducers.jl/pull/133/commits/7b2a79c7b2194f742f9d0a2da25d8af6f7bcc0c8

Now I just need to add tests etc.

3 Likes

I opened https://github.com/JuliaLang/julia/issues/34309 to request a command-line flag to set the number of threads, and https://github.com/JuliaLang/julia/issues/34311 to request @distributed_threads and pmap_threads which work across all threads in all worker processes. The current @distributed and pmap only use one thread per worker process.

It is useful to have both options. In terms of Transducers.jl, it would be useful to have reduce, dreduce and a separate dtreduce, or something along these lines. I opened https://github.com/tkf/Transducers.jl/issues/137 for this.

1 Like

I just released a new version of Transducers.jl with a new threads_basesize option to dreduce. “Two-level” parallelism should automatically kick in if you set JULIA_NUM_THREADS appropriately in all workers; i.e., each worker process uses multiple threads. You can still force each worker process to use only one thread by setting threads_basesize = typemax(Int).

1 Like

As for setting up (remote) worker processes with multiple threads, I wrote a helper script which seems to be working:

1 Like

for future reference, the command line option now exists, so someone can pass exeflags=["--threads=auto"]