HOWTO: Distributed computing
In this thread I would like to put together relevant information for people interested in distributing simple function calls across multiple cluster nodes. Although the task is simple, there are some rough (undocumented) corners in the language that currently prevent even experienced users from accomplishing it.
The idea here is to update this content every now and then to reflect the latest (and cleanest) way of performing distributed computing with remote workers in Julia. If you read Discourse, you will find many related threads where people shared solutions for specific problems; many of those solutions are now outdated. I think we need a central thread of discussion to solve most issues once and for all.
Sample script
We will consider a sample script that processes a set of files in a `data` folder and saves the results in a `results` folder. I like this task because it involves IO and file paths, which can get tricky on remote machines:
# instantiate environment
using Pkg; Pkg.instantiate()

# load dependencies
using CSV, DataFrames

# HELPER FUNCTIONS
# ----------------
function process(infile, outfile)
  # read file from disk
  csv = CSV.read(infile, DataFrame)

  # perform calculations
  sleep(60) # pretend it takes time
  csv.new = rand(size(csv,1))

  # save new file to disk
  CSV.write(outfile, csv)
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = "data"
outdir = "results"

# files to process
infiles  = readdir(indir, join=true)
outfiles = joinpath.(outdir, basename.(infiles))
nfiles   = length(infiles)

for i in 1:nfiles
  process(infiles[i], outfiles[i])
end
We follow Julia’s best practices:
- We start by instantiating the environment on the host machine. The environment lives in the files `Project.toml` and `Manifest.toml` in the project directory (the same directory as the script).
- We then load the dependencies of the project, and define helper functions to be used.
- The main work is done in a loop that calls the helper function with various files.
Let’s call this script `main.jl`. We can `cd` into the project directory and call the script as follows (assuming Julia v1.4 or higher):
$ julia --project main.jl
Parallelization (same machine)
Our goal is to process the files in parallel. First, we will make minor modifications to the script so that we can run it with multiple processes on the same machine (e.g. the login node). This step is important for debugging:
- We load the `Distributed` stdlib and replace the simple for loop with a `pmap` call. `Distributed` is always available, so we don’t need to instantiate the environment before loading it. That will be important because we will instantiate the other dependencies on all workers with an `@everywhere` block, which is likewise available without any previous instantiation.
- We add worker processes with `addprocs` on the same machine, and tell these workers to activate the same environment as the master process with the `exeflags` option.
- We wrap the preamble in an `@everywhere begin ... end` block, and replace the for loop with a `pmap` call. We also add a `try ... catch` block to handle issues with specific files.
Here is the resulting script after the modifications:
using Distributed

# add processes on the same machine
addprocs(4, topology=:master_worker, exeflags="--project=$(Base.active_project())")

# SETUP FOR ALL PROCESSES
# -----------------------
@everywhere begin
  # instantiate environment
  using Pkg; Pkg.instantiate()

  # load dependencies
  using CSV, DataFrames

  # HELPER FUNCTIONS
  # ----------------
  function process(infile, outfile)
    # read file from disk
    csv = CSV.read(infile, DataFrame)

    # perform calculations
    sleep(60) # pretend it takes time
    csv.new = rand(size(csv,1))

    # save new file to disk
    CSV.write(outfile, csv)
  end
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = "data"
outdir = "results"

# files to process
infiles  = readdir(indir, join=true)
outfiles = joinpath.(outdir, basename.(infiles))
nfiles   = length(infiles)

status = pmap(1:nfiles) do i
  try
    process(infiles[i], outfiles[i])
    true # success
  catch e
    @warn "failed to process $(infiles[i])"
    false # failure
  end
end
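Since `pmap` returns the results in order, the `status` vector can be used at the end of the script, for example to report which files failed (an illustrative addition, not part of the original script):

# report failures, if any
failed = infiles[.!status]
isempty(failed) || @warn "failed to process $(length(failed)) file(s)" failed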
We execute the script as before:
$ julia --project main.jl
Questions
- Is there a more elegant method for instantiating the environment in the remote workers? The `exeflags` option feels like a hack (one possible alternative is sketched below).
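For comparison, here is a minimal sketch of one alternative I am aware of (not necessarily cleaner): start the workers without `exeflags`, then activate the master’s project on each worker by interpolating the project path into the `@everywhere` block:

using Distributed

# add workers first, without any project flags
addprocs(4, topology=:master_worker)

# activate and instantiate the master's environment on every worker;
# $project is interpolated from the master process
project = dirname(Base.active_project())
@everywhere begin
  using Pkg
  Pkg.activate($project)
  Pkg.instantiate()
end

A caveat of this approach is that the workers boot into the default environment and only switch afterwards, so all dependencies must be loaded after the activation.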
IO issues
Notice that we used `"data"` and `"results"` as our file paths in the script. If we try to run the script from outside the project directory (e.g. `proj`), we will get an error:
$ julia --project=proj proj/main.jl
ERROR: LoadError: SystemError: unable to read directory data: No such file or directory
Even worse, these file paths may not exist on different machines when we request multiple remote workers. To solve this, we need to use paths relative to the `main.jl` source file:
# relevant directories
indir = joinpath(@__DIR__,"data")
outdir = joinpath(@__DIR__,"results")
or set these paths on the command line using a package like DocOpt.jl.
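For illustration, a minimal sketch of the DocOpt.jl route could look like this (the usage string and argument names are hypothetical):

using DocOpt

doc = """Process CSV files.

Usage:
  main.jl <indir> <outdir>
"""

# parse ARGS according to the usage string above
args   = docopt(doc)
indir  = args["<indir>"]
outdir = args["<outdir>"]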
With paths relative to the source file, our previous command now works:
$ julia --project=proj proj/main.jl
Parallelization (remote machines)
Finally, we would like to run the script above on a cluster with hundreds of remote worker processes. We don’t know in advance how many processes will be available, because that is decided by a job scheduler (e.g. SLURM, PBS). We have the option of using ClusterManagers.jl, and the option of calling the `julia` executable directly from a job script.
Questions
- Could you please advise on the cleanest approach currently?
- How do you modify the script below to work with remote processes?
using Distributed

# add processes on the same machine
addprocs(4, topology=:master_worker, exeflags="--project=$(Base.active_project())")

# SETUP FOR ALL PROCESSES
# -----------------------
@everywhere begin
  # instantiate environment
  using Pkg; Pkg.instantiate()

  # load dependencies
  using CSV, DataFrames

  # HELPER FUNCTIONS
  # ----------------
  function process(infile, outfile)
    # read file from disk
    csv = CSV.read(infile, DataFrame)

    # perform calculations
    sleep(60) # pretend it takes time
    csv.new = rand(size(csv,1))

    # save new file to disk
    CSV.write(outfile, csv)
  end
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = joinpath(@__DIR__,"data")
outdir = joinpath(@__DIR__,"results")

# files to process
infiles  = readdir(indir, join=true)
outfiles = joinpath.(outdir, basename.(infiles))
nfiles   = length(infiles)

status = pmap(1:nfiles) do i
  try
    process(infiles[i], outfiles[i])
    true # success
  catch e
    @warn "failed to process $(infiles[i])"
    false # failure
  end
end
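As a starting point for the discussion, here is a sketch of how the `addprocs` line might change on a SLURM cluster with ClusterManagers.jl (I am assuming the job runs under SLURM, which sets the `SLURM_NTASKS` environment variable for batch jobs):

using Distributed, ClusterManagers

# ask for one Julia worker per task allocated by the scheduler
ntasks = parse(Int, ENV["SLURM_NTASKS"])
addprocs(SlurmManager(ntasks), topology=:master_worker,
         exeflags="--project=$(Base.active_project())")

The rest of the script would stay the same, since `pmap` does not care whether the workers live on the local machine or on remote nodes.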
I’d appreciate your help improving this guide. I’ve created a repository on GitHub to track the improvements. Please feel free to submit PRs: https://github.com/juliohm/julia-distributed-computing