I have a local module, say MyProject, which has a folder src with two scripts, script1.jl and script2.jl.
module MyProject

using Package1
using Package2
using Package3

include("script1.jl")
include("script2.jl")

export function1_in_script1,
       function1_in_script2

end # module
Now, there is a for loop in function1_in_script2 that I want to parallelize; it depends on code in script1.jl as well as on Package1 and Package2.
If I don’t need parallelization, I would generally run the following code:
# Activate the environment
using Pkg
if isfile("Project.toml") && isfile("Manifest.toml")
    Pkg.activate(".")
end

using MyProject
using AnyOtherRequiredPackage

# Call the function
function1_in_script2(...)
Now, how do I modify the above code to make sure that the for loop in function1_in_script2 is available on all worker processes? I am using pmap to parallelize the for loop. I understand I can do something like:
using Distributed
addprocs(5)
# Now, I am confused about where to use the @everywhere macro.
# Should it be on the entire module, i.e., @everywhere using MyProject?
# Or should I use it inside the MyProject module?
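For concreteness, here is a sketch of the first option I am considering; the worker count, the project path, and the function arguments are just placeholders:

```julia
using Distributed
addprocs(5)

# Activate the same environment on every worker before loading the module
@everywhere begin
    using Pkg
    Pkg.activate(".")
end

# Load MyProject on the master and on all workers
@everywhere using MyProject

# The pmap inside function1_in_script2 should now be able to use the workers
function1_in_script2(...)
```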
Thanks!
Check the ultimate guide to distributed computing:
Thanks for pointing to the GitHub repository. I went through it, and I understand the parallelization part described there; however, in my case, I want to parallelize something that’s part of the module (corresponding to which the environment is instantiated) and not something that’s defined and called after the module is loaded post-instantiating the environment.
Maybe it’s a trivial extension of what’s in the GitHub repo, but I just wanted to make sure that I am doing it correctly.
You can use remotecall_eval
to load the module on a worker (assuming it has access to the code), even if the worker is being spun up from a function defined within the module. I learned this from @dave.f.kleinschmidt, possibly he has a code example handy somewhere.
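In its most minimal form, that looks something like the following sketch (assuming MyProject is available in the environment the worker activates):

```julia
using Distributed

# Spin up one worker and get its PID
pid = only(addprocs(1))

# Evaluate `using MyProject` inside the worker's Main module
Distributed.remotecall_eval(Main, pid, :(using MyProject))
```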
Nothing I can share publicly unfortunately, but here’s a lightly sanitized version:
module MyStuff

using Distributed
using Pkg

# Asynchronously create n_workers worker processes
function create_worker_processes(n_workers, manager=Distributed.LocalManager(); revise=false)
    tasks = map(1:n_workers) do n
        @async create_worker_process(manager; revise, n)
    end
    return @async map(fetch, tasks)
end

function create_worker_process(manager; revise=false, n=nothing)
    n_str = n === nothing ? "" : " $(n)"
    worker_str = string("worker", n_str)
    @info "Requesting $(worker_str)..."
    pid = only(addprocs(manager))
    # make sure we activate the ACTUAL PROJECT that's active on the manager,
    # which may be different than `@.` during e.g. CI runs
    project = Pkg.project().path
    Distributed.remotecall_eval(Main, pid,
        :(using Pkg; Pkg.activate($(project))))
    if revise
        @info "Loading Revise on $(worker_str)..."
        Distributed.remotecall_eval(Main, pid, :(using Revise))
    end
    @info "Loading MyStuff on $(worker_str)..."
    Distributed.remotecall_eval(Main, pid, :(using MyStuff))
    @info "$(worker_str) ready, PID $(pid)"
    return pid
end

end # module
Passing around the manager is a bit of extra cognitive overhead, but it's very useful when you're juggling, say, different kinds of K8s resources (GPU-equipped pods for training, CPU-only for batching, etc.). In that case, we usually have another layer like
function provision_workers(config)
    train_workers = create_worker_processes(config.n_train_workers, train_manager(config))
    batch_workers = create_worker_processes(config.n_batch_workers, batch_manager(config))
    return (; train_workers = fetch(train_workers), batch_workers = fetch(batch_workers), config)
end
Then we pass this “harness” to the functions that actually do the training/batching work so they know which workers to use. That is, there’s no magical distributed execution: the user/driver script has to specify which resources to use; this just makes it a bit more convenient to set up those resources so that the user does not need to do @everywhere using MyStuff, set the project, etc.
Hi @dave.f.kleinschmidt,
Thanks for the detailed explanation. I probably need some time to understand all parts of your code; however, yesterday, I was playing around with my code, and I noticed that simply requesting (via a Slurm script) a certain number of cores (say 24) on a particular node of a cluster, and then activating my project environment on all workers (shown below), automatically parallelized the pmap call in my module.
using Distributed
addprocs(24)

# Activate the MyProject environment on all workers
@everywhere begin
    using Pkg
    if isfile("Project.toml") && isfile("Manifest.toml")
        Pkg.activate(".")
    end
end

using MyProject  # the pmap call lies somewhere inside one of the scripts in this module
# ... and then whatever I need to call from the MyProject module
I was not expecting it to parallelize, but in the output log I can see that the function called inside pmap ran on different workers. I was assuming that I would at least need to do @everywhere using MyProject, but that seems not to be required. Any comments on why it works, or maybe I am missing something here?
Thanks for the help.
Mysterious! I’ve always found that unless you’ve actually run the code that defines the work functions on the workers, you’ll get an UndefVarError. But it’s possible that the work you’re trying to do (i.e., the function that you’re pmapping) does not require anything in MyProject; otherwise, it’s a mystery to me.
For my own reference more than anything else, here’s the function I’m using:
using Distributed
using Pkg

function create_worker_processes(; n_workers = Sys.CPU_THREADS, revise = false)
    N = last(sort(workers()))  # highest existing worker ID
    asyncmap(1:n_workers) do i
        worker_number = N + i
        worker_str = "Worker $worker_number"
        @info "Requesting $(worker_str)..."
        pid = only(addprocs(1))
        project = Pkg.project().path
        Distributed.remotecall_eval(Main, pid,
            :(using Pkg; Pkg.activate($(project))))
        if revise
            @info "Loading Revise on $(worker_str)..."
            Distributed.remotecall_eval(Main, pid, :(using Revise))
        end
        @info "Loading PackageName on $(worker_str)..."
        Distributed.remotecall_eval(Main, pid, :(import PackageName))
        @info "$(worker_str) ready, PID $(pid)"
        pid
    end
end
Thanks to @dave.f.kleinschmidt for the initial implementation and @ericphanson for suggesting checking out asyncmap. Adding this function to my package, I'm able to write other functions that call @spawn or @spawnat, and that allows me to make the use of Distributed seamless for users of my package.
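With that helper in a package, usage from a driver script looks roughly like the following sketch (the worker count and the mapped function are just placeholders):

```julia
using Distributed

# Spin up 4 workers, each with the manager's project activated
# and PackageName imported
pids = create_worker_processes(; n_workers = 4)

# The workers are now ready for pmap / @spawnat work
results = pmap(x -> x^2, 1:10)
```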