I’m moving my code from my own computer to a multi-node cluster and realized that SharedArrays won’t work across multiple nodes, since there is no shared memory between them.
The old code using SharedArrays looks like this:
using Distributed, ClusterManagers, SharedArrays
Nworker = 100
addprocs(SlurmManager(Nworker), t="20:00:00")
@everywhere using CSV
@everywhere Data = CSV.read("Data.csv"; header=false) # note: this loads the full file on every worker
@everywhere N = 1_000_000
Result = SharedArray{Float64}(N, 4)
@sync @distributed for i = 1:N
    Result[i,:] = ones(4)*Data[i, 1] # for illustration only;
    # however, each iteration will be short,
    # so pmap may not be a good option here.
end
My question is: what would be a good alternative to SharedArrays in this case? If possible, I’d like to keep the @distributed for loop structure to minimize the amount of rewriting.
I looked at DistributedArrays but couldn’t figure out how to use it in my setting. Specifically, how do I combine it with a @distributed for loop, given that I don’t control which part of the iteration goes to which worker? Any suggestions will be greatly appreciated!
You may use the DArray constructor to get around these issues; it’s not much more complicated.
julia> nworkers()
3
julia> @everywhere using DistributedArrays, OffsetArrays
julia> N = 6 # for demonstration
6
julia> Data = distribute(rand(N)); # using the default distribution at this point
julia> Result = DArray((N,4)) do inds # local indices on each processor
    arr = zeros(inds) # We create an OffsetArray with the correct local indices
    data_local = OffsetArray(localpart(Data), localindices(Data)) # get the local part of Data on each processor and assign the proper indices
    for i in inds[1]
        arr[i,:] .= ones(4)*data_local[i]
    end
    parent(arr) # strip the OffsetArray wrapper
end
6×4 DArray{Float64,2,Array{Float64,2}}:
0.54454 0.54454 0.54454 0.54454
0.285744 0.285744 0.285744 0.285744
0.678867 0.678867 0.678867 0.678867
0.62554 0.62554 0.62554 0.62554
0.862751 0.862751 0.862751 0.862751
0.103856 0.103856 0.103856 0.103856
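To connect this back to your original setup, here is a rough, untested sketch of how the same pattern might look with your CSV data and the N×4 result. I’m assuming Data.csv holds a single numeric column (depending on your CSV.jl version you may need to pass a DataFrame sink to CSV.read), and the sketch relies on distribute and the default distribution of the (N, 4) DArray splitting the rows into the same contiguous blocks over workers(), which they do here because both split along the first dimension in the same worker order.

using Distributed, ClusterManagers
Nworker = 100
addprocs(SlurmManager(Nworker), t="20:00:00")
@everywhere using DistributedArrays, OffsetArrays
using CSV

N = 1_000_000
df = CSV.read("Data.csv"; header=false)   # read once, on the master only
Data = distribute(Float64.(df[:, 1]))     # split the single column row-wise across the workers

Result = DArray((N, 4)) do inds           # inds = the block of (rows, cols) owned by this worker
    arr = zeros(inds)                     # OffsetArray with the local indices, as above
    data_local = OffsetArray(localpart(Data), localindices(Data))
    for i in inds[1]                      # loop only over the rows stored on this worker
        arr[i, :] .= ones(4) * data_local[i]
    end
    parent(arr)                           # strip the OffsetArray wrapper
end

You don’t get the global @distributed for i = 1:N loop back, but each worker still runs a plain for loop over its own rows, so the structure stays fairly close to your original code. If you need the full result on the master afterwards, Array(Result) collects it there.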
That is expected, though, because the data needs to be transferred to the remote workers for processing, whereas a SharedArray is only shared by worker processes that reside on the same node.
Despite the reason you mentioned, I still think the performance is nowhere close to, say, an MPI implementation. I don’t know the details behind it, though.
If by MPI you mean repeated data transfer between cores, then yes, Julia does have a performance bottleneck there, since the arrays are not pre-allocated, unlike in MPI. This case is different, as there’s not much transfer involved. I believe the data-transfer lag might be mitigated to some extent with ArrayChannels.jl, but I haven’t used it myself.
I admit my ignorance here… how are the arrays copied between the servers?
I know - read the source!
I would start to use the term RDMA here, which would put me in the firing line for investigating whether this is feasible for DistributedArrays, so I will not!