Distributed computing for functions in scripts inside a local module?

I have a local module, say MyProject, which has a folder src with two scripts script1.jl and script2.jl.

module MyProject

using Package1
using Package2
using Package3

include("script1.jl")
include("script2.jl")

export function1_in_script1,
    function1_in_script2

end # module

Now, there is a for loop in function1_in_script2 that I want to parallelize; it depends on code in script1.jl as well as on Package1 and Package2.

If I don’t need parallelization, I would generally run the following code:

#Activating the environment
using Pkg
if isfile("Project.toml") && isfile("Manifest.toml")
    Pkg.activate(".")
end

using MyProject
using AnyOtherRequiredPackage

# Call the function
function1_in_script2(...)

Now, how do I modify the above code to make sure that the for loop in function1_in_script2 is available on all worker processes? I am using pmap to parallelize the loop. I understand I can do something like:

using Distributed
addprocs(5)

#Now, I am confused about where to use the @everywhere macro
#Should it be on the entire module, i.e., @everywhere using MyProject 
#Or should I use it inside the MyProject module?

Thanks!

Check the ultimate guide to distributed computing:


Thanks for pointing to the GitHub repository. I went through it, and I understand the parallelization part described there; however, in my case, I want to parallelize something that’s part of the module (corresponding to which the environment is instantiated) and not something that’s defined and called after the module is loaded post-instantiating the environment.

Maybe it’s a trivial extension of what’s in the GitHub repo, but I just wanted to make sure that I am doing it correctly.

You can use remotecall_eval to load the module on a worker (assuming it has access to the code), even if the worker is being spun up from a function defined within the module. I learned this from @dave.f.kleinschmidt, possibly he has a code example handy somewhere.
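To make the mechanics concrete, here is a minimal sketch of remotecall_eval on a freshly spun-up local worker. The expression evaluated here just defines a variable; the names `answer` and `result` are illustrative only. In practice the expression would be something like `using MyProject` (assuming the worker can see the project's code and environment):

```julia
using Distributed

pid = only(addprocs(1))   # spin up one local worker

# Run an arbitrary expression in Main on the worker; in real use this
# would be e.g. :(using MyProject)
Distributed.remotecall_eval(Main, pid, :(answer = 6 * 7))

# Fetch the value back to confirm the worker saw the definition
result = remotecall_fetch(() -> Main.answer, pid)
rmprocs(pid)
```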


Nothing I can share publicly unfortunately, but here’s a lightly sanitized version:

module MyStuff

using Distributed
using Pkg

# asynchronously create n_workers worker processes
function create_worker_processes(n_workers, manager=Distributed.LocalManager(); revise=false)
    tasks = map(1:n_workers) do n
        @async create_worker_process(manager; revise, n)
    end

    return @async map(fetch, tasks)
end

function create_worker_process(manager; revise=false, n=nothing)
    n_str = n === nothing ? "" : " $(n)"
    worker_str = string("worker", n_str)
    @info "Requesting $(worker_str)..."
    pid = only(addprocs(manager))
    # make sure we activate the ACTUAL PROJECT that's active on the manager,
    # which may be different than `@.` during e.g. CI runs
    project = Pkg.project().path
    Distributed.remotecall_eval(Main, pid,
                                :(using Pkg; Pkg.activate($(project))))
    if revise
        @info "Loading Revise on $(worker_str)..."
        Distributed.remotecall_eval(Main, pid, :(using Revise))
    end
    @info "Loading MyStuff on $(worker_str)..."
    Distributed.remotecall_eval(Main, pid, :(using MyStuff))
    @info "$(worker_str) ready, PID $(pid)"
    return pid
end

end # module

passing around the manager is a bit of extra cognitive overhead but very useful when you’re juggling, say, different kinds of K8s resources (GPU-equipped pods for training, CPU-only for batching, etc.). in that case, we usually have another layer like

function provision_workers(config)
    train_workers = create_worker_processes(config.n_train_workers, train_manager(config))
    batch_workers = create_worker_processes(config.n_batch_workers, batch_manager(config))
    return (; train_workers = fetch(train_workers), batch_workers = fetch(batch_workers), config)
end

then we pass this “harness” to the functions that actually do the training/batching work so they know which workers to use. that is, there’s no magical distributed execution: the user/driver script has to specify which resources to use. this just makes it more convenient to set those resources up so that the user does not need to do @everywhere using MyStuff, set the project, etc.
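here’s a runnable toy version of that pattern, with the K8s managers replaced by plain local addprocs. `provision_workers` and `do_work` are hypothetical names for illustration, not part of any package:

```julia
using Distributed

# Toy harness: provision two named groups of local workers
function provision_workers(; n_train = 1, n_batch = 1)
    train_workers = addprocs(n_train)
    batch_workers = addprocs(n_batch)
    return (; train_workers, batch_workers)
end

# Work functions take a slice of the harness explicitly --
# no magical global pool of workers
function do_work(pids, xs)
    pool = WorkerPool(pids)
    return pmap(x -> 2x, pool, xs)   # anonymous functions ship automatically
end

harness = provision_workers()
result = do_work(harness.train_workers, 1:3)
rmprocs(vcat(harness.train_workers, harness.batch_workers))
```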


Hi @dave.f.kleinschmidt,

Thanks for the detailed explanation. I probably need some time to understand all parts of your code; however, yesterday, while playing around with my code, I noticed that simply requesting (via a Slurm script) a certain number of cores (say 24) on a particular node of a cluster, and then activating my project environment on all workers (shown below), automatically parallelized the pmap call in my module.

using Distributed
addprocs(24)

#activating the MyProject environment on all workers
@everywhere begin
    using Pkg
    if isfile("Project.toml") && isfile("Manifest.toml")
        Pkg.activate(".")
    end
end

using MyProject # the pmap function lies somewhere inside one of the scripts in this module

#and then whatever I need to call from the MyProject module

I was not expecting it to parallelize, but in the output log I can see that the function called inside pmap ran on different workers. I was assuming that I would at least need to do @everywhere using MyProject, but that seems not to be required. Any comments on why it works, or maybe I am missing something here?

Thanks for the help.

Mysterious! I’ve always found that unless you’ve actually run the code that defines the work functions on the workers, you’ll get an UndefVarError. But it’s possible that the work you’re trying to do (i.e., the function that you’re pmap-ing) does not require anything in MyProject; otherwise it’s a mystery to me.
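for reference, a minimal sketch of the usual requirement, assuming local workers: a named function must be defined on the workers (e.g. via @everywhere) before pmap can call it there, whereas anonymous functions get serialized along with the call:

```julia
using Distributed

pids = addprocs(2)

# Without this @everywhere, `pmap(square, 1:4)` fails with an UndefVarError
# on the workers, because `square` would only exist on the manager process
@everywhere square(x) = x^2

result = pmap(square, 1:4)
rmprocs(pids)
```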

For my own reference more than anything else, here’s the function I’m using:

function create_worker_processes(; n_workers = Sys.CPU_THREADS, revise = false)
    N = maximum(workers())
    asyncmap(1:n_workers) do i
        worker_number = N + i
        worker_str = "Worker $(worker_number)"
        @info "Requesting $(worker_str)..."
        pid = only(addprocs(1))
        project = Pkg.project().path
        Distributed.remotecall_eval(Main, pid,
                                    :(using Pkg; Pkg.activate($(project))))
        if revise
            @info "Loading Revise on $(worker_str)..."
            Distributed.remotecall_eval(Main, pid, :(using Revise))
        end
        @info "Loading PackageName on $(worker_str)..."
        Distributed.remotecall_eval(Main, pid, :(import PackageName))
        @info "$(worker_str) ready, PID $(pid)"
        pid
    end
end

Thanks to @dave.f.kleinschmidt for the initial implementation and @ericphanson for suggesting checking out asyncmap. Adding this function to my package, I’m able to write other functions that call @spawn or @spawnat, which lets me make using Distributed seamless for users of my package.
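As a sketch of what such a package-level wrapper might look like, here is a hypothetical helper (the name `run_distributed` and its signature are illustrative only, not from any package) that hides worker setup and teardown from callers:

```julia
using Distributed

# Hypothetical convenience wrapper: callers never touch
# addprocs/rmprocs themselves
function run_distributed(f, xs; n_workers = 2)
    pids = addprocs(n_workers)
    try
        # CachingPool caches `f` on the workers across pmap tasks
        return pmap(f, CachingPool(pids), xs)
    finally
        rmprocs(pids)   # clean up workers even if pmap throws
    end
end

result = run_distributed(x -> x + 1, 1:3)   # anonymous fns ship automatically
```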
