Error running distributed code inside of a module

The following works just fine:

using Distributed

procs = addprocs(10)

futures = []
for proc in procs
	push!(futures, @spawnat proc sleep(2))
end
for fut in futures
	fetch(fut)
end

The following:

module A
    using Distributed

    procs = addprocs(10)

    futures = []
    for proc in procs
        push!(futures, @spawnat proc sleep(2))
    end
    for fut in futures
        fetch(fut)
    end
end

throws

ERROR: On worker 2:
UndefVarError: A not defined

on the line calling fetch.

I’m familiar with the need to make sure the right stuff ends up on the worker processes, e.g. making sure to add lines like

for proc in procs
    @spawnat proc begin
        import Pkg
        Pkg.activate(...)
        import Library1, Library2
    end
end

prior to using whatever functionality I need.

But in the problem case above, the worker processes don’t need to know anything at all about the module A. My belief was that each process was a fresh copy of Julia, that runs whatever code we send to it – in this case just a sleep – but clearly that mental model isn’t quite accurate.

For context the desire is to create a library function that uses parallelism:

module MyLibrary
    using Distributed
    function my_parallel_function(num_procs)
        procs = addprocs(num_procs)
        ...
    end
end

What’s the appropriate way to do this? Combing through the documentation/stackoverflow/discourse hasn’t turned up much.

CC @MilesCranmer

in the 2nd case procs is a global variable in Module A:

julia> procs
ERROR: UndefVarError: procs not defined

julia> A.procs
10-element Array{Int64,1}:
 2
 3
....

BTW: for doing such initialization stuff for a module in distributed code, you should look at __init__:

help?> __init__
search: __init__

  __init__

  __init__() function in your module would executes immediately after the module is
  loaded at runtime for the first time (i.e., it is only called once and only after
....

Thanks for your response. The worker processes don’t need access to procs, however. And indeed if I make everything local to a function then the same error occurs:

module A
    using Distributed

    function foo()
        procs = addprocs(10)

        futures = []
        for proc in procs
            push!(futures, @spawnat proc sleep(2))
        end
        for fut in futures
            fetch(fut)
        end
    end
end

A.foo()

This intended to be done at runtime, not initialization.

ok, I can only guess that @spawnat needs A as context for creating a closure to run on asynchronous workers. Distributed.pmap does not have that limitation. With …

module A
    using Distributed

    function foo()
        futures = []
        for proc in procs()
            push!(futures, @spawnat proc myid())
        end
        fetch.(futures)
    end

    function __init__()
        myid() == 1 && nprocs() < 11 && addprocs(11-nprocs())
    end 
end

the following works:

julia> include("A.jl")
Main.A

julia> using Distributed

julia> pmap(_->myid(), 1:nprocs())
11-element Array{Int64,1}:
  2
  3
  4
  6
  5
  9
  7
  8
 10
 11
  4

julia> @everywhere include("A.jl")
WARNING: replacing module A.

julia> A.foo()
11-element Array{Int64,1}:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
1 Like

Unfortunately that solution requires the line @everywhere include("A.jl") called from the REPL/Main. The desire is to be able to wrap up these details so that the user need specify no more than something like MyModule.my_parallel_function(..., number_of_processes=5).

As far as I can tell this is impossible. I’ve not been able to use @spawnat or pmap for this purpose anywhere except from the REPL/Main. Once they’re put inside a module, it appears one runs into a catch-22. Calling remote functions requires the containing module in order to run; but making the containing module available requires calling a remote function.

(To be clear: I’m aware that you can use @spawnat or pmap inside a module – but only by requiring some outside help, initialising the processes appropriately first. So it’s essentially impossible to use Distributed as an implementation detail inside a library; it always leaks into user-land; becomes tricky to use libaries inside libraries etc.)

The following code seems to work, but I’m not sure how robust this is?

File mwe.jl

module A
	export test
	using Distributed

	function test()
		procs = addprocs(10)
		@everywhere procs begin
			Base.MainInclude.eval(include(@__FILE__))
			Base.MainInclude.eval(using .A)
		end
		futures = []
		for proc in procs
			push!(futures, @spawnat proc println("Hello from $proc"))
		end
		for fut in futures
			fetch(fut)
		end
	end
end

File run.jl:

include("mwe.jl")
using .A
test()

Output:

      From worker 9:    Hello from 9
      From worker 6:    Hello from 6
      From worker 10:   Hello from 10
      From worker 7:    Hello from 7
      From worker 3:    Hello from 3
      From worker 2:    Hello from 2
      From worker 4:    Hello from 4
      From worker 5:    Hello from 5
      From worker 11:   Hello from 11
      From worker 8:    Hello from 8
4 Likes

Amazing, thank you!

For posterity (and those Googling who end up landing here in the future), here’s a short write-up.

After a bit of experimentation, it seems like @everywhere procs ..., for the initalisation step, is the secret sauce here. It works when for proc in procs; @spawnat proc ...; end does not. I’ve not tried digging into why that is.

By-the-by one needs to include a couple of extra lines to activate the current environment inside the new worker processes. I’ve tested that the following works when installed as a package called MyPackage:

# MyPackage.jl

module MyPackage

import Distributed
import Pkg

function test(procs)
    project_path = splitdir(Pkg.project().path)[1]
    Distributed.@everywhere procs begin
        Main.eval(quote
            import Pkg
            Pkg.activate($$project_path)
			import MyPackage
        end)
    end

    Distributed.pmap(_->println("Hello from " * string(Distributed.myid())), Distributed.WorkerPool(procs), 1:6)
end

function test(num_procs::Integer)
    project_path = splitdir(Pkg.project().path)[1]
    procs = Distributed.addprocs(num_procs)
    try
        test(procs)
    finally
        Distributed.rmprocs(procs)
    end
end

end

In each case the complicated bit being the initial set-up on each worker. After that you’re free to use pmap, @spawnat or @everywhere as you prefer.

I’ve not tested every combination of [my code above, @MilesCranmer's code above] x [environment, no environment] x [package, no package] and the behaviour in each case. I’m pretty sure sometimes it’s different. YMMV.

4 Likes