Why passing a distributed matrix to functions hurts performance?

Hi folks,

I observed something strange recently. Passing a distributed matrix to a function will hurt its performance?

Let’s say here is task to apply a distributed matrix to some vectors like below:

using Distributed
using DistributedArrays
using LinearAlgebra

% distributed matvec multiplication
% Assume L is a sparse matrix of size N and some number of processors have been added using addprocs, %then DL would be a distributed version of L.


DL = distribute(L, procs=workers(), dist=[length(workers()), 1])
V = rand(N, 5)

% Let’s call the task below “matvec”
% Y is a distributed matrix to store resulting matrix

walltime_matvec = @elapsed begin
Y = dzeros((N,5), workers(), [length(workers()), 1])
wkers = workers()
@sync for i in 1:length(workers())
        @spawnat wkers[i] localpart(Y)[:,:] = localpart(DL)*V
end
Y = Matrix{Float64}(Y)
end

% Let’s wrap the “matvec” task up as a function and pass DL and V as parameters, as following.
% And we call this task #“matvec_func”.

function matvec_func(A, V)
     Y = dzeros((N,5), workers(), [length(workers()), 1])
     wkers = workers()
    @sync for i in 1:length(workers())
           @spawnat wkers[i] localpart(Y)[:,:] = localpart(A)*V
    end
    Y = Matrix{Float64}(Y)
   return Y
end
walltime_matvec_func = @elapsed matvec_func(DL, V)

I found time_matvec_func is usually much larger than time_matvec (See the figure below). It looks so strange. Could anyone explain this? Or any suggestion?
dc4_p10m

The figure above summarizes the wall time of each task I mentioned against the number of processors I used. In the figure, the matrix size is 10^7 * 10^7. This experiment ran on a workstation with 2 NUMA nodes. Each node contains 2 sockets and each socket contains 24 CPUs. Please only read the “matvec” and “matvec_func” curves because others curves are for other tasks.

For distributed matmul, you should set the number of BLAS threads per core to 1 with BLAS.set_num_threads(1), otherwise you’ll oversubscribe.

Potentially related (although specialized for gemm rather than gemv): GitHub - haampie/COSMA.jl: Wrapper for COSMA: Distributed Communication-Optimal Matrix-Matrix Multiplication Algorithm

But both Threads.nthreads() and LinearAlgebra.BLAS.get_num_threads() are all equal to 1 in my case. I think they are 1 by default.

They definitely shouldn’t be 1 by default. Which Julia version are you using and how many CPU cores does your system have? Can you check that you haven’t set OPENBLAS_NUM_THREADS or OMP_NUM_THREADS explicitly?

Example: On a system with 64 physical cores (128 virtual cores due to hyperthreads) I get 8 BLAS threads with Julia v1.7.2 and 128 BLAS threads with Julia v1.8.0-beta3.

1 Like

On a more general note, making a distributed computation efficient is a non-trivial task and typically requires a lot of care and fine tuning. So, personally, I wouldn’t expect efficient out of the box solutions.

Also, let me note that this line is definitely inefficient because it allocates too much

localpart(Y)[:,:] = localpart(A)*V

Consider dropping the [:,:] and using mul! for in-place multiplication. But, that likely won’t fix the scaling.

FWIW, using the following script I get faster performance for a distributed mat-vec multiplication (compared to a naive one) for large enough matrices on a 64 CPU-core machine.

using Distributed
using DistributedArrays
using LinearAlgebra
using SparseArrays
using Test

BLAS.set_num_threads(1)

if length(ARGS) != 3
    println("Please provide N, p, and Nworkers as arguments.")
    exit()
end

const N = parse(Int, ARGS[1])
const p = parse(Float64, ARGS[2])
const Nworkers = parse(Int, ARGS[3])
@show N
@show p
@show Nworkers
L = sprand(N, N, p)
V = rand(N, 5)
R = L * V # desired result

println("Starting worker processes")
withenv("JULIA_PROJECT" => pwd(), "OPENBLAS_NUM_THREADS" => 1) do
    addprocs(Nworkers)
end

@everywhere begin
    # @show Base.active_project()
    using DistributedArrays
    using SparseArrays
    using LinearAlgebra
end

DL = distribute(L, dist=[Nworkers, 1])

println("Local parts that each worker holds:")
for w in workers()
    display(@fetchfrom w DistributedArrays.localindices(DL))
end

function matvec_func(A, V)
    Y = dzeros((N, 5), workers(), [Nworkers, 1])
    wkers = workers()
    @sync for i in 1:Nworkers
        @spawnat wkers[i] mul!(localpart(Y), localpart(A), V)
    end
    Y = Matrix{Float64}(Y)
    return Y
end

function matvec_naive(A, V)
    Y = zeros(N, 5)
    mul!(Y, A, V)
    return Y
end

# DR = matvec_func(DL, V)
# @test DR ≈ R
# NR = matvec_naive(L, V)
# @test NR ≈ R

println("Running benchmark")
using BenchmarkTools
t_dist = @belapsed matvec_func($DL, $V);
t_naive = @belapsed matvec_naive($L, $V);

println("\nResults:")
@show t_dist
@show t_naive
@show t_dist - t_naive

Output:

➜  bauerc@n2login3 distributed-sparse-matvec  julia. code.jl 1000000 1e-3 64N = 1000000p = 0.001
Nworkers = 64
Starting worker processes
Local parts that each worker holds:
(1:15625, 1:1000000)(15626:31250, 1:1000000)(31251:46875, 1:1000000)(46876:62500, 1:1000000)(62501:78125, 1:1000000)(78126:93750, 1:1000000)(93751:109375, 1:1000000)(109376:125000, 1:1000000)(125001:140625, 1:1000000)(140626:156250, 1:1000000)(156251:171875, 1:1000000)(171876:187500, 1:1000000)(187501:203125, 1:1000000)(203126:218750, 1:1000000)(218751:234375, 1:1000000)(234376:250000, 1:1000000)(250001:265625, 1:1000000)(265626:281250, 1:1000000)(281251:296875, 1:1000000)(296876:312500, 1:1000000)(312501:328125, 1:1000000)(328126:343750, 1:1000000)(343751:359375, 1:1000000)(359376:375000, 1:1000000)(375001:390625, 1:1000000)(390626:406250, 1:1000000)(406251:421875, 1:1000000)(421876:437500, 1:1000000)(437501:453125, 1:1000000)(453126:468750, 1:1000000)(468751:484375, 1:1000000)(484376:500000, 1:1000000)(500001:515625, 1:1000000)(515626:531250, 1:1000000)(531251:546875, 1:1000000)(546876:562500, 1:1000000)(562501:578125, 1:1000000)(578126:593750, 1:1000000)(593751:609375, 1:1000000)(609376:625000, 1:1000000)(625001:640625, 1:1000000)(640626:656250, 1:1000000)(656251:671875, 1:1000000)(671876:687500, 1:1000000)(687501:703125, 1:1000000)(703126:718750, 1:1000000)(718751:734375, 1:1000000)(734376:750000, 1:1000000)(750001:765625, 1:1000000)(765626:781250, 1:1000000)(781251:796875, 1:1000000)(796876:812500, 1:1000000)(812501:828125, 1:1000000)(828126:843750, 1:1000000)(843751:859375, 1:1000000)(859376:875000, 1:1000000)(875001:890625, 1:1000000)(890626:906250, 1:1000000)(906251:921875, 1:1000000)(921876:937500, 1:1000000)(937501:953125, 1:1000000)(953126:968750, 1:1000000)(968751:984375, 1:1000000)(984376:1000000, 1:1000000)Running benchmark

Results:
t_dist = 1.075116944
t_naive = 10.263158796
t_dist - t_naive = -9.188041852000001
2 Likes

OPENBLAS_NUM_THREADS was 1 when I did the experiments above. Maybe I set it to 1 before but I forgot. I did try mul! instead of [:,:] and *, but this does not fix the scaling. For the example you show here, yes you get faster performance there. I can also get much faster performance using the implementation I mentioned. However, I think none of them has the correct scaling. I agree with what you said – making a distributed computation efficient is a non-trivial task.

Maybe PartitionedArrays.jl could help implement an efficient matrix-vector (matrix-matrix) multiplication, because the scaling shown there looks good. See PartitionedArrays.jl in github.