Basics of the `@everywhere` and `@distributed` macros

Hi everyone,

I am trying to do distributed programming on a cluster. I have run a few tests, but I don’t get what I expected; I think I don’t understand the basics of the @everywhere and @distributed macros. The following is my test code:

using Distributed, ClusterManagers

addprocs(40)
println(nworkers())

@everywhere summark = myid()
@everywhere println(summark)

@sync @distributed for i = 1:100
    println(myid(), " , i = ", i, ", summark = ",summark)
end

In particular, I used @everywhere to define summark on each of the processes to be that process’s id. And @everywhere println(summark) gives what I expected, for example:

      From worker 38:   38
      From worker 3:    3
      From worker 26:   26
      From worker 19:   19
      From worker 17:   17
      From worker 14:   14
      From worker 5:    5
      From worker 12:   12

However, in the loop @sync @distributed for i = 1:100, I get the following for “summark”:

      From worker 80:   80 , i = 99, summark = 1
      From worker 58:   58 , i = 77, summark = 1
      From worker 66:   66 , i = 85, summark = 1
      From worker 47:   47 , i = 66, summark = 1
      From worker 63:   63 , i = 82, summark = 1
      From worker 74:   74 , i = 93, summark = 1
      From worker 68:   68 , i = 87, summark = 1

I used srun julia -p 40 Distributed_Test2.jl to run it (the -p 40 flag starts another 40 workers on top of addprocs(40), which is why worker ids above 40 appear in the output).

What I want is to access the “summark” of the corresponding process within the loop, as @everywhere println(summark) did. In particular, my goal is to define some arrays on each process and update them in the loop (I think SharedArray does not fit my algorithm because of the complicated data types involved). But I have been stuck on this simple example for a long time. I would appreciate any help.

Unfortunately, I’m also puzzled by this behaviour. It seems that merely reading distributed values changes them:

julia> using Distributed

julia> addprocs(3)
3-element Vector{Int64}:
 2
 3
 4

julia> @everywhere x = rand()

julia> @everywhere @show x
x = 0.992876486815238
      From worker 4:	x = 0.0032386913858226674
      From worker 2:	x = 0.16438997421798018
      From worker 3:	x = 0.3523385548943164

julia> @sync @distributed for i = 1:10
           @show i, myid(), x
       end
      From worker 2:	(i, myid(), x) = (1, 2, 0.992876486815238)
      From worker 2:	(i, myid(), x) = (2, 2, 0.992876486815238)
      From worker 2:	(i, myid(), x) = (3, 2, 0.992876486815238)
      From worker 2:	(i, myid(), x) = (4, 2, 0.992876486815238)
      From worker 4:	(i, myid(), x) = (8, 4, 0.992876486815238)
      From worker 4:	(i, myid(), x) = (9, 4, 0.992876486815238)
      From worker 4:	(i, myid(), x) = (10, 4, 0.992876486815238)
      From worker 3:	(i, myid(), x) = (5, 3, 0.992876486815238)
      From worker 3:	(i, myid(), x) = (6, 3, 0.992876486815238)
      From worker 3:	(i, myid(), x) = (7, 3, 0.992876486815238)
Task (done) @0x00007f1fbacddb60

julia> @everywhere @show x
x = 0.992876486815238
      From worker 2:	x = 0.992876486815238
      From worker 4:	x = 0.992876486815238
      From worker 3:	x = 0.992876486815238

Is that a bug or a feature?


Defining constants gives a hint:

julia> @everywhere const y = rand()

julia> @sync @distributed for i = 1:5
           @show i, myid(), y
       end
      From worker 2:	┌ Warning: Cannot transfer global variable y; it already has a value.
      From worker 2:	└ @ Distributed /cache/build/default-amdci4-2/julialang/julia-release-1-dot-8/usr/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:166
      From worker 3:	┌ Warning: Cannot transfer global variable y; it already has a value.
      From worker 3:	└ @ Distributed /cache/build/default-amdci4-2/julialang/julia-release-1-dot-8/usr/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:166
      From worker 4:	┌ Warning: Cannot transfer global variable y; it already has a value.
      From worker 4:	└ @ Distributed /cache/build/default-amdci4-2/julialang/julia-release-1-dot-8/usr/share/julia/stdlib/v1.8/Distributed/src/clusterserialize.jl:166
      From worker 2:	(i, myid(), y) = (1, 2, 0.503898621256702)
      From worker 2:	(i, myid(), y) = (2, 2, 0.503898621256702)
      From worker 4:	(i, myid(), y) = (5, 4, 0.237480921745733)
      From worker 3:	(i, myid(), y) = (3, 3, 0.9453322973599138)
      From worker 3:	(i, myid(), y) = (4, 3, 0.9453322973599138)

It seems that any global referenced in the `@distributed` loop body gets captured by the underlying closure and serialized from the master to the workers, overwriting each worker’s own binding (unless it is a const that already has a value, which is what the warning above is about). So the intended use is to run functions on the different workers against the same data. I would not have figured that out from the documentation!
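
One workaround is to avoid referencing the global directly in the loop body and instead go through a function defined with @everywhere: named functions are sent by name and resolve their globals on the worker that runs them, so each worker reads its own binding. Here is a minimal sketch (the helper names get_summark, localdata, and addval! are just for illustration):

using Distributed
addprocs(4)

@everywhere summark = myid()
@everywhere get_summark() = summark  # resolves the *worker-local* global at call time

@sync @distributed for i = 1:20
    # `summark` itself is never captured by the closure, so nothing gets overwritten
    println(myid(), " , i = ", i, ", summark = ", get_summark())
end

# The same indirection gives per-worker mutable arrays, as in the original goal:
@everywhere const localdata = Float64[]
@everywhere addval!(v) = push!(localdata, v)

@sync @distributed for i = 1:20
    addval!(Float64(i))
end

@everywhere println(myid(), " => ", localdata)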

Unfortunately, macro-based multithreading and multiprocessing both seem confusing at best, and brittle or fragile at times.


A better approach would be to use a library which has an explicit notion of remote data, such as Dagger.jl. This example could be done better like this:

using Distributed, ClusterManagers

addprocs(40)
println(nworkers())

@everywhere using Dagger

summark = Dagger.@shard myid()
# `summark` is an object which points to the result of `myid()` on all workers in the cluster
map(println, summark)

@everywhere myprint(i, s) = println("$(myid()) , i = $i, summark = $s")
@sync for i = 1:100
    Dagger.@spawn myprint(i, summark)  # the Shard resolves to this worker's value
end

This approach is better because:

  • There is no weirdness with global variables (instead there’s just one local variable which points to other “global” variables)
  • Dagger.@shard is explicitly built for this purpose, and you will always get the right value for whichever worker the code runs on
  • You don’t need to express your logic in a for loop; you can use whatever control flow patterns make sense for you

Note that with this example, you’re not guaranteed a perfectly even distribution of prints across the cluster, but generally Dagger will tend to balance the tasks evenly over time.
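
Per-worker mutable state fits the same pattern. Here is a minimal sketch of the original goal (arrays defined on each worker and updated by tasks), assuming that mutating a shard’s per-worker piece is allowed; localbuf and pushlocal! are illustrative names, and a real program should guard against concurrent tasks mutating the same piece:

@everywhere pushlocal!(buf, v) = push!(buf, v)

localbuf = Dagger.@shard Float64[]  # one independent array per worker

@sync for i = 1:100
    # the Shard argument resolves to the array owned by whichever worker runs the task
    Dagger.@spawn pushlocal!(localbuf, Float64(i))
end

map(buf -> println(length(buf)), localbuf)  # inspect each worker's piece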


Thanks for your suggestions @jpsamaroo. I have tried my algorithm with MPI, which seems to be another good alternative for my problem.
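
For anyone curious, a minimal MPI.jl sketch of the same test might look like this (assuming MPI.jl’s standard API; launched with e.g. mpiexec -n 4 julia script.jl). Each rank owns its own state, so there is no hidden transfer of globals:

using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)   # 0-based rank id
nranks = MPI.Comm_size(comm)

summark = rank               # each rank keeps its own local state

# static round-robin split of the iteration space across ranks
for i in (rank + 1):nranks:100
    println("rank $rank, i = $i, summark = $summark")
end

MPI.Finalize()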