Should we control/optimize communications in Julia?

Hi everyone,

This is an edit of a previous question, as I sense that I wasn’t clear enough. I’m taking examples from the distributed computing documentation, version 1.7.3: Multi-processing and Distributed Computing · The Julia Language

As someone who learned distributed programming with MPI, I was taught in particular that it is crucial to avoid as much communication between processes as possible. That’s why I’m confused by the one-sided, high-level philosophy of Julia.

In the following example, the tutorial shows us how to declare a random matrix on process 2 and how to add 1 to every entry.

$ julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

I’m incredibly confused by this example. To me, it seems like:

  1. Process 2 has no identifier referencing the matrix that was created on it. The only reference to this matrix is the Future on the master process.
  2. Process 2 sends that matrix to the master process.
  3. Process 1 adds one to every entry of that matrix.
  4. The result lies on process 1; it doesn’t exist on process 2.

To me, it would’ve been more logical to:

  1. Declare a matrix on process 2
  2. Tell process 2 to add one to every entry of that matrix

Can you tell me exactly what is going on, in terms of message passing, in that example? If it does what I think it does, is Julia suited to writing communication-optimized programs? If it does what I think it should do, then I would desperately need some explanation of how the master process works. Is it a virtual process, consisting of a high-level layer of references, that avoids useless communication and lets us not worry about it?

Another example of my own making. Consider the following code:

$ julia -p 2
A = randn(16,16)
x = randn(16)
y = randn(16)

horizontalSlices = [1:8, 9:16]
Ax = fetch.( [@spawnat i+1 A[horizontalSlices[i], :] * x for i = 1:2 ] )
Ay = fetch.( [@spawnat i+1 A[horizontalSlices[i], :] * y for i = 1:2 ] )

To me, it seems that this code will scatter A to the workers twice, unless the master process has a high-level layer.
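For what it’s worth, here is one way I could sketch avoiding the double transfer — this is my own illustration, not from the documentation: slice `A` on the master, ship each slice exactly once with `remotecall`, and have both products reuse the remote slice (the `slices` vector of Futures is a name I made up).

```julia
using Distributed
addprocs(2)  # same as starting with `julia -p 2`

A = randn(16, 16); x = randn(16); y = randn(16)
horizontalSlices = [1:8, 9:16]

# Compute each slice locally and ship it once: remotecall serializes only
# the 8×16 slice, and the worker keeps the result behind a Future.
slices = [remotecall(identity, i + 1, A[horizontalSlices[i], :]) for i in 1:2]

# Reuse the remote slices: now only x (resp. y) and an 8-vector travel.
# fetch(slices[i]) runs on the worker that owns the value, so it is local.
Ax = reduce(vcat, fetch.([@spawnat i + 1 fetch(slices[i]) * x for i in 1:2]))
Ay = reduce(vcat, fetch.([@spawnat i + 1 fetch(slices[i]) * y for i in 1:2]))

@assert Ax ≈ A * x
@assert Ay ≈ A * y
```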

Thank you in advance

Best regards

That’s not what’s happening. The remotecall creates the matrix on process 2, as the second argument specifies:

  remotecall(f, id::Integer, args...; kwargs...) -> Future

  Call a function f asynchronously on the given arguments on the specified process. Return a Future.
  Keyword arguments, if any, are passed through to f.

So the matrix already lives on process 2.

Similarly, @spawnat 2 ... causes the rest of the expression to be executed on process 2, which fetches the result from itself (which should be fast).

Only fetch(s) then sends the result of s back to process 1.
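To make the message flow explicit, here is the documentation example again as a runnable sketch, annotated with where each step runs (using `addprocs(2)` in place of `julia -p 2`):

```julia
using Distributed
addprocs(2)  # workers get ids 2 and 3

# Step 1: rand(2, 2) runs on worker 2; only a lightweight Future comes back.
r = remotecall(rand, 2, 2, 2)

# Step 2: the addition also runs on worker 2; its fetch(r) is a local lookup.
s = @spawnat 2 1 .+ fetch(r)

# Step 3: this is the only point where the 2×2 matrix crosses to process 1.
result = fetch(s)
@assert size(result) == (2, 2)
@assert all(v -> 1 <= v < 2, result)  # rand ∈ [0, 1), so entries ∈ [1, 2)
```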

No, it’s a regular julia process, visible in e.g. top, just as its workers are.

Indeed it will, but only because you specify the slicing to happen on the remote processes.

It should be noted that regular julia arrays always only live on the process they were created at - they are not the same as MPI arrays. You may want to use DistributedArrays.jl or a similar package for having worker-local slices abstracted as one global array.
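To illustrate that worker-local idea with the stdlib alone — this is a hand-rolled sketch of my own, not how DistributedArrays.jl is implemented (the `chunks` dict and chunk size 8 are arbitrary):

```julia
using Distributed
addprocs(2)

# A poor man's "distributed vector" of ones, one chunk per worker: each
# chunk is created remotely and never leaves the worker that owns it.
chunks = Dict(pid => remotecall(ones, pid, 8) for pid in workers())

# A distributed dot product of the vector with itself: each worker reduces
# its own chunk locally, and only one Float64 per worker crosses the wire.
total = sum(remotecall_fetch(c -> sum(fetch(c) .^ 2), pid, chunks[pid])
            for pid in workers())

@assert total == 16.0  # 8 ones per worker, two workers
```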

In particular, the Distributed stdlib is not a julia equivalent of MPI. It’s more like the necessary glue to have worker processes to run code on in the first place. You may be interested in MPI.jl instead.


Thank you very much for your answer. I think I will take a look at MPI.jl .

Would you agree with this statement: “the Distributed package is not designed to implement intensive parallel algorithms, but rather to parallelize intensive one-time instructions or short blocks of instructions”?

In other words, if one wants to implement e.g. an intensive communication-avoiding LU decomposition, Distributed is not designed for this, as the logic of the computation is centralized in a master process.

No, to me that’s completely orthogonal. The Distributed stdlib is used for example to implement ClusterManagers.jl. It’s a lower level building block for whatever distributed computation you can think of, be that parallel algorithms or “one time” instructions.

I’m puzzled. Your answers are very clear, but I feel like when I use Distributed I’m eventually forced to communicate when I really don’t need to, because workers hold no references to the variables on which they compute.

For example, I looked at the implementation of the dot product between two DArrays in the DistributedArrays package. If I’m not mistaken, it works like this: each worker returns the local dot product of its local part, which is alright. However, that local part is accessed via the reference to the DArray itself. But that reference lives on the master process, so it must be broadcast, which to me could have been avoided if only the workers had references to their chunk of the array.

$ julia -p 8
julia> @everywhere using DistributedArrays
julia> d = dones(16)
julia> localpart(d)

julia> @everywhere 3 localpart(d)
ERROR: On worker 3: UndefVarError: d not defined

julia> @everywhere 3 localpart($d)
2-element Vector{Float64}:

It says in the documentation that the interpolation $ causes a broadcast. So I don’t understand how one could implement an efficient Fast Fourier Transform (for example) on a DArray without issuing an unnecessary broadcast.

That looks like a bug to me, as fetch(@spawnat 2 localpart(d)) works fine for me.

Either way, as far as I can tell, what you’re trying to do is really more suited to either MPI.jl (as linked above) or the SPMD mode. remotecall and @everywhere are quite a bit lower level.


I think this works for you because @spawnat captures the variables of the master process. But if you want to access the localpart of d from a worker process via @everywhere, then you have to interpolate.
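The difference can be sketched with a plain global instead of a DArray (the variable name `v` and the helper binding `vsum` are my own illustration): @everywhere ships the raw expression, so the worker looks the name up in its own Main, while $-interpolation and @spawnat both ship a copy of the value.

```julia
using Distributed
addprocs(2)

v = collect(1.0:4.0)  # a global that exists only on process 1 so far

# @everywhere sends the expression as-is: worker 2 looks up `v` in its own
# Main, where it does not exist — the same UndefVarError as in the thread.
failed = try
    @everywhere 2 sum(v)
    false
catch
    true
end
@assert failed

# $-interpolation splices a serialized copy of v's *value* into the
# expression, so worker 2 never needs its own binding for `v`.
@everywhere 2 vsum = sum($v)
@assert remotecall_fetch(getfield, 2, Main, :vsum) == 10.0

# @spawnat, by contrast, captures referenced globals automatically: the
# value of Main.v is shipped along with the closure to worker 2.
@assert fetch(@spawnat 2 sum(v)) == 10.0
```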

Thank you for your time and all your answers, I think I will study a little bit MPI.jl and SPMD mode and decide what is best for me!