Understanding message passing with pmap

I was trying to understand the behaviour of pmap and how vectors are updated inside a generic function call. I wanted to know whether new variables were constantly being created in pmap, and whether this was truly efficient or whether I should be using it differently.

My minimal example is below. Technically this could probably have used @distributed instead, but my actual example is much more complicated, so I used pmap.

using Distributed

addprocs(4)

X = [Array{Float64,1}(undef,1000) for w = 1:1000]

@everywhere function loadvectors(X)
  for a = 1:1000
    X[a] = a
  end
  return X
end

G = pmap(loadvectors,X)

The thing is, when I run this, I get the correct output in G. That is fine. What I don’t understand is why X does not end up with the same values as G. Should I be thinking of pmap as copying the input X and then sending a modified version back as G? If so, is there a better way to use this function, or should I just tolerate the extra copying? I understand why pmap might work this way, but I wanted to know whether I am using it correctly.

Would a way to reduce the time be to simply write the result to disk and load it again after I return from the parallelized function? Or is the copy time generally considered small in all cases? I’m specifically thinking of running code on a large cluster across different compute nodes that all write to the same shared disk space.

When using Julia with multiple processes (i.e. Distributed), each process has its own independent memory space.

In your example, by passing X to pmap, you are copying (actually, serializing and deserializing) each vector in X to its respective remote process, doing the work there, and then the return value copies the vector back. pmap collects the returned values into a new vector, which is what you get as G.

(Your use of X in loadvectors() is a little confusing, since this is not the same as the global X, but actually just one of the vectors from X.)

Throughout this process, the original X is unchanged, because the mutations in loadvectors() were made on copied versions of the data.
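After running your example, you can check this directly (a quick sanity check of that point):

X[1][1]          # almost certainly not 1.0: the worker mutated a copy, not this vector
G[1][1] == 1.0   # true: the result came back through the return value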

Copying data between processes, especially on multi-node architectures, has a cost, so you will want to keep it to a minimum. In this case, you could create your 1000-element vectors on the remote machines and avoid the initial copy (see the sketch below). However, if you need to collect the results on a single node, you’ll still need to copy them back to the master.
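Here is a rough sketch of that idea (buildvector is just a name I made up): map over indices instead of pre-allocated vectors, so each worker builds its own vector locally and only the result travels back.

using Distributed

addprocs(4)

@everywhere function buildvector(len)
  v = Vector{Float64}(undef, len)
  for a = 1:len
    v[a] = a
  end
  return v
end

# only the function call is shipped to the workers; the 1000-element
# vectors are created remotely and copied back once in the result
G = pmap(i -> buildvector(1000), 1:1000)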

If you really want to share data between processes within the same node, consider using SharedArrays (see the “Multi-processing and Distributed Computing” chapter of the Julia manual).
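A minimal sketch of that approach (single node only: the array lives in shared memory visible to the master and all local workers, so nothing needs to be copied back):

using Distributed

addprocs(4)

@everywhere using SharedArrays

S = SharedArray{Float64}(1000, 1000)   # backed by shared memory on this node

@sync @distributed for a = 1:1000
  S[:, a] .= a                         # workers write into the array directly
end

S[1, 1] == 1.0   # true on the master as well, with no explicit copy back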

Thank you. That all makes sense.

Do you have any thoughts on what it would mean to write the data to disk, load it onto each thread with something like Serialization, use it, and then put it back on the hard disk? In my full application, I unfortunately need to create the vector on the main thread before using it on the parallel threads, so I cannot avoid the copy.

I think (?) the default interprocess serialization/deserialization happens over sockets, so it will be much faster than writing to disk. I would recommend sticking to the standard Julia tooling here and letting it do the (implicit) serialization for you (don’t optimize prematurely until you’re really sure it’s a big problem). If it turns out that the overhead of the interprocess transfers is larger than the time saved by distributing the work to multiple workers, then reconsider whether a multiprocess model is the right fit here; the rough benchmark below is one way to check.
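Something like this would tell you whether the transfer overhead matters for your sizes (work is just a stand-in I made up for your real computation):

using Distributed

addprocs(4)

@everywhere work(v) = (sleep(0.01); sum(v))   # stand-in for a real computation

X = [rand(1000) for _ = 1:1000]

@time map(work, X)    # serial baseline
@time pmap(work, X)   # parallel, including the serialization overhead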

Also: be careful with the terminology. You said ‘threads’ in your recent comment, but the Distributed module is about processes. Julia also has threads, and these all share the same memory space, so data written in one thread is visible in other threads (but beware race conditions, etc.). Threading is limited to a single node, however, and so won’t scale to multi-node architectures.
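For contrast, a threaded sketch of your original example (start Julia with multiple threads, e.g. julia -t 4): here the loop really does mutate X in place, because all threads see the same memory.

X = [Vector{Float64}(undef, 1000) for w = 1:1000]

Threads.@threads for w = 1:1000
  for a = 1:1000
    X[w][a] = a          # writes directly into the shared X
  end
end

X[1][1] == 1.0   # true: the mutation is visible on the main thread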