Several questions on distributed computing from a beginner

Hi, everyone! I’d like to ask several questions that have confused me while learning distributed computing as a complete beginner. I came here looking for some confirmation, and I also hope this can help other beginners get started faster. Let’s take the following singular value decomposition (SVD) problem from the docs as an example.

using Distributed
using LinearAlgebra
addprocs(10)
M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
@time map(svdvals, M);
@time pmap(svdvals, M);

Q1: When we call pmap, the task is distributed to 10 processes for execution. Where and how exactly are they executed on the computer? Is it equivalent to opening 10 Julia REPLs and running svdvals(M[i]) (for i in 1:10) on them separately (at the same time)? Does each of the 10 so-called worker processes correspond to one of the entries under the Processes field (1/279) in the Task Manager?

Q2: How do I check the maximum number of processes available on the computer? What resources limit it? The number of sockets (is that the number of CPUs?), cores, or logical processors? Or the number of threads? (What exactly do these terms refer to?) And if the code is to be run on a cluster, how should I set #SBATCH -N 1 and #SBATCH --ntasks-per-node=1 (SLURM)?

Q3: Are there more details or examples on the usage of addprocs(exeflags="--project") and @everywhere? My understanding is: if a script main.jl (which uses pmap) depends on some project environment, we should add addprocs(exeflags="--project") near the top of main.jl (below using Distributed) and put @everywhere before every package import (except using Distributed), e.g.,

# main.jl:

using Distributed
addprocs(exeflags="--project")
addprocs(10) # `addprocs` also needs to be called before `@everywhere`.  Right?
@everywhere using PackageA
@everywhere using PackageB
# Other code...

And to execute it in a terminal, we run julia --project main.jl in its directory. But in the SVD example above, why don’t we need @everywhere before using LinearAlgebra?

Q4: Since addprocs(10) must be called before @everywhere using PackageA (I tried running the latter first and it reported an error: ERROR: On worker 2: KeyError: key PackageA not found), how can we write a distributed computing function into a module? E.g.,

module MyModule
using Distributed
@everywhere using PackageA: funA

export myfun
function myfun(num_procs)
    addprocs(num_procs)
    # @everywhere using PackageA: funA  -- should we put `using PackageA` here instead?
    M = [...]
    @time pmap(funA, M)
end
end

Q5: How can we compare parallel computing times fairly? If we simply compare @time map(svdvals, M) and @time pmap(svdvals, M), we ignore the cost of addprocs(10) and the package-loading time on all worker processes.

Thank you for your attention. I don’t expect answers to all of these questions, but I will be extremely grateful for any brief guidance you can provide! I will also try to answer some of the questions myself once I figure them out.


I’ll first briefly summarize the mental model of Distributed and answer some of your questions in this context.

Distributed.jl allows you to manage and use worker processes (i.e. add/remove workers, distribute tasks). A “process” in this context is really a “system process”, i.e. a completely new instance of Julia, like opening multiple REPLs.
A nice feature is that Distributed.jl does not assume that the worker processes live on the same physical machine, meaning they could also live on remote machines. That explains the name “Distributed”.

Yes, you are correct here. Distributing the workload is essentially equivalent to opening 10 REPLs and computing one of the SVDs in each.

Each “worker process” is an OS process and is thus one of the 279 total processes.
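A quick way to see this (a minimal sketch): ask the master and each worker for its OS process id; distinct PIDs confirm that every worker is a separate system process.

```julia
using Distributed
addprocs(2)

# Collect the OS process id of the master and of each worker.
# Distinct PIDs show that workers are separate system processes,
# just like independently opened Julia REPLs.
pids = [getpid(); [remotecall_fetch(getpid, w) for w in workers()]]
println(pids)
```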

The number of processes is technically limited by the OS (2^22 ≈ 4.2e6 on my Ubuntu, according to Stack Overflow). But since each process requires some resources from the operating system, it is more likely that some other resource (e.g. memory) imposes a lower limit on the number of processes you can actually run.

All of the things you refer to are hardware properties and do not directly limit the number of processes. However, in numerical applications you usually want 1 or 2 processes (or threads) per physical core for optimal performance.
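For reference, you can query the logical-processor count from Julia; the one-worker-per-physical-core heuristic below is only an assumption (on hyperthreaded machines, the number of physical cores is often half the logical count):

```julia
using Distributed

# Number of logical processors visible to Julia
n_logical = Sys.CPU_THREADS

# Heuristic only (assumption): with hyperthreading, physical cores are
# roughly half the logical count; start one worker per physical core.
n_workers = max(1, n_logical ÷ 2)
println("logical processors: ", n_logical, ", suggested workers: ", n_workers)
# addprocs(n_workers)  # uncomment to actually create the workers
```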

What resources you tell SLURM you want and what resources you actually use in the end are also independent of each other :wink: SLURM configuration is probably best discussed with a concrete example.

There is a page in the manual: Multi-processing and Distributed Computing · The Julia Language

This code snippet likely does not do what you expect it to. The first call, addprocs(exeflags="--project"), adds a new worker for every CPU thread (≈ number of logical CPU cores) you have; these workers activate the project in the current working directory (as with Pkg.activate). The second call, addprocs(10), adds 10 additional workers that do not activate the local project.
You likely want addprocs(10; exeflags="--project") to create 10 workers that all activate the local environment.
The role of @everywhere is simple in concept: since the workers are separate Julia processes, you need to set them up, i.e. import (using) the correct packages on each of them. Think again of the multiple REPLs: if you import a module in one REPL, the functions from that package are not available in any other REPL. @everywhere simply executes whatever comes after it on every worker.
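A tiny demonstration of that point (a sketch with a made-up helper `double`):

```julia
using Distributed
addprocs(2)

# Without `@everywhere` this definition would exist only on the
# master process; with it, every worker gets its own copy.
@everywhere double(x) = 2x

# pmap can now call `double` on any worker.
println(pmap(double, 1:4))  # [2, 4, 6, 8]
```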

That is a happy accident: LinearAlgebra is a standard library and essentially always loaded. It would not work with basically any other library.

Yes, you need to create the workers before you perform the setup. Workers created later don’t know about any previous setup.
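This can be seen with a small experiment (a sketch; `f` is a hypothetical helper, and the worker ids 2 and 3 assume a fresh session):

```julia
using Distributed
addprocs(1)                    # creates worker 2
@everywhere f(x) = x + 1       # defined on all *current* processes
addprocs(1)                    # worker 3, created after the setup

println(remotecall_fetch(f, 2, 1))  # works: worker 2 knows `f`
# remotecall_fetch(f, 3, 1)         # fails: worker 3 never saw the definition
```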

Usually the module should just provide the functionality, and you perform the rest of the setup (such as worker creation and initialization) in your script. So the module could read

# in file mymodule.jl
module MyModule
using Distributed
using PackageA: funA

export myfun
function myfun()
    M = [...]
    @time pmap(funA, M)
end
end

and you do the setup in the script that executes this.

using Distributed
addprocs(10; exeflags="--project")
@everywhere include("mymodule.jl")
@everywhere using MyModule
myfun()

Fairness depends on the application. Usually worker creation and initialization are done only once, and the workers are then reused over and over, so that over the whole runtime of your script the one-time cost does not matter. In that case it makes sense for benchmarks of small snippets to exclude the setup cost.
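One way to make the setup cost visible is to time it separately from the computation (a sketch with scaled-down sizes; numbers will vary by machine):

```julia
using Distributed

# One-time cost: starting workers and loading packages on them.
setup_time = @elapsed begin
    addprocs(4)
    @everywhere using LinearAlgebra
end

M = [rand(200, 200) for _ in 1:8]
pmap(svdvals, M)                      # warm-up call: includes compilation
compute_time = @elapsed pmap(svdvals, M)

println("setup: ", round(setup_time; digits=2), " s, compute: ",
        round(compute_time; digits=3), " s")
```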


Shocked by the detailed reply! Thanks a lot! Let me read it carefully!

Thank you very much! Removed more than 98% of my confusion.


Do you mean the happy accident is: for a Julia standard library, once it is loaded on worker #1, it becomes available on all other workers? The fact is, even on a single worker we cannot use svdvals without using LinearAlgebra, yet here we can use svdvals everywhere even though we never used @everywhere.

Hm?

julia> using Distributed

julia> addprocs(2)
2-element Vector{Int64}:
 2
 3

julia> using LinearAlgebra

julia> svd([1 2;3 5])
SVD{Float64, Float64, Matrix{Float64}, Vector{Float64}}
U factor:
2×2 Matrix{Float64}:
 -0.357373  -0.933962
 -0.933962   0.357373
singular values:
2-element Vector{Float64}:
 6.242943383865534
 0.16018085356731407
Vt factor:
2×2 Matrix{Float64}:
 -0.506053  -0.862503
  0.862503  -0.506053

julia> @everywhere svd([1 2; 3 4])
ERROR: On worker 2:
UndefVarError: `svd` not defined

What’s the matter?

I mean the following code runs well without adding an @everywhere before the using LinearAlgebra:

using Distributed
using LinearAlgebra
addprocs(10)
M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
@time pmap(svdvals, M);

Does this mean: For a Julia standard library, once it is loaded on worker #1, it will be available on any other workers?

pmap(svdvals, M) works, but @everywhere svd([1 2; 3 4]) does not. That’s because the function argument to pmap is transferred to the workers as a serialized object, not as the name svdvals; otherwise pmap could not be used with anonymous functions. @everywhere svd(...) fails because the namespace of LinearAlgebra is not loaded on the worker, so the name svd cannot be found there.
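Whatever the exact serialization mechanism, the observable behavior can be reproduced in a few lines (a sketch; note that LinearAlgebra is loaded on the master only):

```julia
using Distributed
addprocs(2)
using LinearAlgebra              # loaded on the master process only

# Works: pmap ships its function argument (here a closure referring to
# `svdvals`) to the workers, and the call succeeds there.
vals = pmap(m -> svdvals(m), [rand(3, 3) for _ in 1:2])
println(length(vals))            # 2

# Fails on the workers: `svd` is looked up by name there, and
# LinearAlgebra's names were never imported on the workers.
try
    @everywhere workers() svd([1 2; 3 4])
catch
    println("failed on the workers, as expected")
end
```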


Uhh, I checked this, and it’s actually not true. Either it was different years ago when I looked into it, or I remember it wrongly. The function is serialized as a name, together with a reference to the module it is defined in. Consequently it does not work with functions you defined yourself, since those are not available on the workers by name.


Thank you all the same, although I’m not able to understand what you’re saying. :grinning: :handshake:

Does your discovery support this?

Do you mean, for the SVD example (addprocs(10)), if we run it on a cluster, we’d better set #SBATCH -N 5 (5 nodes) and #SBATCH --ntasks-per-node=2 (2 tasks per node)?

Is the concept “node” of a cluster similar to the concept “core” or “logical processor” of a PC?

Not really. It is higher in the hierarchy so to speak.
Starting from small to big:

  • A CPU has many physical cores.
  • A computer/node can have multiple CPUs (= sockets)
  • A cluster consists of multiple nodes (= independent computers)

Q6: If we simply run

addprocs(10)
pmap(svdvals, M);

will these worker processes be automatically and intelligently distributed and scheduled between any logical processors, cores and CPUs (and nodes on a cluster) that are available?

Q7: If there is only one worker process, pmap will be exactly equivalent to map except for an extra using Distributed. Right?

Q8: Is a pmap allowed to be embedded in another pmap?

A8: The answer seems to be no.

using Distributed
addprocs(3)

# Works
pmap([1:10;]) do i
    map(j -> j, [1:5;])
end

# Doesn't work
pmap([1:10;]) do i
    pmap(j -> j, [1:5;])
end

Yes for some definition of “intelligently”.

Yes. I think it is even explicitly coded like this.
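This is easy to check in a fresh session with no workers added (a sketch):

```julia
using Distributed    # no addprocs: only the master process exists

# With a single process, pmap just computes locally and matches map.
println(pmap(x -> x^2, 1:5) == map(x -> x^2, 1:5))  # true
```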

It seems that recursive pmap somehow deadlocks. But it works totally fine if you use @spawnat :any to create the tasks instead.

# this causes a deadlock
julia> pmap(1:10) do i
           pmap(11:20) do j
               sleep(rand())
               i*j
           end
       end
....
# this works just fine
julia> fetch.(map(1:10) do i
           @spawnat :any fetch.(map(11:20) do j
               @spawnat :any (sleep(rand()); i*j)
           end)
       end)

Although I set #SBATCH -N 1 and #SBATCH --ntasks-per-node=1, the code can still run in a distributed fashion. Why?

The code is:

using Distributed
addprocs(10)
@everywhere using LinearAlgebra
M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
@time pmap(M) do m
    svdvals(m)
    println("Hello!")
end

whose output is:

      From worker 4:	Hello!
      From worker 6:	Hello!
      From worker 3:	Hello!
      From worker 2:	Hello!
      From worker 5:	Hello!
      From worker 8:	Hello!
      From worker 7:	Hello!
      From worker 10:	Hello!
      From worker 11:	Hello!
      From worker 9:	Hello!
 13.951877 seconds (2.36 M allocations: 160.958 MiB, 0.72% gc time, 14.17% compilation time)

As we can see, there were indeed 10 workers even though we told the cluster #SBATCH -N 1 and #SBATCH --ntasks-per-node=1. Does this mean the code was executed in a distributed fashion?

If I set #SBATCH -N 2 and #SBATCH --ntasks-per-node=10 and run the code again, the result is:

      From worker 4:	Hello!
      From worker 6:	Hello!
      From worker 9:	Hello!
      From worker 11:	Hello!
      From worker 8:	Hello!
      From worker 3:	Hello!
      From worker 2:	Hello!
      From worker 10:	Hello!
      From worker 5:	Hello!
      From worker 7:	Hello!
  4.560007 seconds (2.36 M allocations: 161.357 MiB, 37.19% compilation time)

The time has indeed been reduced. Does this mean the former run was not truly distributed?

As usual I recommend the following resource to first-time users of Distributed:
