How to get persistence on distributed workers

The following example attempts to define A on several workers, which then update their local copies.

How can I make this work?

using Distributed,Test
addprocs(2)
@everywhere function incA()
    A += myid() # this is the local copy of A
    println("worker:",myid(),", A=$A")
    A
end

@everywhere function initA()
    id = myid()
    A = fill(1.0,3) .+ id
    println("id:$id, initial A=$A")
end

@testset begin
    @everywhere initA()
    @test @fetchfrom 2 A .== fill(3.0,3)
    
    for i in workers()
        remotecall_wait(incA,i)
    end
    @test @fetchfrom 2 A .== fill(5.0,3)        
end

I guess both tests fail because of this:

  On worker 2:
  UndefVarError: A not defined
  #5 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:149
  #106 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:294
  run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
  macro expansion at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:294 [inlined]
  #105 at ./task.jl:356

I think you are having scoping issues. When you execute @everywhere initA(), the function runs on all processes: it defines A in the function’s local scope, and when it returns, A is lost. The line @test @fetchfrom 2 A .== fill(3.0,3) fails because it’s looking for a variable A in the global scope of worker 2, which you haven’t defined.
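To see the scoping rule in isolation, here is a minimal single-process sketch (the names B, set_local and set_global are hypothetical, chosen so they don’t clash with A). Assigning to a name inside a function without global creates a new local variable that disappears when the function returns. Declaring global B makes the assignment target the module-level binding instead. This is also why A += myid() in incA fails: the assignment makes A local, so there is nothing to add to.

```julia
# Hypothetical names; B plays the role of A from the question.
function set_local()
    B = fill(1.0, 3)    # assignment without `global` creates a new local B
    return nothing      # the local B is discarded here
end

function set_global()
    global B            # refer to the global B of the enclosing module
    B = fill(1.0, 3)    # this assigns the module-level binding
    return nothing
end

set_local()
after_local = isdefined(@__MODULE__, :B)    # false: no global B was created

set_global()
after_global = isdefined(@__MODULE__, :B)   # true: the global B now exists
```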

What you need to do is define A in the global scope of each worker and then modify the functions so they update that global variable. Something like @everywhere A = zeros(Float64, 3) will define A on workers 2 and 3 (and on the master process), where it can then be modified by functions run on those workers.

For example:

@everywhere A = zeros(Float64, 3) # define A as a global variable for all workers
@everywhere function initA()
    id = myid()
    global A # tell the function to use the global variable instead of creating a new one
    A = fill(1.0,3) .+ id
    println("id:$id, initial A=$A")
end
remotecall_wait(initA, 2)
remotecall_wait(initA, 3) # or use a loop
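Putting the pieces together, a complete version of the original example might look like the sketch below. It assumes a fresh session (so addprocs(2) gives workers 2 and 3), adds global A to incA as well, and compares with == instead of .==, since @test needs a single Bool rather than an array of them. The helper getA is hypothetical: reading A through a function defined on every process keeps the read on the worker, whereas a closure like the one @fetchfrom 2 A builds may, according to the Distributed manual’s notes on global variables in remote calls, copy the master’s own Main.A over to the worker first.

```julia
using Distributed, Test
addprocs(2)   # assumes a fresh session, so the workers get ids 2 and 3

# Create A as a global in Main on every process (master and workers alike).
@everywhere A = zeros(Float64, 3)

@everywhere function initA()
    global A                    # assign this process's global A, not a new local
    A = fill(1.0, 3) .+ myid()
    return nothing
end

@everywhere function incA()
    global A
    A .+= myid()                # update this process's copy of A in place
    return nothing
end

# Hypothetical helper: runs on the worker and returns that worker's A.
@everywhere getA() = A

@testset "persistence" begin
    for w in workers()
        remotecall_wait(initA, w)
    end
    @test remotecall_fetch(getA, 2) == fill(3.0, 3)   # 1.0 + 2

    for w in workers()
        remotecall_wait(incA, w)
    end
    @test remotecall_fetch(getA, 2) == fill(5.0, 3)   # 3.0 + 2
end
```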

But you should really think carefully about using global variables, and avoid them where you can, for performance reasons. They also make it very easy to introduce bugs. If you want parallelism with shared memory, you should look at Threads rather than Distributed.


Thanks, @affans. I opted for Distributed for a use case that involves simultaneous updates to a global variable on multiple machines (the updates are intentionally unsynchronized), but I’ll definitely keep the performance cost in mind.

Hi @affans, the following uses your suggestion of declaring a global and then calling a function on it from a worker, but it seems to have an issue. Please let me know if anything jumps out at you:

using Distributed,Test
addprocs(3)
@everywhere module testmoda
using Distributed
function getmyA()
    global A
    A[myid()]
end
export getmyA
end

@everywhere A = [1:nworkers();] # define A as a global variable for all workers
@test 2 == @fetchfrom 2 getmyA()

julia> @fetchfrom 2 getmyA()
ERROR: On worker 2:
UndefVarError: getmyA not defined
#11 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:149
#106 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:294
run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
macro expansion at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:294 [inlined]
#105 at ./task.jl:356
Stacktrace:
 [1] #remotecall_fetch#143 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:394 [inlined]
 [2] remotecall_fetch(::Function, ::Distributed.Worker) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:386
 [3] remotecall_fetch(::Function, ::Int64; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
 [4] remotecall_fetch(::Function, ::Int64) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
 [5] top-level scope at REPL[22]:1

Yes, there are a few problems here. First, a little note on workers: it’s easiest to think of them as independent Julia processes. In fact, each worker is simply a Julia instance started with the --worker command-line argument (so addprocs literally launches n julia --worker processes and connects them together). The point is that each worker is a fresh instance with no variables/modules/functions defined (apart from the built-in ones). If you launch a fresh Julia instance and just type runfunction() or A, you will get a "not defined" error. You are seeing similar problems here.
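A quick way to convince yourself of this is to ask a worker whether a name exists. In the sketch below (onlyhere and hasname are hypothetical names), onlyhere is defined only on the master, while hasname is defined on every process with @everywhere so that it can be called remotely. Base functions such as sum are available on the worker from the start; onlyhere is not.

```julia
using Distributed
w = addprocs(1)[1]                # id of the freshly launched worker

onlyhere() = "master only"        # defined in Main on process 1 only

# Hypothetical helper, defined on every process, that checks Main for a name.
@everywhere hasname(s::Symbol) = isdefined(Main, s)

on_master   = hasname(:onlyhere)                       # checked on the master
on_worker   = remotecall_fetch(hasname, w, :onlyhere)  # checked on the worker
base_on_wkr = remotecall_fetch(hasname, w, :sum)       # Base names are always there
```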

Let’s go through your code.

You’ve correctly defined the module testmoda on all workers. But how exactly do you use a module in Julia? With a using statement! So indeed, running

julia> @everywhere using .testmoda

will bring all exported functions of that module, including getmyA, into scope. Mind the . notation here, since it’s a locally defined module. Now that your function is imported, we can run

julia> @fetchfrom 2 getmyA()
ERROR: On worker 2:
UndefVarError: A not defined

Ta-da! It works (although it produces another error). Alternatively, if you don’t run using .testmoda, you can call the function through its module directly, i.e. @fetchfrom 2 testmoda.getmyA(), but a using statement is easier if you have a lot of exported functions.
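The same module mechanics can be tried in a single process. In this sketch, demomod and greet are hypothetical stand-ins for testmoda and getmyA; the leading dot in using .demomod tells Julia to look for demomod inside the current module rather than among installed packages.

```julia
# Hypothetical module standing in for testmoda.
module demomod
export greet
greet() = "hello from demomod"
end

qualified = demomod.greet()   # qualified access works without `using`

using .demomod                # leading dot: demomod is defined in the current module
unqualified = greet()         # the exported name is now in scope
```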

The function runs, but now it produces a second UndefVarError. It’s complaining that A isn’t defined, even though you have run @everywhere A = [1:nworkers();]. That expression does define a global variable on all workers; however, it defines it in the default module Main. You can see this by executing

julia> @fetchfrom 2 (@which A)
Main

which asks where A is defined on worker 2 (and you see it’s defined in Main). So when you run getmyA(), it looks for a variable A inside the module testmoda, which doesn’t exist. If you really want to define a global variable inside the module testmoda from outside it, you have to use eval, something like

julia> @everywhere @eval testmoda (A = [9, 10, 11])

which runs @eval testmoda (A = [9, 10, 11]) on all workers. Here @eval takes two arguments: the first, testmoda, says in which module to evaluate the statement, and the second, (A = [9, 10, 11]), is the statement to be evaluated. (With a single argument, @eval evaluates in the current module.)
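Here is a single-process sketch of both points: a function looks up globals in its own module, and the two-argument @eval evaluates an assignment inside a chosen module (demomod2, getB and B are hypothetical). Defining B in Main doesn’t help getB; evaluating the assignment inside demomod2 does.

```julia
# Hypothetical module standing in for testmoda.
module demomod2
getB() = B        # B is resolved in demomod2 at call time, never in Main
end

B = [1, 2, 3]     # defines Main.B, which is invisible inside demomod2

err = try
    demomod2.getB()
catch e
    e             # keep the exception so it can be inspected
end

@eval demomod2 (B = [9, 10, 11])   # run the assignment inside demomod2
result = demomod2.getB()
```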

Now A is defined in testmoda (in fact, if you haven’t reset your session, it’s also still defined in Main), and everything should work:

julia> @everywhere @eval testmoda (A = [9, 10, 11])

julia> @fetchfrom 2 getmyA()
10

Of course, this is all for learning purposes. Using global variables like this, and using @eval, is not really meant for production-level code unless you absolutely know what you are doing. If you just want a global variable, define it in the module directly, i.e.

using Distributed,Test
addprocs(3)
@everywhere module testmoda
    using Distributed
    A = zeros(Float64, 3)  ## define global variable in `testmoda`
    function getmyA()
        global A
        A[myid()]
    end
    export getmyA
end
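Building on that, the sketch below keeps one copy of a module-level array per process and has each worker update only its own copy, which is the persistence the original question was after. The module name workerstate and its functions are hypothetical. Two choices are my own. First, A is declared const, which fixes the binding and its type (helping with the performance concern raised earlier) while the array contents stay mutable. Second, each worker fills its whole A with its id instead of indexing A[myid()], because with addprocs(3) the worker ids are 2, 3 and 4, and a length-3 array indexed by id goes out of bounds on worker 4.

```julia
using Distributed
addprocs(3)   # assumes a fresh session: worker ids 2, 3, 4

# Hypothetical module; every process evaluates it and gets its own copy of A.
@everywhere module workerstate
    using Distributed
    const A = zeros(Float64, 3)          # binding is fixed, contents are mutable
    setmine!() = (A .= myid(); nothing)  # overwrite this process's copy in place
    getA() = A                           # return this process's copy
    export setmine!, getA
end
@everywhere using .workerstate

for w in workers()
    remotecall_wait(setmine!, w)         # each worker writes its own id into its A
end

snapshots = Dict(w => remotecall_fetch(getA, w) for w in workers())
```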

Thanks for that fantastic explanation!

I’ve updated the example below and the @testset "broadcasting" passes.

I don’t want to get too far off topic, but I’m wondering if you have any insight into @testset "mutability"?

The assignment in the second test set causes the error “cannot assign variables in other modules”, which seems to imply that the ismutable() call from Main on the preceding line should also fail, yet it passes.

using Distributed,Test
addprocs(3)
@everywhere module testmoda
using Distributed
A = zeros(Float64, nworkers())  ## define global variable in `testmoda`
function getmyA()
    # global A
    A[myid()]
end
export getmyA
end

@everywhere using .testmoda

@testset "broadcasting" begin
    @everywhere testmoda.A .= [1:nworkers();] # define A as a global variable for all workers
    @test 2 == @fetchfrom 2 getmyA()
end

@testset "mutability" begin
    @test ismutable(testmoda.A)
    @everywhere testmoda.A = [1:nworkers();] # define A as a global variable for all workers
    @test 2 == @fetchfrom 2 getmyA()
end
Test output

julia> @testset "broadcasting" begin
           @everywhere testmoda.A .= [1:nworkers();] # define A as a global variable for all workers
           @test 2 == @fetchfrom 2 getmyA()
       end
Test Summary: | Pass  Total
broadcasting  |    1      1
Test.DefaultTestSet("broadcasting", Any[], 1, false)

julia> @testset "mutability" begin
           @test ismutable(testmoda.A)
           @everywhere testmoda.A = [1:nworkers();] # define A as a global variable for all workers
           @test 2 == @fetchfrom 2 getmyA()
       end
mutability: Error During Test at REPL[6]:1
  Got exception outside of a @test
  On worker 2:
  cannot assign variables in other modules
  setproperty! at ./Base.jl:27
  top-level scope at none:1
  eval at ./boot.jl:331
  #103 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:290
  run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
  run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:88
  #96 at ./task.jl:356
  
  ...and 3 more exception(s).
  
  Stacktrace:
   [1] sync_end(::Channel{Any}) at ./task.jl:314
   [2] macro expansion at ./task.jl:333 [inlined]
   [3] remotecall_eval(::Module, ::Array{Int64,1}, ::Expr) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:218
   [4] top-level scope at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:202
   [5] top-level scope at REPL[6]:3
   [6] top-level scope at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Test/src/Test.jl:1115
   [7] top-level scope at REPL[6]:2
   [8] include_string(::Function, ::Module, ::String, ::String) at ./loading.jl:1091
   [9] repleval(::Module, ::String, ::String) at /home/au/.vscode/extensions/julialang.language-julia-1.0.10/scripts/packages/VSCodeServer/src/repl.jl:105
   [10] (::VSCodeServer.var"#43#45"{Module,String,REPL.LineEditREPL,REPL.LineEdit.Prompt})() at /home/au/.vscode/extensions/julialang.language-julia-1.0.10/scripts/packages/VSCodeServer/src/repl.jl:84
   [11] with_logstate(::Function, ::Any) at ./logging.jl:408
   [12] with_logger at ./logging.jl:514 [inlined]
   [13] (::VSCodeServer.var"#42#44"{Module,String,REPL.LineEditREPL,REPL.LineEdit.Prompt})() at /home/au/.vscode/extensions/julialang.language-julia-1.0.10/scripts/packages/VSCodeServer/src/repl.jl:83
   [14] #invokelatest#1 at ./essentials.jl:710 [inlined]
   [15] invokelatest(::Any) at ./essentials.jl:709
   [16] macro expansion at /home/au/.vscode/extensions/julialang.language-julia-1.0.10/scripts/packages/VSCodeServer/src/eval.jl:27 [inlined]
   [17] (::VSCodeServer.var"#56#57")() at ./task.jl:356
  
Test Summary: | Pass  Error  Total
mutability    |    1      1      2
ERROR: Some tests did not pass: 1 passed, 0 failed, 1 errored, 0 broken.

It’s failing because you are trying to assign a brand-new value (the RHS) to A. Writing A = ... doesn’t “mutate” A; it tries to rebind the name A to whatever the RHS is, which is not allowed from another module in the Julia 1.5 used here (newer Julia releases relaxed this rule). Had it been allowed, the array A used to point to would simply be dropped and likely garbage collected. See this thread on defining variables in other modules.
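The difference between rebinding and mutating can be seen within a single module. In the sketch below, alias is a hypothetical second name for the same array: after .= both names still point to one object, but after = the name A points to a new array and alias keeps the old one. The sketch stays inside one module on purpose, since whether cross-module assignment is allowed depends on the Julia version.

```julia
A = zeros(3)
alias = A                  # a second name for the same array object

A .= [1.0, 2.0, 3.0]       # mutation: writes into the existing array
same_after_mutation = alias === A

A = [7.0, 8.0, 9.0]        # rebinding: A now names a brand-new array
same_after_rebinding = alias === A
```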

You can fix this by actually mutating the variable with broadcast assignment, i.e. @everywhere testmoda.A .= [1:nworkers();] as in your first test set. If you want to mutate it without broadcasting, use a for loop or some other mechanism:

@everywhere for i in eachindex(testmoda.A)
    testmoda.A[i] = i  # same values as [1:nworkers();]; setindex! mutates, it doesn't rebind
end

Although, again, I imagine this is for learning purposes, as there isn’t any real difference between the broadcast code and the for loop.

Also note that ismutable(testmoda.A) is fine: it doesn’t rebind A, it just tells you that whatever A is pointing to can be modified.
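A small sketch of that point (v and r are hypothetical names; ismutable requires Julia 1.5 or later): ismutable answers a question about the object a name currently refers to, not about the name itself, so a variable holding an immutable range can still be rebound to something else.

```julia
v = [1, 2, 3]
r = 1:3

v_mutable = ismutable(v)    # an Array can be modified in place
r_mutable = ismutable(r)    # a UnitRange is an immutable struct
r = collect(r)              # ...yet the name r can still be rebound, here to an Array
```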


Very clear, thank you!