Adding vs multiplying matrices with DistributedArrays

Hello everyone,

I have a question regarding usage of distributed arrays. In preparation of a bigger calculation, I am doing a simple toy calculation to understand how distributed arrays works.

Let me add 4 processors to the job. Let me assume I have a 4 x 24 matrix. I want to divide this array into 4 parts: 1 row each and 24 columns distributed across the processors. Later on, I will fill the columns on each matrix by doing some calculations. (Such kind of computations arise in boundary element methods, where each row corresponds to a single evaluation point and the rows are result of discretised integrals.)

This is motivated by the Julia Academy notebooks Distributed Computing | JuliaAcademy taught by Matt Bauman

As a side note, I think I need to first import the package DistributedArrays on the master node and then use @everywhere using Distributed, DistributedArrays

using Distributed, DistributedArrays
@everywhere using Distributed, DistributedArrays
# Add 4 processors only once
if nprocs() == 1
     addprocs(4)
 else
     println("Only add 4 processors")
 end
 println("Total number of processors include master")
display(nprocs()) # Verify the total number of processors
A = DArray(I->fill(myid(), length.(I)), (4, 24), workers()[1:4], [4,1])
display(A)
println("Checking: Array stored in proc 2")
display(fetch(@spawnat 2 A.localpart))
#println("Array stored in proc 3")
#display(fetch(@spawnat 3 A.localpart))
#println("Array stored in proc 4")
#display(fetch(@spawnat 4 A.localpart))
#println("Checking: Array stored in proc 5")
#display(fetch(@spawnat 5 A.localpart))

# Do some operations
B = A + A
println("Checking: After addition new array stored in proc 2")
display(fetch(@spawnat 2 B.localpart))

C = A.*A
println("Checking: After multiplication new array stored in proc 2")
display(fetch(@spawnat 2 C.localpart))

The outputs are:

julia> include("darraytest.jl")
Only add 4 processors
Total number of processors include master
5
4×24 DArray{Int64, 2, Matrix{Int64}}:
 2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2
 3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3  3
 4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4
 5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5  5
Checking: Array stored in proc 2
1×24 Matrix{Int64}:
 2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2
Checking: After addition new array stored in proc 2
1×24 Matrix{Int64}:
 4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4  4
Checking: After multiplication new array stored in proc 2
4×6 Matrix{Int64}:
  4   4   4   4   4   4
  9   9   9   9   9   9
 16  16  16  16  16  16
 25  25  25  25  25  25

As you can see, the output of B = A + A is okay and what I expected. The output of C is okay, but the processors don’t own 1 x 24 arrays rather they own 4 x 6 matrix. This is unexpected and undesirable for the kind of calculation I have in mind. I wanted each array of C to be stored in each processor just like A and B. I wanted the output of C = A.*A to be stored in processor 2 as 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4. What am I not understanding here ? Thank you for any help or suggestion.

1 Like

It seems that the output of broadcast() does not inherit the distribution of the input DArrays. Maybe the easiest way around this is to preallocate the output:

julia> A = DArray(I->fill(myid(), length.(I)), (4,5), workers(), [4,1])
4×5 DArray{Int64, 2, Matrix{Int64}}:
 2  2  2  2  2
 3  3  3  3  3
 4  4  4  4  4
 5  5  5  5  5

julia> B = DArray(I->fill(myid(), length.(I)), (4,5), workers(), [4,1])
4×5 DArray{Int64, 2, Matrix{Int64}}:
 2  2  2  2  2
 3  3  3  3  3
 4  4  4  4  4
 5  5  5  5  5

julia> B .= A.+A
4×5 DArray{Int64, 2, Matrix{Int64}}:
  4   4   4   4   4
  6   6   6   6   6
  8   8   8   8   8
 10  10  10  10  10

julia> @fetchfrom 2 localpart(B)
1×5 Matrix{Int64}:
 4  4  4  4  4

Thanks for the reply.

However, as with my example and yours too, adding two matrices seems to work fine. It is the multiplication that doesn’t work, even when allocating it beforehand.

The key are the dots. The operation (+ or *) doesn’t matter. For example, even addition leads to the wrong distribution if I replace + with .+:

julia> C = A .+ A
       @fetchfrom 2 localpart(C)
2×3 Matrix{Int64}:
 4  4  4
 6  6  6

On the other hand, multiplication works fine if I preallocate and add all the dots:

julia> B .= A.*A
       @fetchfrom 2 localpart(B)
1×5 Matrix{Int64}:
 4  4  4  4  4
2 Likes

Perfect!! Thank you very much.

1 Like

To conclude, one needs dots, and also initialize the matrices where the results are stored.

For example, without dots at the equal sign ( B .=),

using Distributed, DistributedArrays

# Add 4 processors only once
if nprocs() == 1
     addprocs(4)
else
     println("Only add 4 processors")
end

@everywhere using Distributed, DistributedArrays

println("Total number of processors including master")
display(nprocs()) # Verify the total number of processors
A = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
println("---------------A-------------------")
display(A)
println("Checking: Array stored in proc 2")
display(fetch(@spawnat 2 A.localpart))
println("----------------------------------")
#println("Array stored in proc 3")
#display(fetch(@spawnat 3 A.localpart))
#println("Array stored in proc 4")
#display(fetch(@spawnat 4 A.localpart))
#println("Checking: Array stored in proc 5")
#display(fetch(@spawnat 5 A.localpart))

# Do some operations
# Addition
# B = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
B = A .+ A
println("---------------B-------------------")
display(B)
println("Checking Matrix B: After addition new array stored in proc 2")
display(fetch(@spawnat 2 B.localpart))
println("----------------------------------")
# Multiplication
#C = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
C = A .* A
println("---------------C-------------------")
display(C)
println("Checking Matrix C: After multiplication new array stored in proc 2")
display(fetch(@spawnat 2 C.localpart))
println("----------------------------------")

I get the result-1 where Arrays A, and B and C are stored differently by different processors. This is not what I want.

julia> include("darraytest.jl")
Only add 4 processors
Total number of processors including master
5
---------------A-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
 2  2  2  2
 3  3  3  3
 4  4  4  4
 5  5  5  5
Checking: Array stored in proc 2
1×4 Matrix{Int64}:
 2  2  2  2
----------------------------------
---------------B-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
  4   4   4   4
  6   6   6   6
  8   8   8   8
 10  10  10  10
Checking Matrix B: After addition new array stored in proc 2
2×2 Matrix{Int64}:
 4  4
 6  6
----------------------------------
---------------C-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
  4   4   4   4
  9   9   9   9
 16  16  16  16
 25  25  25  25
Checking Matrix C: After multiplication new array stored in proc 2
2×2 Matrix{Int64}:
 4  4
 9  9
----------------------------------

With dots, but without initialisation, I get error for multiplying matrices.

using Distributed, DistributedArrays

# Add 4 processors only once
if nprocs() == 1
     addprocs(4)
else
     println("Only add 4 processors")
end

@everywhere using Distributed, DistributedArrays

println("Total number of processors including master")
display(nprocs()) # Verify the total number of processors
A = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
println("---------------A-------------------")
display(A)
println("Checking: Array stored in proc 2")
display(fetch(@spawnat 2 A.localpart))
println("----------------------------------")
#println("Array stored in proc 3")
#display(fetch(@spawnat 3 A.localpart))
#println("Array stored in proc 4")
#display(fetch(@spawnat 4 A.localpart))
#println("Checking: Array stored in proc 5")
#display(fetch(@spawnat 5 A.localpart))

# Do some operations
# Addition
# B = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
B .= A .+ A
println("---------------B-------------------")
display(B)
println("Checking Matrix B: After addition new array stored in proc 2")
display(fetch(@spawnat 2 B.localpart))
println("----------------------------------")
# Multiplication
#C = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
C .= A .* A
println("---------------C-------------------")
display(C)
println("Checking Matrix C: After multiplication new array stored in proc 2")
display(fetch(@spawnat 2 C.localpart))
println("----------------------------------")

I get the result-2 which throws an error

julia> include("darraytest.jl")
Total number of processors including master
5
---------------A-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
 2  2  2  2
 3  3  3  3
 4  4  4  4
 5  5  5  5
Checking: Array stored in proc 2
1×4 Matrix{Int64}:
 2  2  2  2
----------------------------------
ERROR: LoadError: UndefVarError: B not defined
Stacktrace:
 [1] top-level scope
   @ ~/Desktop/mydocuments/RESEARCH/juliacodes/gausslegendre/darraytest.jl:30
 [2] include(fname::String)
   @ Base.MainInclude ./client.jl:444
 [3] top-level scope
   @ none:1
in expression starting at /home/debasish/Desktop/mydocuments/RESEARCH/juliacodes/gausslegendre/darraytest.jl:30

Strangely, the above code will work for B = A + A !!

Finally, the last code gives the desired result, where we use dots and also initialise the matrices before hand.

using Distributed, DistributedArrays

# Add 4 processors only once
if nprocs() == 1
     addprocs(4)
else
     println("Only add 4 processors")
end

@everywhere using Distributed, DistributedArrays

println("Total number of processors including master")
display(nprocs()) # Verify the total number of processors
A = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
println("---------------A-------------------")
display(A)
println("Checking: Array stored in proc 2")
display(fetch(@spawnat 2 A.localpart))
println("----------------------------------")
#println("Array stored in proc 3")
#display(fetch(@spawnat 3 A.localpart))
#println("Array stored in proc 4")
#display(fetch(@spawnat 4 A.localpart))
#println("Checking: Array stored in proc 5")
#display(fetch(@spawnat 5 A.localpart))

# Do some operations
# Addition
B = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
B .= A .+ A
println("---------------B-------------------")
display(B)
println("Checking Matrix B: After addition new array stored in proc 2")
display(fetch(@spawnat 2 B.localpart))
println("----------------------------------")
# Multiplication
C = DArray(I->fill(myid(), length.(I)), (4, 4), workers()[1:4], [4,1])
C .= A .* A
println("---------------C-------------------")
display(C)
println("Checking Matrix C: After multiplication new array stored in proc 2")
display(fetch(@spawnat 2 C.localpart))
println("----------------------------------")

The result-3 is

julia> include("darraytest.jl")
Total number of processors including master
5
---------------A-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
 2  2  2  2
 3  3  3  3
 4  4  4  4
 5  5  5  5
Checking: Array stored in proc 2
1×4 Matrix{Int64}:
 2  2  2  2
----------------------------------
---------------B-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
  4   4   4   4
  6   6   6   6
  8   8   8   8
 10  10  10  10
Checking Matrix B: After addition new array stored in proc 2
1×4 Matrix{Int64}:
 4  4  4  4
----------------------------------
---------------C-------------------
4×4 DArray{Int64, 2, Matrix{Int64}}:
  4   4   4   4
  9   9   9   9
 16  16  16  16
 25  25  25  25
Checking Matrix C: After multiplication new array stored in proc 2
1×4 Matrix{Int64}:
 4  4  4  4
----------------------------------

This is the desired result. Hence, I think if one does element-by-element operation, for e.g. in a for-loop, things should be okay. But for broadcasting operations, one needs to be careful.

If this confuses you, then I recommend you have another look at Functions · The Julia Language.

The key point is that .= is very different from =, despite the syntactic similarity:

A .= B is roughly the same as

for i in eachindex(A)
    A[i] = B[i]
end

Hence you get an error if A doesn’t exist or is not of the same size as B.

On the other hand, A = B is just ordinary variable assignment. So B = A + A first calls the function +(A,A), and then it makes B refer to whatever this function returns. In this case, it doesn’t matter whether B has been introduced before because in either case B will just point to +(A,A) from now on.

2 Likes

Thank you Simon. This is very helpful.