How to have each node in a cluster run a parallel job on its own CPUs, using SSH?

I have access to a cluster with multiple nodes, connected through SSH, each of which has multiple CPUs. My algorithm uses DistributedArrays.jl to create a distributed array and have multiple processes work on it; at the end of the simulation, the result is returned to the master process. Since I have a lot of nodes, it doesn’t make sense to distribute a single array across all of them, so I thought of running multiple simulations, each with different parameters, one per node. The problem is that if I use addprocs(p) with p = [("node1", 3), ("node2", 3), ("node3", 3), ...] (basically, launching 3 processes on each node), I get a flat list of all processes, without any subdivision into nodes.
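
For reference, this is roughly what the setup looks like (the hostnames are placeholders):

using Distributed

# launch 3 worker processes on each node over SSH (placeholder hostnames)
addprocs([("node1", 3), ("node2", 3), ("node3", 3)])

# workers() returns one flat list of ids, e.g. [2, 3, ..., 10],
# with no indication of which node each worker lives on
println(workers())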

What I would like is something like pmap, which assigns each job to an available process until the list of jobs is exhausted, but working on groups of processes instead of single ones. For example, I would like to write something like this:

pmap(f, [[2, 3, 4], [5, 6, 7], [8, 9, 10]], params)

This would apply f to each combination of parameters in params, assigning each job to a group of three processes that work in tandem until completion.

Is this possible?


I may have found a solution. I’ve noticed that, each time I add nodes using addprocs(p) where p is something like p = [("node1", 3), ("node2", 3), ("node3", 3), ...], the first N workers of the vector returned by workers() each belong to one of the N added nodes. For example, if I add 3 nodes, whatever the number of processes on each node is, workers 2, 3 and 4 (1 is the master process and doesn’t count) belong to the first, second and third node respectively. The order of the nodes may not be respected, though (as in, if I add node1, node2 and node3 in this order in the vector p, worker 2 may belong to node2 or node3, not necessarily node1). But it is guaranteed that each of the first N workers sits on a different one of the N nodes.
This means that, defining N to be the number of nodes (which is basically length(p)), I can write

workers()[begin:N]

to get a list of processes that belong to different nodes. Then I can use the fact that passing a worker id (let’s call it w) to the function procs() returns a vector containing the ids of all processes that belong to the same node as w. Thus, broadcasting procs() over the list of the first N workers we got above:

all_workers = procs.(workers()[begin:N])

returns a vector of vectors, where each entry is the list of worker ids that belong to the same node.
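
With the 3-nodes, 3-processes-per-node example from above, this looks roughly like the following (the exact grouping is illustrative and depends on how the workers happen to be assigned):

N = 3
all_workers = procs.(workers()[begin:N])
# e.g. [[2, 5, 8], [3, 6, 9], [4, 7, 10]]: one vector of worker ids per node
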
Now we construct a WorkerPool containing only the first N workers:

wp = WorkerPool(workers()[begin:N])

and we use this in pmap:

pmap(wp, my_collection) do params
    node_pool = procs(myid())  # ids of all workers on the same node as this worker
    # run the distributed computation for `params` on `node_pool`
end

Inside the function passed to pmap, we can call procs(myid()) to get the list of all workers that belong to the same node as the current worker, and use it in whatever distributed calculation we need. Having passed the WorkerPool to pmap guarantees that every worker called by pmap gets a different pool of workers back from procs(myid()).
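
Putting the pieces together, a rough end-to-end sketch of what I mean (hostnames, parameter sets and the per-node computation are placeholders; it assumes the default all-to-all topology and that the first-N-workers observation above holds):

using Distributed

addprocs([("node1", 3), ("node2", 3), ("node3", 3)])   # placeholder hostnames

N = 3                            # number of nodes, i.e. length(p)
node_reps = workers()[begin:N]   # one representative worker per node
wp = WorkerPool(node_reps)

params = [(T = t,) for t in 1.0:0.5:5.0]   # placeholder parameter sets

results = pmap(wp, params) do p
    node_pool = WorkerPool(procs(myid()))  # all workers on this node
    # placeholder "simulation": a node-local pmap over this node's workers
    partial = pmap(x -> x^2 * p.T, node_pool, 1:100)
    sum(partial)
end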

I don’t know if this is an efficient or even idiomatic way to do this, or if there is an easier way. Also, I recommend checking that the first hypothesis holds for you, i.e. that the first N workers belong to different nodes: I haven’t found this fact anywhere in the docs, and it may just be that the configuration of the cluster I’m working on happens to make it true.
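
One quick way to check that assumption is to ask each of the first N workers for its hostname:

N = 3   # number of nodes added
hosts = [remotecall_fetch(gethostname, w) for w in workers()[begin:N]]
@assert allunique(hosts) "the first N workers do not all sit on different nodes"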


Did you get this issue resolved for good?

Your solution with WorkerPool is surely the easiest to implement. Other approaches would be:

  • create an algorithm that sends parameters to specific nodes with @spawnat
  • create small remote services that wait to consume data from a RemoteChannel, and put your input values into the right channel (see the sketch after this list)
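
For the RemoteChannel variant, a rough sketch could look like this (parameter sets and the per-node work are placeholders, and node_reps is assumed to hold one representative worker per node, e.g. workers()[begin:N] as in your solution):

using Distributed

# placeholder parameter sets and one representative worker per node
params    = [(a = i,) for i in 1:10]
node_reps = workers()[begin:3]

jobs    = RemoteChannel(() -> Channel{Any}(64))
results = RemoteChannel(() -> Channel{Any}(64))

foreach(p -> put!(jobs, p), params)
foreach(_ -> put!(jobs, :stop), node_reps)   # one stop sentinel per consumer

for w in node_reps
    remote_do(w, jobs, results) do jobs, results
        while true
            p = take!(jobs)
            p === :stop && break
            # ... run the node-local simulation for `p` here ...
            put!(results, (p, :done))
        end
    end
end

collected = [take!(results) for _ in params]   # gather one result per job

This uses a single shared job channel that one representative worker per node consumes from; the @spawnat variant would instead send each parameter set to a specific node’s representative worker directly.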

Maybe like this: RemoteChannels and workload queue architecture - #2 by draftman9