The following works fine at the top level:

using Distributed
procs = addprocs(10)
futures = []
for proc in procs
    push!(futures, @spawnat proc sleep(2))
end
for fut in futures
    fetch(fut)
end
The following:

module A
using Distributed
procs = addprocs(10)
futures = []
for proc in procs
    push!(futures, @spawnat proc sleep(2))
end
for fut in futures
    fetch(fut)
end
end
throws
ERROR: On worker 2:
UndefVarError: A not defined
on the line calling fetch.
I’m familiar with the need to get the right code onto the worker processes, e.g. adding lines like
for proc in procs
    @spawnat proc begin
        import Pkg
        Pkg.activate(...)
        import Library1, Library2
    end
end
prior to using whatever functionality I need.
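For concreteness, here is a minimal sketch of that set-up pattern. (This is my own illustration: Statistics stands in for whatever libraries the workers need, two workers rather than ten to keep it light, and I use @everywhere for the imports since @spawnat wraps its expression in a closure, where import statements are not allowed.)

```julia
using Distributed

procs = addprocs(2)  # two workers, just for illustration

# @everywhere evaluates its expression in Main on every process
# (master and workers alike), so the import is visible wherever
# remote calls later resolve names.
@everywhere import Statistics

# The workers can now use the imported functionality.
results = [fetch(@spawnat p Statistics.mean([1.0, 2.0, 3.0])) for p in procs]

rmprocs(procs)
```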
But in the problem case above, the worker processes don’t need to know anything at all about the module A. My belief was that each process was a fresh copy of Julia, that runs whatever code we send to it – in this case just a sleep – but clearly that mental model isn’t quite accurate.
For context, the desire is to create a library function that uses parallelism:

module MyLibrary
using Distributed
function my_parallel_function(num_procs)
    procs = addprocs(num_procs)
    ...
end
end
What’s the appropriate way to do this? Combing through the documentation/stackoverflow/discourse hasn’t turned up much.
In the second case, procs is a global variable in module A:
julia> procs
ERROR: UndefVarError: procs not defined
julia> A.procs
10-element Array{Int64,1}:
2
3
....
BTW: for doing such initialization stuff for a module in distributed code, you should look at __init__:
help?> __init__
search: __init__
__init__
The __init__() function in your module executes immediately after the module is
loaded at runtime for the first time (i.e., it is only called once, and only after
....
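To make that concrete, a minimal sketch (the module name is made up; note that __init__ runs once, at load time, on each process that loads the module):

```julia
module AutoWorkers  # hypothetical module name

using Distributed

# Runs once, immediately after the module is first loaded on a process.
# The myid() == 1 guard stops workers from adding workers of their own.
function __init__()
    myid() == 1 && nprocs() < 3 && addprocs(3 - nprocs())
end

end # module
```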
Thanks for your response. The worker processes don’t need access to procs, however. And indeed if I make everything local to a function then the same error occurs:
module A
using Distributed
function foo()
    procs = addprocs(10)
    futures = []
    for proc in procs
        push!(futures, @spawnat proc sleep(2))
    end
    for fut in futures
        fetch(fut)
    end
end
end
A.foo()
This is intended to be done at runtime, not at initialization.
OK, I can only guess that @spawnat needs A as context for creating a closure to run on the asynchronous workers. Distributed.pmap does not have that limitation. With …
module A
using Distributed
function foo()
    futures = []
    for proc in procs()
        push!(futures, @spawnat proc myid())
    end
    fetch.(futures)
end
function __init__()
    myid() == 1 && nprocs() < 11 && addprocs(11 - nprocs())
end
end
Unfortunately that solution requires the line @everywhere include("A.jl") called from the REPL/Main. The desire is to be able to wrap up these details so that the user need specify no more than something like MyModule.my_parallel_function(..., number_of_processes=5).
As far as I can tell this is impossible. I’ve not been able to use @spawnat or pmap for this purpose anywhere except from the REPL/Main. Once they’re put inside a module, it appears one runs into a catch-22. Calling remote functions requires the containing module in order to run; but making the containing module available requires calling a remote function.
(To be clear: I’m aware that you can use @spawnat or pmap inside a module – but only with some outside help, initialising the processes appropriately first. So it’s essentially impossible to use Distributed as an implementation detail inside a library; it always leaks into user-land, and it becomes tricky to use libraries inside libraries, etc.)
The following code seems to work, but I’m not sure how robust this is?
File mwe.jl
module A
export test
using Distributed
function test()
    procs = addprocs(10)
    @everywhere procs begin
        Base.MainInclude.eval(include(@__FILE__))
        Base.MainInclude.eval(using .A)
    end
    futures = []
    for proc in procs
        push!(futures, @spawnat proc println("Hello from $proc"))
    end
    for fut in futures
        fetch(fut)
    end
end
end
File run.jl:
include("mwe.jl")
using .A
test()
Output:
From worker 9: Hello from 9
From worker 6: Hello from 6
From worker 10: Hello from 10
From worker 7: Hello from 7
From worker 3: Hello from 3
From worker 2: Hello from 2
From worker 4: Hello from 4
From worker 5: Hello from 5
From worker 11: Hello from 11
From worker 8: Hello from 8
For posterity (and those Googling who end up landing here in the future), here’s a short write-up.
After a bit of experimentation, it seems that @everywhere procs ... is the secret sauce for the initialisation step. It works where for proc in procs; @spawnat proc ...; end does not. I haven’t dug into why, but plausibly it’s because @everywhere evaluates its expression in Main on each worker, whereas @spawnat serializes a closure that carries a reference to its defining module.
By the by, one needs to include a couple of extra lines to activate the current environment inside the new worker processes. I’ve tested that the following works when installed as a package called MyPackage:
# MyPackage.jl
module MyPackage
import Distributed
import Pkg
function test(procs)
    project_path = splitdir(Pkg.project().path)[1]
    Distributed.@everywhere procs begin
        Main.eval(quote
            import Pkg
            Pkg.activate($$project_path)
            import MyPackage
        end)
    end
    Distributed.pmap(_ -> println("Hello from " * string(Distributed.myid())),
                     Distributed.WorkerPool(procs), 1:6)
end
function test(num_procs::Integer)
    project_path = splitdir(Pkg.project().path)[1]
    procs = Distributed.addprocs(num_procs)
    try
        test(procs)
    finally
        Distributed.rmprocs(procs)
    end
end
end
In each case the complicated bit is the initial set-up on each worker. After that you’re free to use pmap, @spawnat or @everywhere as you prefer.
I’ve not tested every combination of [my code above, @MilesCranmer's code above] x [environment, no environment] x [package, no package] and the behaviour in each case. I’m pretty sure sometimes it’s different. YMMV.
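If it helps anyone, the pattern above can also be wrapped into a reusable helper. This is only a sketch of my own, not an established API: WorkerHelper and with_workers are invented names, and the initialisation step is the same @everywhere + Pkg.activate trick from the code above. A package would still add its own import MyPackage line to the worker set-up block.

```julia
module WorkerHelper  # hypothetical wrapper module

import Distributed
import Pkg

# Add `n` workers, point each at the current project, run `f(procs)`,
# and always tear the workers down again afterwards.
function with_workers(f, n::Integer)
    procs = Distributed.addprocs(n)
    try
        project_path = splitdir(Pkg.project().path)[1]
        # Initialise each worker in Main, as discussed above.
        Distributed.@everywhere procs begin
            Main.eval(quote
                import Pkg
                Pkg.activate($$project_path)
            end)
        end
        return f(procs)
    finally
        Distributed.rmprocs(procs)
    end
end

end # module
```

A library function can then call, e.g., with_workers(procs -> ..., 4), so that the user never has to initialise the workers themselves.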