Two-level distributed/parallel execution

Given a complex, large-scale computation that spans a non-trivial amount of data (tens of GBs), I wish to split the work between multiple processes across several hosts in a cluster. Each host has dozens of processors and plenty of RAM.

To minimize data movement, I have three different types of invocations of non-serial work:

  1. Standard run-this-on-any-available-worker. The amount of data sent is small compared to the amount of compute, so it is OK if this runs on a different host in the cluster. I actually have very few of these and can probably live without them.

  2. Run on a different worker on the same host. The data needed is large compared to the compute, but it is available in a SharedArray, and I have dozens of threads to go through it.

  3. Run the function on a different host. The sent data is small, but the function will allocate some large SharedArrays and spawn local work (type 2 functions).

A naive solution would be a two-level tree of workers: a main master process, and one supervisor process per host, which controls that host's slave workers.


A. Is there any Julia package which provides such functionality? I am aware of Dagger, but it doesn’t seem to match the above.

B. Suppose I wanted to create such a package myself. Naively, I would need to use some ClusterManager (in my cluster, SGE) to spawn the per-host supervisors, and then each of these would call a simple addprocs to create the local worker processes. Would this work?
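
For B, here is a minimal sketch of one way to get a host level without nesting, assuming a flat worker pool. As far as I know, Distributed only lets process 1 call addprocs, so a supervisor that is itself a worker could not add its own workers. The names `workers_by_host` and `supervisors` are hypothetical, and the two local workers stand in for workers started by a cluster manager.

```julia
using Distributed

# Assumption: two local workers stand in for workers launched on a real
# cluster (for example through a ClusterManager for SGE).
addprocs(2)

# Hypothetical helper: map each hostname to the worker ids running on it.
function workers_by_host()
    groups = Dict{String,Vector{Int}}()
    for w in workers()
        host = remotecall_fetch(gethostname, w)
        push!(get!(groups, host, Int[]), w)
    end
    return groups
end

groups = workers_by_host()

# Treat the first worker found on each host as that host's "supervisor".
supervisors = Dict(host => first(ids) for (host, ids) in groups)
```

A type-3 call could then be sent to `supervisors[host]`, and type-2 calls to the other ids in `groups[host]`, all scheduled from process 1.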

C. I assume that creating a SharedArray on the supervisor process or any of the slave workers allows it to be efficiently accessed by any other process running on the same host. Sending the SharedArray through a channel or providing it as a parameter to a parallel function call would just work using shared memory. The documentation seems to imply this but doesn’t state this explicitly. Is this the case?
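
To make C concrete, here is a small sketch of what I have in mind, assuming all workers are on the same host as the creating process. The function name `fill_chunk!` is hypothetical. Each worker writes its own id into its share of the array, and the writes should be visible to the creator if the memory is really shared.

```julia
using Distributed
addprocs(2)                      # local workers, so everything is on one host
@everywhere using SharedArrays

# With no `pids` given, the array is mapped by the workers on this host.
S = SharedArray{Float64}(8)

# Hypothetical helper: each worker fills only its own slice of the array.
@everywhere function fill_chunk!(A, k)
    for j in localindices(A)
        A[j] = k
    end
    return nothing
end

# Pass the SharedArray as an ordinary argument of a remote call.
@sync for w in workers()
    @async remotecall_wait(fill_chunk!, w, S, w)
end

seen = sort(unique(sdata(S)))    # values observed from the creating process
```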

D. Similar question for atomic types such as locks. Sending them between processes on the same host would share the same object, allowing for coordinating work between these processes. Is that the case?
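
For D, my understanding (which may be wrong) is that in-process locks and atomics live in each process's own memory, so coordination across workers would have to go through something like a RemoteChannel. A minimal sketch under that assumption, with hypothetical names `cell` and `bump!`: a one-slot channel holds the shared value, and taking it out acts as acquiring the lock.

```julia
using Distributed
addprocs(2)

# One-slot channel owned by process 1; it holds the shared counter.
cell = RemoteChannel(() -> Channel{Int}(1))
put!(cell, 0)

# Hypothetical helper: take! blocks while another process holds the value,
# so the take!/put! pair behaves like a critical section.
@everywhere function bump!(cell, n)
    for _ in 1:n
        v = take!(cell)
        put!(cell, v + 1)
    end
    return nothing
end

@sync for w in workers()
    @async remotecall_wait(bump!, w, cell, 10)
end

total = take!(cell)              # 10 increments per worker
```

This works across hosts as well, at the cost of a round trip to the process that owns the channel for every operation.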

Note that in Python the answer for C and D is “no”; AFAIK one must create the shared data or atomic object before spawning the worker processes.

E. If I create a SharedArray on one host, then send it to another host, what are the semantics? Is this forbidden, or does the copy become a SharedArray available to the processes on the second host? Would modifications be reflected back to the original host, or just be visible to the processes of the other host?

F. Similar question for atomic types (locks etc.). What happens if they are sent to another host? Forbidden, become an independent copy, or somehow implement distributed atomic functionality (shudder)?


Oren Ben-Kiki


For case 3, DistributedArrays?

Love your Avatar!

Distributed arrays work if you can shard the data so that each host can work on its own shard. That’s great when it happens! In my use cases, this is rare :(

Also, in my two-level configuration, if you can shard the data that way, then it would be important for each type-3 function invocation to detect which shard it is on, and have it (and any type-2 function it invokes) work only on the data local to that host.

This assumes each shard is in shared memory accessible to all type-2 function invocations on the host. Otherwise, you would need to shard the data all the way to the individual workers… which means your compute and data need to be even more aligned.

So this raises two more questions:

G. In a two-level configuration along the lines I described, is it possible for a shard of a DArray to be accessible via shared memory across all the processes in a single host?

H. If not, is it possible to create a DArray that is sharded across all the individual workers on all the hosts (which are created by nested calls to addprocs)?

Thanks for bringing this up!

Hi, I was wondering if (and how) you solved your issues. I have a very similar situation in which I have a multinode cluster and I would like workers on a single node to work on large SharedArrays (but not in the DistributedArrays sense).

I ended up starting work on a library to provide this functionality. Currently this is a “side project” and I’m pretty busy with my two main projects, so it is advancing very slowly :(