The ultimate guide to distributed computing

I’m not an expert, but I orchestrate and launch a cluster on AWS via KissCluster: https://github.com/pszufe/KissCluster. That takes care of everything after some setup. The workflow is basically to launch a bunch of workers, put their IPs in a machinefile, make sure everything is configured correctly (permissions, directories, etc.), and start running.

There are other tools which allow you to emulate more traditional clusters in AWS, but if you don’t need the frills or you don’t like the overhead then KissCluster is a good lightweight alternative. Specifically, I wanted to bring up the machinefile option because my workflow doesn’t rely on ClusterManagers.jl.
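For readers who have not used a machinefile: it is a plain text file with one machine specification per line, which Julia reads when started with `--machine-file`. The following is only a sketch of how such a file could be generated. The helper name `write_machinefile`, the user name and the addresses are placeholders I made up for illustration, not part of KissCluster.

```julia
# Hypothetical helper: writes one machine spec per line, in the
# `[count*][user@]host` form accepted by `julia --machine-file`.
function write_machinefile(path, hosts; procs_per_host=1, user=nothing)
    open(path, "w") do io
        for h in hosts
            spec = user === nothing ? h : string(user, "@", h)
            line = procs_per_host == 1 ? spec : string(procs_per_host, "*", spec)
            println(io, line)
        end
    end
    return path
end

# Placeholder addresses from a documentation-only range, not real workers.
hosts = ["192.0.2.10", "192.0.2.11"]
machinefile = write_machinefile(joinpath(mktempdir(), "machinefile"), hosts;
                                procs_per_host=2, user="ubuntu")

# The script would then be started with something like:
#   julia --machine-file machinefile main.jl
```

Passwordless SSH from the master to every listed host is assumed, which is part of the "configured correctly" step mentioned above.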


Thank you @platawiec. I think I know how to make it work with ClusterManagers.jl on general clusters. I will give it a try soon, and will report the results.

The following hacky function can take the place of a machinefile. Perhaps a little too much of a corner case but I often want to fire up a set of jobs on as many of our dual-boot student lab machines as I can.

using Printf

function whatsup()
    cs144l = map(i->@sprintf("cs144l-%02d.csis.ul.ie", i), 1:40)
    cs244l = map(i->@sprintf("cs244l-%02d.csis.ul.ie", i), 1:40)
    cs305l = map(i->@sprintf("cs305l-%02d.csis.ul.ie", i), [x for x in 1:32 if x != 7]) # cs305l-07 misconfigured

    possibles = vcat(cs305l, cs144l, cs244l)
    usables = String[]

    # https://discourse.julialang.org/t/how-can-i-ping-an-ip-adress-in-julia/3380
    #   with v1.0 modifications
    @sync for p in possibles
        @async try
            # `read` throws when ping exits non-zero, i.e. the host is down
            out = read(`ping -c 1 $p`, String)
            m = match(r"PING ([^\s]*)\s", first(split(out, "\n"; keepempty=false)))
            m === nothing || push!(usables, String(m[1]))
        catch
            # unreachable host: leave it out of the list
        end
        sleep(0.1)
    end
    usables
end

Edited as per suggestion of @tkf.


Thank you @healyp for sharing. Handy function when you know the IPs beforehand.

@spawn push!(usables, ...) has a data race. Use @async instead. See also the manual:

You are entirely responsible for ensuring that your program is data-race free, and nothing promised here can be assumed if you do not observe that requirement.

You also need @sync for p in possibles to wait for all the tasks. With the code as-is, it’s very possible that the function returns before any of the tasks finish.
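To make the difference concrete, here is a small sketch of the two safe patterns. The `probe` function is a made-up stand-in for a slow check such as a ping. With `@async`, the tasks all run on the thread of the parent task and `push!` never yields, so the pushes cannot overlap. With `Threads.@spawn`, the tasks may run on different threads, so the shared vector needs a lock.

```julia
# Hypothetical stand-in for a slow network check.
probe(host) = (sleep(0.01); host)

# Pattern 1: cooperative tasks on one thread; @sync waits for all of them.
function collect_async(hosts)
    found = String[]
    @sync for h in hosts
        @async push!(found, probe(h))
    end
    return found
end

# Pattern 2: tasks may run on several threads, so guard the shared vector.
function collect_spawn(hosts)
    found = String[]
    lk = ReentrantLock()
    @sync for h in hosts
        Threads.@spawn begin
            r = probe(h)
            lock(lk) do
                push!(found, r)
            end
        end
    end
    return found
end

hosts = ["host-$i" for i in 1:8]
```

In both versions the order of the results depends on which task finishes first, so sort the output if a stable order matters.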


As @juliohm pinged me in [ANN] FileTrees.jl -- easy everyday parallelism on trees of files, I thought I’d give an example of how this could be done with FileTrees.jl

# SETUP FOR ALL PROCESSES
# -----------------------
using Distributed # provides @everywhere

@everywhere begin
  # instantiate environment
  using Pkg; Pkg.instantiate()

  # load dependencies
  using CSV, FileTrees, DataFrames

  function process(infile)
    # read file from disk
    csv = CSV.read(infile, DataFrame)

    # perform calculations
    sleep(60) # pretend it takes time
    csv.new = rand(size(csv,1))

    # return the table so that it can be saved later
    csv
  end
end

# MAIN SCRIPT
# -----------

# relevant directories
indir  = joinpath(@__DIR__,"data")
outdir = joinpath(@__DIR__,"results")

# files to process
infiles  = FileTree(indir)

new_dfs = FileTrees.load(infiles, lazy=true) do f
  try
    process(path(f))
  catch e
    @warn "failed to process $(path(f))"
    DataFrame() # store an empty file?
  end
end

outfiles = rename(new_dfs, outdir)
FileTrees.save(outfiles) do file
  # save new file to disk
  CSV.write(path(file), get(file))
end # computation actually happens here.

This will launch the tasks using the Dagger scheduler. It will use both Threads and Distributed procs if Julia is started from an environment with JULIA_NUM_THREADS=N.
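Whether each worker really ends up with N threads depends on how the workers are started and on whether they see the same environment variables, so it can be worth checking before launching a long job. A minimal sketch, using only the Distributed standard library (the variable name `threads_per_proc` is my own):

```julia
using Distributed

# Ask every process (master and workers) how many threads it was started with.
threads_per_proc = Dict(p => remotecall_fetch(Threads.nthreads, p) for p in procs())

for (p, n) in sort(collect(threads_per_proc))
    println("process $p: $n thread(s)")
end
```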

You can apply mapvalues on new_dfs, and it will be a lazy operation and will not bring the data to the master process. You can also use reducevalues to reduce an iterator of all the DataFrames.

This same code will work if the files are in a nested directory structure. Besides, you get to filter files with glob"" syntax.

So it’s useful if you have a cluster but also want to use multithreading on each node in the cluster to speed up communication.

@juliohm I don’t think I have anything more to add about Distributed than that which is already mentioned on this thread though!


Thank you @shashi, that is already very helpful.

I am coming back to this issue today. I still couldn’t figure out how we can remove the explicit line:

# add processes on the same machine
addprocs(4, topology=:master_worker, exeflags="--project=$(Base.active_project())")

from the example script, and add processes on the command line:

julia -p 4 --project main.jl

Does anyone know how to make it work?

Worker processes can’t find the project environment when we remove the addprocs with the exeflags option. I am really stuck and need this feature in order to get some work done with some huge datasets on a cluster.

Ok, I’ve figured out how to remove the exeflags and the explicit addprocs command with the help of @pfitzseb. Discourse is not allowing me to edit the original post for some reason, so I think I will migrate the instructions to GitHub as well.

Ok, the repository is updated with the latest instructions. I am glad that we don’t need to pass the exeflags option manually.
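The posts here do not spell out the fix that went into the repository, so the following is not a copy of it. It is only a sketch of one common pattern: activate the project from inside the script on every process, so that workers started with `-p` do not depend on `exeflags`. The helper name `activate_everywhere` is made up for illustration, and the project directory is assumed to be readable at the same path on every process.

```julia
using Distributed

# Hypothetical helper: activate the same project directory on the master
# and on every worker that exists at the time of the call.
function activate_everywhere(projectdir::AbstractString)
    @everywhere begin
        import Pkg
        Pkg.activate($projectdir)
    end
    return projectdir
end

# In a script sitting next to its Project.toml one would typically call:
#   activate_everywhere(@__DIR__)
#   @everywhere using CSV, DataFrames
```

The script itself would still be launched as `julia -p 4 --project main.jl`. Workers added after the call are not covered and would need the call repeated.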


What is a good way to make sure workers are removed after computation is finished? From what I understand, one must remember to call rmprocs to free them.

Assuming this is correct, it would be nice to have an API where I specify what I want to do and with how many workers, and which releases the workers once the work is done.

Here is some kind of MWE, but the API is not nice due to having to provide an expression for @everywhere:

function runlsf(f, expr, n=1)
    newprocs = addprocs_lsf(n)
    try
        @everywhere newprocs $expr
        return f()
    finally
        rmprocs(newprocs)
    end
end

julia> runlsf(quote
       using Statistics
       end, 1) do 
       remotecall_fetch(mean, procs() |> last, 1:10)
       end
5.5
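A variant of the same idea, shown only as a sketch: hand the new worker ids to the do-block, so that the caller does not need `procs() |> last`, and make the launcher a keyword argument. The name `with_workers` and the `launch` keyword are made up here. The default uses local `addprocs` so the example runs without a cluster; passing `launch=addprocs_lsf` is assumed to behave like `runlsf` above.

```julia
using Distributed

# Hypothetical helper: start n workers, run f on their ids, always clean up.
function with_workers(f, n::Integer; launch=addprocs)
    pids = launch(n)
    try
        return f(pids)
    finally
        # Runs on success and on error, so the workers are always released.
        rmprocs(pids)
    end
end

total = with_workers(1) do pids
    remotecall_fetch(sum, first(pids), 1:10)
end
```

Packages still have to be loaded on the new workers inside the block, for example with `@everywhere pids using Statistics`, which is the part that makes a nicer API hard to design.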

Would have been much nicer if it could be written something like

@runlsf 1 with using Statistics do remotecall_fetch(mean, procs() |> last, 1:10)

Where did you find this information about a need for a rmprocs call at the end @DrChainsaw? I was assuming that the end of the script would call the rmprocs automatically. It would be nice to double check that.

I am also using the LSF job manager in our company, and things are working fine now after some hacks. For me ClusterManagers.jl doesn’t work yet because it doesn’t use blaunch under the hood.

Hmmm, not sure what you mean. The LSF-jobs are not stopped until rmprocs is called, right?

Maybe I am too ambitious here, but I want to use FileTrees backed by the LSF cluster kind of interactively from the REPL (e.g. start loading some data, maybe have a couple of meetings or lunch in between, then explore a little, more meetings, etc.).

This might differ between LSF clusters depending on config, but for us it is so that once the job is started on LSF, that core is yours and only yours until the job completes. People are not gonna be very happy if they find out I hog cores without them doing anything.

I mean that Julia Distributed and the ClusterManagers.jl will take care of calling rmprocs on exit. You shouldn’t need to call it explicitly as far as I know.

I don’t know why you need interactivity, but if you are just logging in to a compute node you can simply request processes without LSF by passing the -p option to Julia. I am using LSF from the login node to reserve resources on multiple nodes. In this case I am processing tons of heavy files already in parallel and everything works, except that I need to do it a bit differently with a bsub script as opposed to vanilla ClusterManagers.jl.

I am not questioning how LSF works, that is assumed as common knowledge. The problem now is that ClusterManagers.jl is reserving a bunch of tiny jobs as opposed to a big job with multiple processes. The last time I tried to do a pmap call with it I couldn’t make it work.

Yes, it does that.

My question was only in the context of REPL-like work (where one still wants to use the cluster). At least I tend to keep my REPL open to avoid having to load packages and data again and I don’t see why this needs to be completely divorced from the distributed computing scenario. Think hundreds/thousands of files which each take some 2-20 minutes to process which I want to have at my fingertips.

Anyway, it’s no big deal, really just convenience. I was also thinking that if a lot of people use it, the probability that someone forgets to rmprocs after doing a chunk of processing and then leaves the REPL open becomes significant.

Btw, I wasn’t implying that you don’t know how LSF works and I’m sorry if I made it sound like that. I’m just a user and don’t do any configuration or administration of the cluster. I didn’t want to assume it works the same for everyone (it seems to be very flexible).

Fwiw, ClusterManagers seems to work pretty much out of the box for me. I just had to use exeflags to supply a port number to workers as the default range was not open (I omitted this part in my example for brevity).

I see your point. You are looking for some convenience macro to use interactively in a REPL session so that you request resources and release them every time you call a chunk of code. This is a bit unusual, especially when you are sharing resources with other users. As you know, requesting resources may take some waiting time in a job queue. You are lucky to have these resources available all the time, but in most cases people submit a job and wait in a queue.

Yeah, I guess the challenge is basically just how to give it a nice enough API, so the core question was maybe not so much about distributed computing.

I agree the “traditional” way to use a cluster is to submit a job and wait.

Over the years we have found it to be useful as a generic massively parallel resource and have started to do things like testing and builds (CI and manual) on it. We have a big selection of queues with different rules, and some of them are for time-limited jobs which are allowed to pre-empt (i.e. pause) long-running jobs. This might require admins who constantly look after the statistics of how jobs are treated and take appropriate actions (change policies, increase capacity, etc.), but as I said I’m just a user and for me it “just works” (most of the time).

Anyways, sorry for the noise in this thread. Hopefully it can provide some insight into how some people (me in this case :slight_smile: ) use distributed computing.

Regarding LSF jobs, if you start an interactive job you are reserving those cores.
Yes, you might not be using them.

You are probably looking at something like Condor pools - which are not that popular these days.
Or going back further Mosix - which was a rather fantastic system which migrated processes by checkpointing them and moving them to another system. Processes - not VMs.
In fact Mosix might have been a great match for Julia and its distributed processing…

Update - the last version was in 2017… I really wonder if Julia would fit well on top of Mosix.
Just launch distributed processes on the local machine and watch them migrate.
Note to self - don’t be daft. As I recall Mosix did not allow file IO on the worker nodes.

Yup, starting an interactive job at my place is a sure way to get a message from an admin who wonders what you are up to. This is why I want to shut down the jobs as soon as computation is finished. The REPL is not running as an LSF job in my case.

For me it is more about using what is already there. Requesting a new cluster type is not really feasible. I’ll look up Condor pools anyways just to learn something new (old? :slight_smile: ) though.

In a more modern view of the world I have mused on Firecracker to spin up VMs quickly.
There is a limit to the size of a payload for Firecracker though.