I’m working on parallelizing some graph algorithms, and I’m trying to use the DistributedArrays package to create an array that allows a worker to operate on a single vector without having to re-generate the array within the function at each iteration.
It works, but @time shows much larger allocations for the distributed implementation than for the serial implementation. Does anyone know why? Am I structuring my code, or using DistributedArrays, incorrectly?
I’ve tried to generate an illustrative example below.
Note: In this illustrative example the distributed implementation is slower, but in the real implementation it is much faster given the size of the graph, etc., so I’m not concerned about the speed difference in the example shown here.
##  no problems here
using Distributed

# Start the workers BEFORE loading the packages: a package loaded on the master before
# `addprocs` is not loaded on workers that are added afterwards.
if nprocs() == 1
    addprocs(10)
end

# Load the packages on the master and on every worker, so that workers can deserialize
# and operate on `DArray`s and can call `Random.rand`.
@everywhere using DistributedArrays
@everywhere using Random
##  DISTRIBUTED APPROACH
# Some algorithm, would be a graph distance algorithm; distance algorithm uses nested loops.
#
# Overwrites `vec` in place. With `n = length(vec)`, the element at position `pos`
# (counting from 1 in iteration order) is set to a draw `r` from `1:n` when `pos < r`,
# and to `0` otherwise. Returns `nothing`.
#
# Accepts any integer array (a `Vector`, a `Matrix`, or a view into one), so a caller can
# pass a slice of a larger buffer without copying it first.
@everywhere function algorithm!(
    vec::AbstractArray{<:Integer},
)
    n = length(vec)
    # `pos` counts 1..n while `idx` is the array's own index, so the loop does not
    # assume that the array is indexed by 1:n.
    for (pos, idx) in enumerate(eachindex(vec))
        r = Random.rand(1:n)
        vec[idx] = (pos < r) ? r : 0
    end
    return nothing
end
##  AGGREGATION FUNCTIONS
"""
    sum_distribute(n_sum, d_array)

Run `algorithm!` `n_sum` times across the workers and return the sum of all results.

The iterations `1:n_sum` are split over the workers by `@distributed`. Each iteration
overwrites the local part of `d_array` on the worker that executes it and contributes
the sum of that local part; the contributions are combined with `+`.

Using a `DArray` lets every worker reuse its own pre-allocated buffer instead of
creating a new array per iteration.

NOTE(review): `@time` still reports allocations on the calling process for every call.
These presumably come from scheduling the remote work rather than from the array data;
confirm with a profiler.
"""
function sum_distribute(n_sum::Int64, d_array::DArray)
    return @distributed (+) for k in 1:n_sum
        # `d_array[:L]` is the part of the array held by the executing process.
        part = d_array[:L]
        algorithm!(part)
        sum(part)
    end
end
"""
    sum_serial_allocate(n_sum, vec)

Serial reference implementation that reuses the pre-allocated vector `vec`.

Calls `algorithm!(vec)` `n_sum` times and returns the accumulated `sum(vec)` over all
calls. `vec` is overwritten on every call. Returns `0` when `n_sum` is not positive.
"""
function sum_serial_allocate(n_sum::Int64, vec::Vector{Int64})
    total = 0
    remaining = n_sum
    while remaining > 0
        algorithm!(vec)
        total = total + sum(vec)
        remaining -= 1
    end
    return total
end
"""
    spawn_darray(n)

Create a zero-filled `Int64` `DArray` of size `n × nworkers()` spread over all workers.

The chunk layout `[1, nworkers()]` splits only the second dimension, so that, presumably,
each worker holds one column of length `n` (confirm against the `dzeros` documentation).
"""
function spawn_darray(n::Int64)
    ncols = nworkers()
    pids = workers()
    layout = [1, ncols]
    return dzeros(Int64, (n, ncols), pids, layout)
end
##  MIMIC RUNNING ALGORITHM MANY TIMES
"""
    do_many(iter, d_array::DArray)

Call `sum_distribute(iter, d_array)` `iter` times and print the `@time` report of each
call.

`iter` is used both as the number of timed calls and as the `n_sum` argument of each
call. Returns `nothing`.
"""
function do_many(iter::Int64, d_array::DArray)
    foreach(1:iter) do _
        @time sum_distribute(iter, d_array)
    end
    return nothing
end
"""
    do_many(iter, vec::Vector{Int64})

Call `sum_serial_allocate(iter, vec)` `iter` times and print the `@time` report of each
call.

`iter` is used both as the number of timed calls and as the `n_sum` argument of each
call. Returns `nothing`.
"""
function do_many(iter::Int64, vec::Vector{Int64})
    remaining = iter
    while remaining > 0
        @time sum_serial_allocate(iter, vec)
        remaining -= 1
    end
    return nothing
end
##  MAIN FUNCS - added some printing
"""
    main_distribute(n)

Run the distributed benchmark on a freshly created distributed array with `n` rows.

Prints a header, times the creation of the array, runs `do_many(10, ...)` on it, and
closes the array afterwards. The array is closed in a `finally` clause so that it is
released even when the benchmark throws. Returns `nothing`.
"""
function main_distribute(n::Int64)
    println("darray allocation:")
    @time darray = spawn_darray(n)
    try
        do_many(10, darray)
    finally
        # Always release the distributed array, including on error.
        DistributedArrays.close(darray)
    end
    return nothing
end
"""
    main_serial(n)

Run the serial benchmark on a freshly allocated zero vector of length `n`.

Prints a header, times the allocation of the vector, and runs `do_many(10, ...)` on it.
"""
function main_serial(n::Int64)
    println("vec allocation:")
    buffer = @time zeros(Int64, n)
    return do_many(10, buffer)
end
Running the main functions gives this output:
> main_distribute(10)
darray allocation:
  0.002635 seconds (882 allocations: 47.453 KiB)
  0.002311 seconds (722 allocations: 34.156 KiB)
  0.003628 seconds (720 allocations: 34.125 KiB)
  0.001688 seconds (722 allocations: 35.750 KiB)
  0.001858 seconds (722 allocations: 34.156 KiB)
  0.001760 seconds (727 allocations: 36.469 KiB)
  0.002384 seconds (720 allocations: 34.125 KiB)
  0.002504 seconds (722 allocations: 34.156 KiB)
  0.001418 seconds (722 allocations: 35.750 KiB)
  0.001946 seconds (720 allocations: 34.125 KiB)
  0.001806 seconds (727 allocations: 36.469 KiB)
  0.026818 seconds (8.57 k allocations: 421.266 KiB)
> main_serial(10)
vec allocation:
  0.000009 seconds (1 allocation: 144 bytes)
  0.000002 seconds
  0.000001 seconds
  0.000001 seconds
  0.000001 seconds
  0.000001 seconds
  0.000001 seconds
  0.000001 seconds
  0.000002 seconds
  0.000001 seconds
  0.000001 seconds
  0.000915 seconds (475 allocations: 29.039 KiB)
Why does the main_distribute() function keep allocating memory at each iteration, leading to almost 15x as much memory allocated?