The ultimate guide to distributed computing

HOWTO: Distributed computing

In this thread I would like to gather relevant information for people interested in distributing simple function calls across multiple cluster nodes. Although the task is simple, there are some rough (undocumented) corners in the language that currently prevent even experienced users from accomplishing it.

The idea here is to update this content every now and then to reflect the latest (and cleanest) way of performing distributed computing with remote workers in Julia. If you read Discourse, you will find many related threads where people shared solutions for specific problems, which are currently outdated. I think we need a central thread of discussion to solve most issues once and for all.

Sample script

We will consider a sample script that processes a set of files in a data folder and saves the results in a results folder. I like this task because it involves IO and file paths, which can get tricky in remote machines:

# instantiate environment
using Pkg; Pkg.instantiate()

# load dependencies
using CSV, DataFrames

# HELPER FUNCTIONS
# ----------------
function process(infile, outfile)
  # read file from disk into a DataFrame
  # (recent CSV.jl requires a sink type such as DataFrame)
  csv = CSV.read(infile, DataFrame)

  # perform calculations
  sleep(60) # pretend it takes time
  csv.new = rand(size(csv,1))

  # save new file to disk
  CSV.write(outfile, csv)
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = "data"
outdir = "results"

# files to process
infiles  = readdir(indir, join=true)
outfiles = joinpath.(outdir, basename.(infiles))
nfiles   = length(infiles)

for i in 1:nfiles
  process(infiles[i], outfiles[i])
end

We follow Julia’s best practices:

  1. We start by instantiating the environment in the host machine, which lives in the files Project.toml and Manifest.toml in the project directory (the same directory of the script).
  2. We then load the dependencies of the project, and define helper functions to be used.
  3. The main work is done in a loop that calls the helper function with various files.
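For reference, the project layout assumed throughout this guide looks like this (the data and results folders must exist before running the script):

```
project/
├── Project.toml   # direct dependencies (e.g. CSV)
├── Manifest.toml  # exact versions of the full dependency graph
├── main.jl        # the script above
├── data/          # input CSV files
└── results/       # output CSV files
```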

Let’s call this script main.jl. We can cd into the project directory and call the script as follows (assuming Julia v1.4 or higher):

$ julia --project main.jl

Parallelization (same machine)

Our goal is to process the files in parallel. First, we will make minor modifications to the script to be able to run it with multiple processes on the same machine (e.g. the login node). This step is important for debugging:

  • We load the Distributed stdlib to replace the simple for loop by a pmap call. Because Distributed is a standard library, it is available without instantiating the environment first. This matters: the @everywhere macro it provides is what we will use to instantiate the remaining dependencies on all workers.
  • We add worker processes with addprocs on the same machine, and tell these workers to activate the same environment as the master process with the exeflags option.
  • We wrap the preamble in an @everywhere begin ... end block, and replace the for loop by a pmap call. We also add a try ... catch block to handle failures with specific files.

Here is the resulting script after the modifications:

using Distributed

# add processes on the same machine
addprocs(4, topology=:master_worker, exeflags="--project=$(Base.active_project())")

# SETUP FOR ALL PROCESSES
# -----------------------
@everywhere begin
  # instantiate environment
  using Pkg; Pkg.instantiate()

  # load dependencies
  using CSV, DataFrames

  # HELPER FUNCTIONS
  # ----------------
  function process(infile, outfile)
    # read file from disk into a DataFrame
    # (recent CSV.jl requires a sink type such as DataFrame)
    csv = CSV.read(infile, DataFrame)

    # perform calculations
    sleep(60) # pretend it takes time
    csv.new = rand(size(csv,1))

    # save new file to disk
    CSV.write(outfile, csv)
  end
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = "data"
outdir = "results"

# files to process
infiles  = readdir(indir, join=true)
outfiles = joinpath.(outdir, basename.(infiles))
nfiles   = length(infiles)

status = pmap(1:nfiles) do i
  try
    process(infiles[i], outfiles[i])
    true # success
  catch e
    @warn "failed to process $(infiles[i])"
    false # failure
  end
end
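As an aside, pmap also accepts an on_error keyword that maps an exception to a result value, so the try ... catch above could be written more compactly (a sketch; note the warning message is lost this way):

```julia
# any exception raised while processing a file maps to `false`
status = pmap(1:nfiles, on_error=e -> false) do i
  process(infiles[i], outfiles[i])
  true # success
end
```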

We execute the script as before:

$ julia --project main.jl

Questions

  • Is there a more elegant method for instantiating the environment in remote workers? The exeflags option feels like a hack.

IO issues

Notice that we used "data" and "results" as our file paths in the script. If we run the script from outside the project directory (call it proj), we get an error:

julia --project=proj proj/main.jl
ERROR: LoadError: SystemError: unable to read directory data: No such file or directory

Even worse, these file paths may not exist on different machines when we request multiple remote workers. To solve this, we need to use paths relative to the main.jl source:

# relevant directories
indir  = joinpath(@__DIR__,"data")
outdir = joinpath(@__DIR__,"results")

or set these paths on the command line using a package like DocOpt.jl.
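For a lighter-weight alternative to a full argument parser, the paths can also be read from plain ARGS with defaults relative to the script (a minimal sketch):

```julia
# allow `julia --project main.jl [indir] [outdir]`,
# falling back to paths relative to this source file
indir  = length(ARGS) > 0 ? ARGS[1] : joinpath(@__DIR__, "data")
outdir = length(ARGS) > 1 ? ARGS[2] : joinpath(@__DIR__, "results")
```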

Our previous command should work with the suggested modifications:

julia --project=proj proj/main.jl

Parallelization (remote machines)

Finally, we would like to run the script above on a cluster with hundreds of remote worker processes. We don’t know in advance how many processes will be available, because that is decided by a job scheduler (e.g. SLURM, PBS). We have two options: use ClusterManagers.jl, or call the julia executable directly from a job script.

Questions

  • Could you please advise on the cleanest approach currently?
  • How do you modify the script below to work with remote processes?

using Distributed

# add processes on the same machine
addprocs(4, topology=:master_worker, exeflags="--project=$(Base.active_project())")

# SETUP FOR ALL PROCESSES
# -----------------------
@everywhere begin
  # instantiate environment
  using Pkg; Pkg.instantiate()

  # load dependencies
  using CSV, DataFrames

  # HELPER FUNCTIONS
  # ----------------
  function process(infile, outfile)
    # read file from disk into a DataFrame
    # (recent CSV.jl requires a sink type such as DataFrame)
    csv = CSV.read(infile, DataFrame)

    # perform calculations
    sleep(60) # pretend it takes time
    csv.new = rand(size(csv,1))

    # save new file to disk
    CSV.write(outfile, csv)
  end
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = joinpath(@__DIR__,"data")
outdir = joinpath(@__DIR__,"results")

# files to process
infiles  = readdir(indir, join=true)
outfiles = joinpath.(outdir, basename.(infiles))
nfiles   = length(infiles)

status = pmap(1:nfiles) do i
  try
    process(infiles[i], outfiles[i])
    true # success
  catch e
    @warn "failed to process $(infiles[i])"
    false # failure
  end
end

Appreciate if you can help improve this guide. I’ve created a repository on GitHub to track the improvements. Please feel free to submit PRs: https://github.com/juliohm/julia-distributed-computing

Contributors

@juliohm @samuel_okon

25 Likes

Perhaps an intro to RemoteChannels for data transfer between workers is useful, as well as a guide to ProgressMeter.jl for parallel jobs. Finally a guide to a parallel mapreduce operation using RemoteChannels would be great.
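For context, here is a minimal RemoteChannel sketch (names and sizes are illustrative): a channel hosted on the master that workers can write to:

```julia
using Distributed
addprocs(2)

# channel hosted on the master process, visible to all workers
results = RemoteChannel(() -> Channel{Int}(32))

# ask each worker to put its process id into the shared channel
for w in workers()
  remotecall_fetch(ch -> put!(ch, myid()), w, results)
end

# collect one value per worker on the master
collected = [take!(results) for _ in workers()]
```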

1 Like

Nice!

Regarding the first question, you can avoid the exeflags if you put the following at the beginning:

using Pkg

Pkg.activate(@__DIR__)

That way, if the file is in a directory with an environment, that environment is always loaded, regardless of the directory from which you run Julia.
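That is, the preamble would start with (a sketch of the suggestion; note this activates the environment on the master, before any workers are added):

```julia
using Pkg
Pkg.activate(@__DIR__)  # activate the environment next to this script
Pkg.instantiate()
```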

1 Like

Thank you @jishnub for the feedback. I think these details can come later after we have the core of the distributed execution working out.

Thank you @dmolina, will try it locally. Nice suggestion.

It didn’t work for me @dmolina. We need the exeflags apparently.

That is strange, it worked for me, I had tested it. Well, anyway, it is not too critical.

I’ve added the script to a repo so that other people can test it: https://github.com/juliohm/julia-distributed-computing

Also updated the original post. @dmolina can you try your suggestion there and submit a PR if it works?

@juliohm I feel the @sync @everywhere ... is redundant and should really be @everywhere ....

Thank you @samuel_okon, could you please elaborate? You mean that the @everywhere is already blocking?

You can also connect to remote workers via a list of IPs. The following is my typical startup, which adds one worker per IP listed in each line of machinefile.txt:

machines = readlines("machinefile.txt")
machines_and_workers = [(machine, 1) for machine in machines]

addprocs(
    machines_and_workers,
    enable_threaded_blas=true,
    topology=:master_worker
)

I use an approach similar to the original post to instantiate the projects for each worker. It needs to be said that the directory structure and contents must be identical between the master and the workers.

2 Likes

@juliohm yeah, @everywhere ... calls remotecall_eval, which is blocking.

1 Like

That is nice @platawiec, thank you for sharing. How do you generate the machine file? Are you relying on some job scheduler? Could you please share some more details on how we can automate the process?

Nice @samuel_okon, I’ve updated the instructions to remove the @sync.

1 Like

@platawiec I wonder if we can use the julia --machine-file command line option and still enable the exeflags and topology options.

I’m not an expert, but I orchestrate and launch a cluster on AWS via KissCluster: https://github.com/pszufe/KissCluster. That takes care of everything after some setup. Workflow is basically to launch a bunch of workers, put their IPs in machinefile, make sure everything is configured correctly (permissions, directories, etc.), and start running.

There are other tools which allow you to emulate more traditional clusters in AWS, but if you don’t need the frills or you don’t like the overhead then KissCluster is a good lightweight alternative. Specifically, I wanted to bring up the machinefile option because my workflow doesn’t rely on ClusterManagers.jl.

Thank you @platawiec. I think I know how to make it work with ClusterManagers.jl on general clusters. I will give it a try soon, and will report the results.
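For reference, a sketch of that direction assuming a SLURM cluster (the SlurmManager options and the SLURM_NTASKS variable are illustrative of that setup, not tested here):

```julia
using Distributed, ClusterManagers

# request as many workers as tasks in the SLURM allocation,
# reusing the same exeflags trick as in the local case
np = parse(Int, ENV["SLURM_NTASKS"])
addprocs(SlurmManager(np),
         topology=:master_worker,
         exeflags="--project=$(Base.active_project())")
```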

The following hacky function can take the place of a machinefile. Perhaps a little too much of a corner case but I often want to fire up a set of jobs on as many of our dual-boot student lab machines as I can.

using Printf # for @sprintf

function whatsup()
    cs144l = map(i->@sprintf("cs144l-%02d.csis.ul.ie", i), 1:40)
    cs244l = map(i->@sprintf("cs244l-%02d.csis.ul.ie", i), 1:40)
    cs305l = map(i->@sprintf("cs305l-%02d.csis.ul.ie", i), [x for x in 1:32 if x != 7]) # cs305l-07 is misconfigured

    possibles = vcat(cs305l, cs144l, cs244l)
    usables = []

    # https://discourse.julialang.org/t/how-can-i-ping-an-ip-adress-in-julia/3380
    #   with v1.0 modifications
    @sync for p in possibles
        @async push!(usables, match(r"PING ([^\s]*)\s",
                                    split(read(`ping -c 1 $p`, String), "\n";
                                          keepempty=false)[1])[1]
                     )
        sleep(0.1)
    end;
    string.(usables)
end

Edited as per suggestion of @tkf.

1 Like

Thank you @healyp for sharing. Handy function when you know the IPs beforehand.

@spawn push!(usables, ...) has a data race. Use @async instead. See also the manual:

You are entirely responsible for ensuring that your program is data-race free, and nothing promised here can be assumed if you do not observe that requirement.

You also need @sync for p in possibles to wait for all the tasks. With the code as-is, it’s very possible that the function returns before any of the tasks finish.

2 Likes