How to bind an existing SharedArray to new worker process?

parallel

#1

Is there any way to do that? I couldn’t find anything in the reference.

At first, I thought it could be automatic… as I user I don’t want to think about it when adding new worker processes. It would be OK, however, if there’s a method that I can call to bind the new process to the data.

Moreover, passing a SharedArray as an argument seems to create unexpected results. I recognize that this is not a good use case as it would defeat the purpose of why I use SharedArray to begin with. I was surprised by the behavior, however. In the example below, first remote call returns NaN and second call returns a weird number… Am I missing something?

julia> A = SharedArray(rand(10,10));

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> sum(A)
46.13418817191202

julia> remotecall_fetch(sum, 2, A)
NaN

julia> remotecall_fetch(sum, 2, A)
9.118663429e-314

#2

In v0.7, I got an error when passing SharedArrays as an argument, which is better than having wrong result as in v0.6.2… although I would rather see a nice ArgumentError than a “crash” like message.

julia> using SharedArrays, Distributed

julia> A = SharedArray(rand(10,10));

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> sum(A)
51.45023448191144

julia> remotecall_fetch(sum, 2, A)
ERROR: On worker 2:
BoundsError: attempt to access 0×0 Array{Float64,2} at index [1]
getindex at ./array.jl:646 [inlined]
getindex at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/SharedArrays/src/SharedArrays.jl:497 [inlined]
mapreduce_impl at ./reduce.jl:188
mapreduce_impl at ./reduce.jl:205 [inlined]
_mapreduce at ./reduce.jl:352 [inlined]
mapreduce at ./reduce.jl:358 [inlined]
sum at ./reduce.jl:437 [inlined]
sum at ./reduce.jl:453
#121 at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/process_messages.jl:269
run_work_thunk at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/process_messages.jl:56
macro expansion at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/process_messages.jl:269 [inlined]
#120 at ./event.jl:92
Stacktrace:
 [1] #remotecall_fetch#159(::Base.Iterators.IndexValue{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::SharedArray{Float64,2}, ::Vararg{SharedArray{Float64,2},N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/remotecall.jl:374
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::SharedArray{Float64,2}, ::Vararg{SharedArray{Float64,2},N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/remotecall.jl:366
 [3] #remotecall_fetch#162(::Base.Iterators.IndexValue{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::SharedArray{Float64,2}, ::Vararg{SharedArray{Float64,2},N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/remotecall.jl:387
 [4] remotecall_fetch(::Function, ::Int64, ::SharedArray{Float64,2}, ::Vararg{SharedArray{Float64,2},N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/site/v0.7/Distributed/src/remotecall.jl:387
 [5] top-level scope

julia> versioninfo()
Julia Version 0.7.0-DEV.3396
Commit 4f43bf7480 (2018-01-13 19:38 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin14.5.0)
  CPU: Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, haswell)
Environment:


#3

I have not actually used SharedArrays in any real work before so I don’t know the easiest way to rebind, but you can find how much of an array A is saved on the memory of which process using fetch(@spawnat pid length(A.loc_subarr_1d)). Then you can redistribute the data across processors by moving them around but that will be a bit tricky to get right.


#4

Just a bit of exploration:

julia> A = SharedArray(rand(100,100));

julia> A.pids
1-element Array{Int64,1}:
 1

julia> addprocs(1);

julia> fetch(@spawnat 2 isdefined(:A))
false

julia> fetch(@spawnat 2 sum(A))
NaN

julia> fetch(@spawnat 2 isdefined(:A))
true

julia> fetch(@spawnat 2 length(A.loc_subarr_1d))
0

julia> fetch(@spawnat 1 length(A.loc_subarr_1d))
10000

julia> A = SharedArray(rand(100,100));

julia> addprocs(1);

julia> A = SharedArray(A);

julia> fetch(@spawnat 2 isdefined(:A))
false

julia> @spawnat 2 identity(A)
Future(2, 1, 7, Nullable{Any}())

julia> fetch(@spawnat 2 isdefined(:A))
true

julia> fetch(@spawnat 1 length(A.loc_subarr_1d))
10000

julia> fetch(@spawnat 2 length(A.loc_subarr_1d))
0

So there are tricks to make A visible to other procs by just calling identity, but you have to move data around yourself.


#5

I think the answer is that you currently can’t add a new process after the SharedArray has been created. AFAIU, this line in SharedArrays.jl unlinks the name for the anonymous mmap backing the shared array so that it can no longer be referenced by anything else. Otherwise I’d expect something like the following to not error and at least be proof that you can access the underlying mapped memory:

@static if VERSION >= v"0.7-"
    using SharedArrays, Distributed
end

A = SharedMatrix{Float64}(3, 3);
procs(A)
procs()

addprocs(1)
procs(A)
procs()

@eval @everywhere begin
    @static if VERSION >= v"0.7-"
        using SharedArrays, Distributed
        import SharedArrays: shm_mmap_array
    else
        import Base: shm_mmap_array
    end
    import Base.Filesystem: JL_O_RDWR
    B = let segname = $(A.segname), T = $(eltype(A)), dims = $(size(A))
        shm_mmap_array(T, dims, segname, JL_O_RDWR)
    end
end

remotecall_fetch.(() -> repr(B), workers())

(Just to reiterate and be clear, this does not work.)


#6

Just calling identity to bring the array into existence smells like a bug to me… In fact, it seems to be the only way to make it visible even for existing processes, not just new ones.

The reason to use SharedArrays (shared memory) in the first place is to avoid moving data around… I’m trying to avoid doing that :stuck_out_tongue:


#7

I think you’re right. So a PR would be required to make that work, huh! :slight_smile:

The unlink operation doesn’t have to happen immediately. I can see why the current implementation is safe because if Julia crashes then the stale shared memory region would be forever lost until system reboot - https://en.wikipedia.org/wiki/Shared_memory

The shared memory created by shm_open is persistent. It stays in the system until explicitly removed by a process. This has a drawback that if the process crashes and fails to clean up shared memory it will stay until system shutdown.

It wouldn’t be impossible if we catch the system signals and perform a finalize action that frees the memory. I suppose that’s quite a bit of work to get right.


#8

Well it doesn’t seem to be moving much around when calling identity, I think it is only moving pointers not the data. So it makes it visible to the other process at almost no cost, except you can’t do much with that anyway because you have to move the data if you want to any work on the other process, so that is the real problem and I am not sure what a simple solution would be. Perhaps try calling SharedArray on the underlying data vector again to make a new SharedArray now with the new processes loaded. This will work if you start with 1 procs and then you want to share the SharedArray with other procs. When you start with more than 1, it’s trickier.


#9

Right, I am contemplating sharing a large array (~50GiB) and I want to minimize data movement as much as possible. I want to deal with exceptions – when a process crashes I should be able to bring it up and have it rejoin the cluster. It seems that the easiest solution is to add the process and re-create the large array and dump the original one.