Alternative to SharedArrays for multi-node cluster?

Hello!

I’m moving my code from my own computer to a multi-node cluster and realized that SharedArrays won’t work across multiple nodes that don’t share memory.

The old code using SharedArrays looks like this:

using Distributed, ClusterManagers, SharedArrays
Nworker = 100
addprocs(SlurmManager(Nworker), t="20:00:00")

@everywhere using CSV
@everywhere Data = CSV.read("Data.csv"; header=false)
@everywhere N = 1_000_000

Result = SharedArray{Float64}(N, 4)

@sync @distributed for i = 1:N
    Result[i,:] = ones(4,1)*Data[i]  # for illustration only
    # however, each iteration will be short,
    # so pmap may not be a good option here.
end

My question is: what would be a good alternative to SharedArrays in this case? If possible, I’d like to keep the @distributed for loop structure to minimize the amount of code rewriting.

I saw DistributedArrays but couldn’t really figure out how to use it in my setting. Specifically, I don’t see how to combine it with a @distributed for loop, since I don’t control which part of the iteration goes to which processor. Any suggestions will be greatly appreciated!

You may use the DArray constructor to get around these issues; it’s not much more complicated.

julia> nworkers()
3

julia> @everywhere using DistributedArrays, OffsetArrays

julia> N = 6 # for demonstration
6

julia> Data = distribute(rand(N)); # using the default distribution at this point

julia> Result = DArray((N,4)) do inds # local indices on each processor
           arr = zeros(inds) # We create an OffsetArray with the correct local indices
           data_local = OffsetArray(localpart(Data),localindices(Data)) # get the local part of data on each processor and assign the proper indices
           for i in inds[1]
               arr[i,:] .= ones(4)*data_local[i]
           end
           parent(arr) # strip the OffsetArray wrapper
       end
6×4 DArray{Float64,2,Array{Float64,2}}:
 0.54454   0.54454   0.54454   0.54454 
 0.285744  0.285744  0.285744  0.285744
 0.678867  0.678867  0.678867  0.678867
 0.62554   0.62554   0.62554   0.62554 
 0.862751  0.862751  0.862751  0.862751
 0.103856  0.103856  0.103856  0.103856
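
If you’d rather keep something closer to your original row-by-row loop, here is a rough sketch along the same lines (untested; the explicit [nworkers(), 1] distribution and the global Data[i] lookup are my own choices, not something from your code): pre-allocate the result with dzeros and let each worker fill its own localpart. The lookup only stays local if the two arrays happen to be split the same way; otherwise it involves a remote fetch.

using Distributed
addprocs(3)
@everywhere using DistributedArrays

N = 6
Data   = distribute(rand(N))                         # input values, split across workers
Result = dzeros((N, 4), workers(), [nworkers(), 1])  # assumption: distribute Result along rows only

@sync for p in procs(Result)
    @spawnat p begin
        rows = localindices(Result)[1]   # global row range owned by this worker
        rl   = localpart(Result)         # the local chunk, indexed from 1
        for (k, i) in enumerate(rows)
            rl[k, :] .= Data[i]          # global lookup; stays local only if the distributions coincide
        end
    end
end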

In my personal experience, DistributedArrays works, but the performance is much worse than SharedArrays.

Thank you so much! Maybe it’s a dumb question: does this for loop run in parallel over the workers, or sequentially?

Thanks! I just adapted the code provided by @jishnub and found basically the same performance. Maybe it differs from case to case.

I actually figured it out. The loop runs sequentially over the local block within each processor.
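
(In case it helps anyone else: assuming the Result DArray from the example above, a quick way to see which global index ranges each worker owns should be something like this.)

for p in procs(Result)
    # print the global (row, column) ranges owned by worker p
    println(p, " => ", @fetchfrom(p, localindices(Result)))
end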

Thank you so much for the help. I’ve now adapted it to my code and it works perfectly!

That is expected though, because the data needs to be transferred to the remote worker nodes for processing, whereas a SharedArray is shared by all worker processes that reside on the same node.

Despite the reason you mentioned, I still think the performance is nowhere close to, say, an MPI implementation. I don’t know the details behind it, though.

If by MPI you mean repeated data transfer between cores, then yes, Julia does have a performance bottleneck there, as the arrays are not pre-allocated, unlike in MPI. This case is different, as there’s not much transfer involved. I believe the data-transfer lag might be mitigated to some extent with ArrayChannels.jl, but I haven’t used it myself.
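
(A small illustration of the pre-allocation point, using plain Distributed rather than ArrayChannels.jl; the worker count and array size here are arbitrary. Each fetch deserialises into a freshly allocated array on the caller, and there is no built-in way to receive into a buffer you allocated once, the way MPI’s Recv! does.)

using Distributed
addprocs(1)
w = first(workers())

# Each fetch of an array from the worker allocates a new array on the caller.
a = remotecall_fetch(() -> zeros(10_000), w)
b = remotecall_fetch(() -> zeros(10_000), w)
a === b   # false: two separate allocations for two transfers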

I admit my ignorance here… how are the arrays copied between the servers?
I know - read the source!
I would start to use the term RDMA here - which would put me in the firing line for investigating whether this is feasible for DistributedArrays, so I will not!