MPI collectives (e.g., MPI.Allgatherv!) scaling issues

I found that MPI.Allgatherv! doesn’t scale well when testing it on a cluster.

The plot above summarizes the execution time of MPI.Allgatherv! against the number of processes. The scaling shown in the plot doesn't match the theoretical one (C/sqrt(p)). Any thoughts? Is there anything wrong with my experiment? The experimental settings are as follows.

Assume there is a 2D process grid of p (= comm_size in the code) processes, so each row/column communicator has sqrt(p) processes. Assume the global matrix is a k-by-N dense matrix, where k << N. I divide the columns of the global matrix evenly into p partitions so that the total number of column vectors stored in each column communicator (consisting of sqrt(p) processes) stays (basically) the same. I then use MPI.Allgatherv! to gather, at each process, the Nk/p words from all sqrt(p) processes in its column communicator, which theoretically takes O(α log p + β N k / sqrt(p)) time, where α is the message startup time and β is the per-word transfer time.
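For reference, here is a small sketch that evaluates this α-β model for the process counts I tested, so the expected curve can be compared against the measurements. The α and β values below are made-up placeholders (the real ones depend on the interconnect and would have to be measured); only the shape of the curve matters.

# sketch of the alpha-beta cost model for Allgatherv over a column communicator
# of sqrt(p) processes; alpha and beta are hypothetical placeholder values
N, k = 5_000_000, 16
α = 1.0e-6   # assumed message startup time (seconds)
β = 1.0e-9   # assumed per-word transfer time (seconds per word)

predicted(p) = α * log2(p) + β * N * k / sqrt(p)

for p in (4, 16, 64, 256, 1024, 4096)
    println("p = ", lpad(p, 4), "  predicted ≈ ", predicted(p), " s")
end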

For example, assume p = 4; then we have a 2-by-2 process grid where each column communicator has 2 processes. Assume N = 50; then, in column-major order, the processes own 13, 13, 12, 12 column vectors, respectively. Consequently, the first and second column communicators hold 26 and 24 column vectors in total, respectively (26 ≈ 24). MPI.Allgatherv! is applied within each column communicator.
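Just to illustrate that even division (an illustrative REPL call to the split_count helper defined in the code below):

julia> split_count(50, 4)
4-element Vector{Int64}:
 13
 13
 12
 12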

For details, please refer to the code below.

#filename test_collectives.jl
using MPI, LinearAlgebra, Printf

# split N items into n chunks as evenly as possible
# (the first N mod n chunks get one extra item)
function split_count(N::Integer, n::Integer)
    q,r = divrem(N, n)
    return [i <= r ? q+1 : q for i = 1:n]
end

# split the N columns over an n-by-n process grid: first over the n column
# communicators, then split each communicator's share over its n processes.
# (S holds auxiliary partition sizes and is not used in this test.)
function split_count_local(N::Integer, n::Integer)
    counts = zeros(Int64, n*n)
    counts1 = split_count(N, n)
    S = zeros(Int64, 2, n*n)
    for i in 1:n # cols
        counts2 = split_count(counts1[i], n)
        counts[(i-1)*n+1:i*n] = counts2
        for j in 1:n # rows
            S[1, (i-1)*n+j] = counts1[j]
            S[2, (i-1)*n+j] = counts1[i]
        end
    end
    return counts, S
end


for code in 1:1  # single-iteration loop wrapping the whole benchmark (keeps the script's variables in a local scope)
    
    # global dense matrix of k rows and N columns.
    N = parse(Int64, ARGS[1])
    k = parse(Int64, ARGS[2])

    repeats1 = 5


    MPI.Init()
    
    # construct the comm communicator consisting all processes (#processes = comm_size)
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    comm_size = MPI.Comm_size(comm)
    comm_size_sq = trunc(Int64, sqrt(comm_size))

    root = 0
    
    # arrange all processes as a 2D grid
    # split all processes into multiple column communicators
    comm_col = MPI.Comm_split(comm, trunc(Int64, rank/comm_size_sq), rank)
    rank_col = MPI.Comm_rank(comm_col)
    # split all processes into multiple row communicators
    comm_row = MPI.Comm_split(comm, mod(rank, comm_size_sq), rank)
    rank_row = MPI.Comm_rank(comm_row)
    
    # col_division divides (as even as possible) the columns of the global k-by-N dense matrix 
    # into comm_size partitions, e.g., col_division[rank+1] indicates the number 
    # of column vectors owned by the rank-th process
    # for example, if N = 50, comm_size = 4, col_division divides all 50 columns 
    # into 13, 13, 12, 12 columns, respectively.
    col_division, _ = split_count_local(N, comm_size_sq) 

    if rank == 0
        @printf("\n-------------test Collectives--------------\n")
        @printf("#processes: %i N: %i k: %i \n", comm_size, N, k)
    end

    MPI.Barrier(comm)
    
    # allocate the local part X at each process, then gather all X's into 
    # X_gather for each column communicator comm_col.
    # for example, if N = 50, comm_size = 4, there are two column communicators 
    # each with X_gather of sizes k-by-26 and k-by-24, respectively. 
    X = randn(k, col_division[rank+1])
    local_info_cols = col_division[rank_row*comm_size_sq+1:rank_row*comm_size_sq+comm_size_sq]
    X_gather = Array{Float64}(undef, (k, sum(local_info_cols)))
    # per-rank receive counts (in Float64 words): k rows times the number of
    # columns owned by each rank in this column communicator
    _counts = vcat([k for i in 1:length(local_info_cols[:])]', local_info_cols')
    X_gather_vbuf = VBuffer(X_gather, vec(prod(_counts, dims=1)))

    cputime_allgatherv = 0.0
    for t in 1:repeats1
        MPI.Barrier(comm)
        cputime_allgatherv += @elapsed begin
            MPI.Allgatherv!(X, X_gather_vbuf, comm_col)
        end
        MPI.Barrier(comm)
    end
    cputime_allgatherv /= repeats1
    if rank == 0
        @printf("walltime MPI.Allgatherv!: %.2e \n", cputime_allgatherv)
    end
    

    GC.gc()
    MPI.Finalize()
end

I ran the code on a large cluster where each node has 128 CPU cores (one process per core). The MPI library I used is OpenMPI, and the Julia version is 1.7. I set N = 5000000 and k = 16. The process counts are 4 (1 node), 16 (1 node), 64 (1 node), 256 (2 nodes), 1024 (8 nodes), and 4096 (32 nodes). Before running the code with a given number of processes, I requested the corresponding resources (number of nodes) from the cluster using SLURM.

The commands I used look like this:

julia -e 'using MPIPreferences; MPIPreferences.use_jll_binary("OpenMPI_jll");'

path_to_mpiexec -n 4 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1  path_to_thefile/test_collectives.jl 5000000 16

path_to_mpiexec -n 16 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1  path_to_thefile/test_collectives.jl 5000000 16

path_to_mpiexec -n 64 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1  path_to_thefile/test_collectives.jl 5000000 16

path_to_mpiexec -n 256 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1  path_to_thefile/test_collectives.jl 5000000 16

path_to_mpiexec -n 1024 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1  path_to_thefile/test_collectives.jl 5000000 16

path_to_mpiexec -n 4096 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1  path_to_thefile/test_collectives.jl 5000000 16

The results (execution time in seconds) I got:

-------------test Collectives--------------
#processes: 4 N: 5000000 k: 16 
walltime MPI.Allgatherv!: 1.42e-01 

-------------test Collectives--------------
#processes: 16 N: 5000000 k: 16 
walltime MPI.Allgatherv!: 2.75e-01 

-------------test Collectives--------------
#processes: 64 N: 5000000 k: 16 
walltime MPI.Allgatherv!: 2.02e-01 

-------------test Collectives--------------
#processes: 256 N: 5000000 k: 16 
walltime MPI.Allgatherv!: 1.45e-01 

-------------test Collectives--------------
#processes: 1024 N: 5000000 k: 16 
walltime MPI.Allgatherv!: 7.56e-02 

-------------test Collectives--------------
#processes: 4096 N: 5000000 k: 16 
walltime MPI.Allgatherv!: 2.06e-01 

For the scaling of the execution time, please refer to the plot at the very beginning.

The theoretical scaling depends on the actual network topology of the cluster. Your plot is likely showing that as you use more nodes, your data has to travel over physically longer paths, and each node is not directly connected to all other nodes.

You will likely get a better answer if you ask your cluster administrators about scaling, as they will know the network topology you are working with.
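If you want to see how much the inter-node links differ from the intra-node ones, a simple ping-pong test between two ranks gives a rough estimate of α (latency) and β (per-word transfer time) for a given placement. This is only a sketch (it assumes MPI.jl 0.20+ with the keyword-argument Send/Recv! API, and the file name is just an example); run it with 2 ranks, once placed on the same node and once on two different nodes.

# filename pingpong.jl (sketch): rough estimate of alpha and beta between ranks 0 and 1
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

function pingpong(nwords, reps)
    buf = zeros(Float64, nwords)
    MPI.Barrier(comm)
    t = MPI.Wtime()
    for _ in 1:reps
        if rank == 0
            MPI.Send(buf, comm; dest=1, tag=0)
            MPI.Recv!(buf, comm; source=1, tag=0)
        elseif rank == 1
            MPI.Recv!(buf, comm; source=0, tag=0)
            MPI.Send(buf, comm; dest=0, tag=0)
        end
    end
    return (MPI.Wtime() - t) / (2 * reps)   # one-way time per message
end

t_small = pingpong(1, 1000)          # latency-dominated: roughly alpha
t_large = pingpong(1_000_000, 50)    # bandwidth-dominated: roughly alpha + beta * nwords
if rank == 0
    println("alpha ≈ ", t_small, " s,  beta ≈ ", (t_large - t_small) / 1_000_000, " s/word")
end
MPI.Finalize()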