I found that MPI.Allgatherv! doesn’t scale well when I tested it on a cluster.
The plot above summarizes the execution time of MPI.Allgatherv! against the number of processes. The scaling shown in the plot doesn’t match the theoretical one, C/sqrt(p). Any thoughts? Is there anything wrong with my experiment? The settings of the experiment are as follows.
Assume there is a 2D process grid of p (= comm_size in the code) processes, so each row/column communicator has sqrt(p) processes. Assume the global matrix is a k-by-N dense matrix with k << N. I divided the columns of the global matrix evenly into p partitions such that the total number of column vectors stored in each column communicator (consisting of sqrt(p) processes) stays basically the same. I then used MPI.Allgatherv! to gather, at each process, the Nk/p words owned by each of the sqrt(p) processes in its column communicator, which theoretically takes O(α log p + β N k / sqrt(p)) time, where α is the message startup time and β is the per-word transfer time.
For example, assume p = 4; then we have a 2-by-2 process grid where each column communicator has 2 processes. Assume N = 50; then, in column-major order, each process owns 13, 13, 12, 12 column vectors, respectively. Consequently, the first and second column communicators hold 26 and 24 column vectors in total, respectively (26 ~= 24). MPI.Allgatherv! is applied within each column communicator.
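To make the expected trend concrete, here is a small sketch (not part of the benchmark) that evaluates the α log p + β N k / sqrt(p) model for the process counts I tested later, using the same N and k as in my experiment; the α and β values are made-up placeholders, not measured machine parameters.

# evaluate the theoretical cost model alpha*log2(p) + beta*N*k/sqrt(p);
# the alpha and beta values below are hypothetical placeholders, not measured values
model_time(p, N, k; alpha, beta) = alpha * log2(p) + beta * N * k / sqrt(p)

for p in (4, 16, 64, 256, 1024, 4096)
    t = model_time(p, 5_000_000, 16; alpha = 1e-6, beta = 1e-9)
    println("p = ", lpad(p, 4), "  model time = ", round(t, sigdigits = 3), " s")
end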
For details of the actual experiment, please refer to the following code.
# filename: test_collectives.jl
using MPI, LinearAlgebra, Printf

function split_count(N::Integer, n::Integer)
    q, r = divrem(N, n)
    return [i <= r ? q+1 : q for i = 1:n]
end
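# e.g., split_count(50, 4) == [13, 13, 12, 12] and split_count(50, 2) == [25, 25]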
function split_count_local(N::Integer, n::Integer)
    counts = zeros(Int64, n*n)
    counts1 = split_count(N, n)
    S = zeros(Int64, 2, n*n)
    for i in 1:n # cols
        counts2 = split_count(counts1[i], n)
        counts[(i-1)*n+1:i*n] = counts2
        for j in 1:n # rows
            S[1, (i-1)*n+j] = counts1[j]
            S[2, (i-1)*n+j] = counts1[i]
        end
    end
    return counts, S
end
for code in 1:1  # single-iteration loop so that the script body runs in a local scope
    # global dense matrix of k rows and N columns.
    N = parse(Int64, ARGS[1])
    k = parse(Int64, ARGS[2])
    repeats1 = 5
    MPI.Init()
    # construct the comm communicator consisting of all processes (#processes = comm_size)
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    comm_size = MPI.Comm_size(comm)
    comm_size_sq = trunc(Int64, sqrt(comm_size))
    root = 0
    # arrange all processes as a 2D grid
    # split all processes into multiple column communicators
    comm_col = MPI.Comm_split(comm, trunc(Int64, rank/comm_size_sq), rank)
    rank_col = MPI.Comm_rank(comm_col)
    # split all processes into multiple row communicators
    comm_row = MPI.Comm_split(comm, mod(rank, comm_size_sq), rank)
    rank_row = MPI.Comm_rank(comm_row)
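    # note: MPI.Comm_split(comm, color, key) puts processes with the same color into
    # the same sub-communicator and orders them by key, so comm_col groups the
    # comm_size_sq consecutive ranks sharing the same value of trunc(rank/comm_size_sq),
    # and comm_row groups the ranks sharing the same value of mod(rank, comm_size_sq).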
    # col_division divides (as evenly as possible) the columns of the global k-by-N dense matrix
    # into comm_size partitions, e.g., col_division[rank+1] indicates the number
    # of column vectors owned by the rank-th process.
    # for example, if N = 50, comm_size = 4, col_division divides all 50 columns
    # into 13, 13, 12, 12 columns, respectively.
    col_division, _ = split_count_local(N, comm_size_sq)
    if rank == 0
        @printf("\n-------------test Collectives--------------\n")
        @printf("#processes: %i N: %i k: %i \n", comm_size, N, k)
    end
    MPI.Barrier(comm)
    # allocate the local part X at each process, then gather all X's into
    # X_gather for each column communicator comm_col.
    # for example, if N = 50, comm_size = 4, there are two column communicators
    # each with X_gather of sizes k-by-26 and k-by-24, respectively.
    X = randn(k, col_division[rank+1])
    local_info_cols = col_division[rank_row*comm_size_sq+1:rank_row*comm_size_sq+comm_size_sq]
    X_gather = Array{Float64}(undef, (k, sum(local_info_cols)))
    _counts = vcat([k for i in 1:length(local_info_cols[:])]', local_info_cols')
    X_gather_vbuf = VBuffer(X_gather, vec(prod(_counts, dims=1)))
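    # note: the VBuffer counts are k * (number of local columns) Float64 elements per
    # rank of comm_col; since X_gather is column-major, each rank's contribution lands
    # in a contiguous block of X_gather.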
    cputime_allgatherv = 0.0
    for t in 1:repeats1
        MPI.Barrier(comm)
        cputime_allgatherv += @elapsed begin
            MPI.Allgatherv!(X, X_gather_vbuf, comm_col)
        end
        MPI.Barrier(comm)
    end
    cputime_allgatherv /= repeats1
    if rank == 0
        @printf("walltime MPI.Allgatherv!: %.2e \n", cputime_allgatherv)
    end
    GC.gc()
    MPI.Finalize()
end
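In case it helps to see the gather pattern in isolation, below is a minimal, self-contained sketch of the same MPI.Allgatherv!/VBuffer usage on MPI.COMM_WORLD. It is separate from the benchmark above, and the block sizes are made up purely for illustration.

# minimal sketch of the Allgatherv!/VBuffer pattern used above (illustration only);
# save as a file and run with e.g.: mpiexec -n 4 julia this_file.jl
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nprocs = MPI.Comm_size(comm)

k = 3                                 # rows of every local block
X = fill(Float64(rank), k, rank + 1)  # rank r contributes r + 1 columns, filled with its rank id

# each rank receives k*(r+1) Float64 elements from rank r, stored contiguously
counts = [k * (r + 1) for r in 0:nprocs-1]
X_gather = Array{Float64}(undef, k, div(nprocs * (nprocs + 1), 2))
MPI.Allgatherv!(X, VBuffer(X_gather, counts), comm)

rank == 0 && println("gathered matrix of size ", size(X_gather))
MPI.Finalize()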
I ran the benchmark code above on a large cluster where each node has 128 CPU cores (so up to 128 MPI processes per node). The MPI library I used is OpenMPI, and the Julia version is 1.7. I set N = 5000000 and k = 16. The numbers of processes are 4 (1 node), 16 (1 node), 64 (1 node), 256 (2 nodes), 1024 (8 nodes), and 4096 (32 nodes). Before running the code with a specific number of processes, I requested the corresponding resources (number of nodes) from the cluster via SLURM.
The commands I used look like this:
julia -e 'using MPIPreferences; MPIPreferences.use_jll_binary("OpenMPI_jll");'
path_to_mpiexec -n 4 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1 path_to_thefile/test_collectives.jl 5000000 16
path_to_mpiexec -n 16 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1 path_to_thefile/test_collectives.jl 5000000 16
path_to_mpiexec -n 64 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1 path_to_thefile/test_collectives.jl 5000000 16
path_to_mpiexec -n 256 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1 path_to_thefile/test_collectives.jl 5000000 16
path_to_mpiexec -n 1024 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1 path_to_thefile/test_collectives.jl 5000000 16
path_to_mpiexec -n 4096 /export/pkgs/linux-u22/julia-1.7.3/bin/julia -Cnative -J/export/pkgs/linux-u22/julia-1.7.3/lib/julia/sys.so -g1 path_to_thefile/test_collectives.jl 5000000 16
The results (execution time in seconds) I got:
-------------test Collectives--------------
#processes: 4 N: 5000000 k: 16
walltime MPI.Allgatherv!: 1.42e-01
-------------test Collectives--------------
#processes: 16 N: 5000000 k: 16
walltime MPI.Allgatherv!: 2.75e-01
-------------test Collectives--------------
#processes: 64 N: 5000000 k: 16
walltime MPI.Allgatherv!: 2.02e-01
-------------test Collectives--------------
#processes: 256 N: 5000000 k: 16
walltime MPI.Allgatherv!: 1.45e-01
-------------test Collectives--------------
#processes: 1024 N: 5000000 k: 16
walltime MPI.Allgatherv!: 7.56e-02
-------------test Collectives--------------
#processes: 4096 N: 5000000 k: 16
walltime MPI.Allgatherv!: 2.06e-01
For the scaling of the execution time, please refer to the plot at the very beginning.
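In case it is useful for comparing against the plot, here is a small snippet that tabulates the measured times against a C/sqrt(p) reference curve; anchoring the constant C at the p = 4 measurement is just my choice for illustration and may not match the normalization used in the plot.

# tabulate measured Allgatherv! times against a C/sqrt(p) reference curve;
# anchoring C at the p = 4 measurement is an arbitrary choice for illustration
procs    = [4, 16, 64, 256, 1024, 4096]
measured = [1.42e-1, 2.75e-1, 2.02e-1, 1.45e-1, 7.56e-2, 2.06e-1]  # seconds, from the output above
C = measured[1] * sqrt(procs[1])
for (p, t) in zip(procs, measured)
    println("p = ", lpad(p, 4), "  measured = ", t, "  C/sqrt(p) = ", round(C / sqrt(p), sigdigits = 3))
end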