# Why passing a distributed matrix to functions hurts performance?

Hi folks,

I observed something strange recently. Passing a distributed matrix to a function will hurt its performance?

Let’s say here is task to apply a distributed matrix to some vectors like below:

``````using Distributed
using DistributedArrays
using LinearAlgebra
``````

% distributed matvec multiplication
% Assume L is a sparse matrix of size N and some number of processors have been added using addprocs, %then DL would be a distributed version of L.

``````
DL = distribute(L, procs=workers(), dist=[length(workers()), 1])
V = rand(N, 5)
``````

% Let’s call the task below “matvec”
% Y is a distributed matrix to store resulting matrix

``````walltime_matvec = @elapsed begin
Y = dzeros((N,5), workers(), [length(workers()), 1])
wkers = workers()
@sync for i in 1:length(workers())
@spawnat wkers[i] localpart(Y)[:,:] = localpart(DL)*V
end
Y = Matrix{Float64}(Y)
end
``````

% Let’s wrap the “matvec” task up as a function and pass DL and V as parameters, as following.
% And we call this task #“matvec_func”.

``````function matvec_func(A, V)
Y = dzeros((N,5), workers(), [length(workers()), 1])
wkers = workers()
@sync for i in 1:length(workers())
@spawnat wkers[i] localpart(Y)[:,:] = localpart(A)*V
end
Y = Matrix{Float64}(Y)
return Y
end
walltime_matvec_func = @elapsed matvec_func(DL, V)
``````

I found time_matvec_func is usually much larger than time_matvec (See the figure below). It looks so strange. Could anyone explain this? Or any suggestion? The figure above summarizes the wall time of each task I mentioned against the number of processors I used. In the figure, the matrix size is 10^7 * 10^7. This experiment ran on a workstation with 2 NUMA nodes. Each node contains 2 sockets and each socket contains 24 CPUs. Please only read the “matvec” and “matvec_func” curves because others curves are for other tasks.

For distributed matmul, you should set the number of BLAS threads per core to 1 with `BLAS.set_num_threads(1)`, otherwise you’ll oversubscribe.

Potentially related (although specialized for gemm rather than gemv): GitHub - haampie/COSMA.jl: Wrapper for COSMA: Distributed Communication-Optimal Matrix-Matrix Multiplication Algorithm

But both Threads.nthreads() and LinearAlgebra.BLAS.get_num_threads() are all equal to 1 in my case. I think they are 1 by default.

They definitely shouldn’t be 1 by default. Which Julia version are you using and how many CPU cores does your system have? Can you check that you haven’t set `OPENBLAS_NUM_THREADS` or `OMP_NUM_THREADS` explicitly?

Example: On a system with 64 physical cores (128 virtual cores due to hyperthreads) I get 8 BLAS threads with Julia v1.7.2 and 128 BLAS threads with Julia v1.8.0-beta3.

1 Like

On a more general note, making a distributed computation efficient is a non-trivial task and typically requires a lot of care and fine tuning. So, personally, I wouldn’t expect efficient out of the box solutions.

Also, let me note that this line is definitely inefficient because it allocates too much

``````localpart(Y)[:,:] = localpart(A)*V
``````

Consider dropping the `[:,:]` and using `mul!` for in-place multiplication. But, that likely won’t fix the scaling.

FWIW, using the following script I get faster performance for a distributed mat-vec multiplication (compared to a naive one) for large enough matrices on a 64 CPU-core machine.

``````using Distributed
using DistributedArrays
using LinearAlgebra
using SparseArrays
using Test

if length(ARGS) != 3
println("Please provide N, p, and Nworkers as arguments.")
exit()
end

const N = parse(Int, ARGS)
const p = parse(Float64, ARGS)
const Nworkers = parse(Int, ARGS)
@show N
@show p
@show Nworkers
L = sprand(N, N, p)
V = rand(N, 5)
R = L * V # desired result

println("Starting worker processes")
withenv("JULIA_PROJECT" => pwd(), "OPENBLAS_NUM_THREADS" => 1) do
end

@everywhere begin
# @show Base.active_project()
using DistributedArrays
using SparseArrays
using LinearAlgebra
end

DL = distribute(L, dist=[Nworkers, 1])

println("Local parts that each worker holds:")
for w in workers()
display(@fetchfrom w DistributedArrays.localindices(DL))
end

function matvec_func(A, V)
Y = dzeros((N, 5), workers(), [Nworkers, 1])
wkers = workers()
@sync for i in 1:Nworkers
@spawnat wkers[i] mul!(localpart(Y), localpart(A), V)
end
Y = Matrix{Float64}(Y)
return Y
end

function matvec_naive(A, V)
Y = zeros(N, 5)
mul!(Y, A, V)
return Y
end

# DR = matvec_func(DL, V)
# @test DR ≈ R
# NR = matvec_naive(L, V)
# @test NR ≈ R

println("Running benchmark")
using BenchmarkTools
t_dist = @belapsed matvec_func(\$DL, \$V);
t_naive = @belapsed matvec_naive(\$L, \$V);

println("\nResults:")
@show t_dist
@show t_naive
@show t_dist - t_naive
``````

Output:

``````➜  bauerc@n2login3 distributed-sparse-matvec  julia. code.jl 1000000 1e-3 64N = 1000000p = 0.001
Nworkers = 64
Starting worker processes
Local parts that each worker holds:
(1:15625, 1:1000000)(15626:31250, 1:1000000)(31251:46875, 1:1000000)(46876:62500, 1:1000000)(62501:78125, 1:1000000)(78126:93750, 1:1000000)(93751:109375, 1:1000000)(109376:125000, 1:1000000)(125001:140625, 1:1000000)(140626:156250, 1:1000000)(156251:171875, 1:1000000)(171876:187500, 1:1000000)(187501:203125, 1:1000000)(203126:218750, 1:1000000)(218751:234375, 1:1000000)(234376:250000, 1:1000000)(250001:265625, 1:1000000)(265626:281250, 1:1000000)(281251:296875, 1:1000000)(296876:312500, 1:1000000)(312501:328125, 1:1000000)(328126:343750, 1:1000000)(343751:359375, 1:1000000)(359376:375000, 1:1000000)(375001:390625, 1:1000000)(390626:406250, 1:1000000)(406251:421875, 1:1000000)(421876:437500, 1:1000000)(437501:453125, 1:1000000)(453126:468750, 1:1000000)(468751:484375, 1:1000000)(484376:500000, 1:1000000)(500001:515625, 1:1000000)(515626:531250, 1:1000000)(531251:546875, 1:1000000)(546876:562500, 1:1000000)(562501:578125, 1:1000000)(578126:593750, 1:1000000)(593751:609375, 1:1000000)(609376:625000, 1:1000000)(625001:640625, 1:1000000)(640626:656250, 1:1000000)(656251:671875, 1:1000000)(671876:687500, 1:1000000)(687501:703125, 1:1000000)(703126:718750, 1:1000000)(718751:734375, 1:1000000)(734376:750000, 1:1000000)(750001:765625, 1:1000000)(765626:781250, 1:1000000)(781251:796875, 1:1000000)(796876:812500, 1:1000000)(812501:828125, 1:1000000)(828126:843750, 1:1000000)(843751:859375, 1:1000000)(859376:875000, 1:1000000)(875001:890625, 1:1000000)(890626:906250, 1:1000000)(906251:921875, 1:1000000)(921876:937500, 1:1000000)(937501:953125, 1:1000000)(953126:968750, 1:1000000)(968751:984375, 1:1000000)(984376:1000000, 1:1000000)Running benchmark

Results:
t_dist = 1.075116944
t_naive = 10.263158796
t_dist - t_naive = -9.188041852000001
``````
2 Likes

OPENBLAS_NUM_THREADS was 1 when I did the experiments above. Maybe I set it to 1 before but I forgot. I did try mul! instead of [:,:] and *, but this does not fix the scaling. For the example you show here, yes you get faster performance there. I can also get much faster performance using the implementation I mentioned. However, I think none of them has the correct scaling. I agree with what you said – making a distributed computation efficient is a non-trivial task.

Maybe PartitionedArrays.jl could help implement an efficient matrix-vector (matrix-matrix) multiplication, because the scaling shown there looks good. See PartitionedArrays.jl in github.