Fetch part of an array with indexes known by the workers

Hi everyone,

I’m having some trouble with the Distributed functionality of Julia.
I want to declare a matrix A on the main process and give slices of A to the workers. I launch Julia with the command julia -p 8, which adds 8 worker processes.

@everywhere using SparseArrays

A = sprandn(16, 16, 0.2)  # To my understanding, A exists on proc 1

@everywhere begin
    if myid() != 1
        i = myid() - 1
        top = 2*(i-1) + 1
        bot = 2*i  # To my understanding: on proc 2, top = 1, bot = 2; on proc 3, top = 3, bot = 4; etc.
        A = @fetchfrom 1 A[top:bot, :]  # To my understanding: on procs 2 to 9, A is now a slice of the A from proc 1
    end
end

@everywhere println(A.nzval)
# And now I see that all workers except worker 4 end up with the same bottom slice of the initial matrix A!

I’m really confused. If all the workers had ended up with the same bottom slice of the initial A, I would have told myself: maybe there is a reason why the variables top and bot always end up equal to 15 and 16. But apparently one of the workers (here worker number 4) ended up with the top slice of the initial matrix A! To my understanding, when I type @fetchfrom 1 A[top:bot, :], it decomposes as follows:

  1. A Future is created on process 1 on behalf of the calling worker; the function to be called is getindex as implemented in SparseArrays, and the arguments are passed as copies of top and bot, i.e. the raw values 15 and 16, for example.
  2. This Future is then passed to fetch on the calling worker (see the sanity check below).
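
The index variables themselves are computed locally from myid(), so as far as I can tell they are correct on every worker; this is the quick check I use to inspect them:

# Quick sanity check: print top and bot on every worker
@everywhere if myid() != 1
    println("proc ", myid(), ": top = ", top, ", bot = ", bot)
end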

Even more confusing to me: on proc 1, if I just type A, I still get the whole 16×16 matrix. However, if I execute the @everywhere begin … end block a second time, I now get BoundsErrors, as if A on proc 1 had been reassigned to a slice! But this block of instructions specifically excluded proc 1 from any instruction!

Last question: I’m confused by the philosophy of having one process be the master and another process (the second one) be a worker, while both of them really share one physical core. To my understanding, the 8 cores doing the computing are processes 2 to 9. Am I doing this right?
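
For reference, this is how I currently picture the process layout (a quick REPL check):

# Started with julia -p 8: one master plus 8 workers, 9 processes in total
using Distributed
procs()     # [1, 2, ..., 9]: all process ids, including the master
workers()   # [2, 3, ..., 9]: the worker ids only
nworkers()  # 8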

Thank you in advance
Best regards

Hmm, I don’t fully understand myself what’s going on here, but you should replace the @fetchfrom construction with interpolation: A = $A[top:bot, :]. See the docstring for @everywhere.
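
Applied to your block, it would look something like this (an untested sketch): $A interpolates the value of A from the calling process, so the matrix is shipped along with the code instead of being fetched through a closure. Using @everywhere workers() also spares you the myid() != 1 guard.

@everywhere workers() begin
    i = myid() - 1
    top = 2*(i-1) + 1
    bot = 2*i
    A = ($A)[top:bot, :]  # $A is the master's matrix, sent by value
end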

Thank you so much for your answer, it worked. If I may ask one more question: if I now broadcast a vector x to all processes and write

[ @fetchfrom i+1 A*x for i = 1:8 ]

Does this compute the slices of the vector Ax = A * x in a strictly parallel fashion, without waiting sequentially? In my experiments it computes the slices of Ax all right, but with big matrices I still find the single-core A*x product faster than computing the slice products this way.

No, that won’t work, for two reasons.

  1. fetch is blocking, i.e. the main process waits for each result before issuing the next remote call, so the iterations run sequentially.
  2. It’s not the right A. @fetchfrom (like @spawnat) wraps the expression in a closure and sends that to the worker process, thereby capturing the A from the main process, i.e. where the closure was created. Check the result: the products don’t even have the right dimensions. See the sketch after this list.
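
Roughly speaking (a sketch based on the docstrings, not the exact macro expansion), @fetchfrom pid A * x behaves like this:

using Distributed
pid = first(workers())
A = randn(4, 4); x = ones(4)

# The expression is wrapped in a zero-argument closure and shipped to pid.
# The globals it references (A and x) are serialized from the *calling*
# process and assigned in Main on pid before the closure runs there,
# which presumably also explains why re-running your original block
# clobbered the A on proc 1.
y = remotecall_fetch(() -> A * x, pid)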

To make it work, you need to give the arrays on the workers a different name, like _A. This works:

julia> @everywhere workers() begin
         i = myid() - 1
         top = 2*(i-1) + 1
         bot = 2*i
         _A = $A[top:bot, :]
       end

julia> @everywhere workers() _x = ones($(size(A, 2)))  # interpolate the size from the main process, where A is defined

julia> fetch.([ @spawnat pid _A*_x for pid in workers() ])
4-element Vector{Vector{Float64}}:
 [-0.2858042416522781, 1.0291325988689939]
 [0.4766186447550887, 0.6306235841314707]
 [-1.0111982506103134, 2.6748057387543662]
 [-0.7230155954420037, -0.9950903821005783]

That said, the overhead will probably be immense compared to the actual work and thus you will likely see a massive slowdown.
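
If you want to check this on your machine, compare timings along these lines (a rough, self-contained sketch; the matrix size is made up, and the first call of each line includes compilation, so time a second run):

using Distributed
@everywhere using SparseArrays

n = 4_096                 # hypothetical size, for illustration only
A = sprandn(n, n, 0.001)
x = ones(n)

chunk = n ÷ nworkers()
@everywhere workers() begin
    i = myid() - 1
    _A = ($A)[((i-1)*$chunk + 1):(i*$chunk), :]  # this worker's row block
    _x = $x
end

@time A * x                                                # one process
@time fetch.([@spawnat pid _A * _x for pid in workers()])  # distributed slices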


Thanks again for your reply, I think I understood everything!