How to avoid repeated data movement between processes?

I have a program that looks like this:

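using Distributed

@everywhere function foo(nm, k)
    # do something
    return result
end
n = 100
ks = rand(3, n)
nm = rand(10000, 10000)
s = 0.0
np = nprocs()
results = Vector{Any}(undef, n)  # Julia 1.x needs `undef` for an uninitialized vector
i = 1
nextidx() = (idx=i; i+=1; idx)   # hand out the next unprocessed job index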
@sync begin
    for p=1:np
        if p != myid() || np == 1
            @async begin
                while true
                    idx = nextidx()
                    if idx > n
                        break
                    end
                    results[idx] = remotecall_fetch(foo, p, nm, ks[:, idx])  # column idx of ks
                end
            end
        end
    end
end

The problem is that each time a task starts, the large object (nm) is copied from the local process to the remote process, which costs unnecessary time. For the simple example above, a SharedArray could be used. In my actual program, however, nm has a fairly complex type. How can I avoid this overhead in that case?

Welcome to the wonderful world of load-balancing distributed-memory data structures, where there are no easy answers. Specifically, nothing useful can be said if all we know is that your type is “quite complex”.

The only alternative is to not use distributed memory at all: use shared-memory threads.
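A minimal sketch of the threaded approach, assuming Julia 1.x started with several threads (for example `julia -t 4`). The function `foo` and the small `nm` here are hypothetical stand-ins for the real computation and data. Because all threads share one address space, `nm` is never copied:

```julia
using Base.Threads

# Hypothetical stand-in for the real work: reads nm, never mutates it.
foo(nm, k) = sum(nm * k)

n  = 100
ks = rand(3, n)
nm = rand(50, 3)            # small placeholder for the large object

results = Vector{Float64}(undef, n)
@threads for idx in 1:n
    # Each iteration writes to its own slot, so no locking is needed.
    results[idx] = foo(nm, view(ks, :, idx))
end
```

This only stays safe as long as `foo` treats `nm` as read-only; any shared mutable state would need a lock.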

Try using a CachingPool: https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.CachingPool

Specifically, the type of nm in my program is

struct NoTBModel{T}
    norbits::Int64
    lat::SMatrix{3,3,Float64,9}
    rlat::SMatrix{3,3,Float64,9}
    hoppings::Dict{SVector{3,Int64},T}
    overlaps::Dict{SVector{3,Int64},T}
    positions::Dict{SVector{3,Int64},SVector{3,T}}
end

where T is typically SparseMatrixCSC{Float64,Int64}.

As for the suggestion to use multithreading, I am reluctant because of possible problems with matrix multiplication. This issue (https://github.com/JuliaLang/julia/issues/22581) says it is not thread-safe.

The suggestion of using a CachingPool is a good one.

If you do want to use a custom pmap (that doesn’t support caching pools), then you might consider creating a global const on each worker and creating a closure over this global const for use with your custom pmap.

# instantiate `nm` on all workers as global const `g_nm`
for p in workers()
    remotecall_wait(eval, p, :(global const g_nm = $nm))
end

# create closure to pass to custom pmap
f(k) = foo(g_nm, k)
res = my_pmap(f, ks)

I don’t think it’s a particularly good solution, but it works for some use cases.

See also ParallelDataTransfer, which provides helper functions for transferring data between worker processes.

Thank you. This seems to be a promising solution. However, I am not sure I understand CachingPool. Since different processes are performing different jobs, no two closures can be identical. In that case, what exactly does CachingPool store?

I don’t think that applies to SMatrix or SparseMatrixCSC multiplication?

Do you mean that nm is different on each iteration? For CachingPool to be useful, I believe you need to be re-using a particular value of nm across all workers. You capture it in a closure and then repeatedly use that closure.
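A minimal sketch of that pattern, assuming Julia 1.x with the `Distributed` standard library. Here `bar`, `run_cached`, and the small `nm` are hypothetical placeholders. The closure is the same object for every job; only the small argument `k` differs from call to call:

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Hypothetical stand-in for the real computation; must exist on every worker.
@everywhere bar(nm, k) = sum(nm * k)

nm = rand(200, 3)                      # placeholder for the large object
ks = [rand(3) for _ in 1:20]

function run_cached(nm, ks)
    pool = CachingPool(workers())
    # The anonymous function captures the local `nm`; the pool sends the
    # closure to each worker once and reuses it for later calls there.
    results = pmap(k -> bar(nm, k), pool, ks)
    clear!(pool)                       # release the cached closure on the workers
    return results
end

results = run_cached(nm, ks)
```

Wrapping the call in a function makes `nm` a local variable, so the anonymous function really does capture it instead of referring to a global.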

If you can construct nm on the worker processes themselves, that’s usually a good call (i.e., maybe only one field of nm changes on each iteration, so you only have to transfer that field?)
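As a rough illustration of building the object on each worker, assuming it can be reconstructed from a few small parameters. The names `build_nm`, `NM`, and `foo` below are hypothetical, and a seeded random matrix stands in for the real model:

```julia
using Distributed
nprocs() == 1 && addprocs(2)

@everywhere begin
    using Random
    # Hypothetical constructor: rebuilds the large object from small parameters.
    build_nm(seed, rows, cols) = rand(MersenneTwister(seed), rows, cols)
    # Build once per process and keep it in a constant global.
    const NM = build_nm(1234, 200, 3)
    # Hypothetical stand-in for the real computation.
    foo(k) = sum(NM * k)
end

ks = [rand(3) for _ in 1:20]
results = pmap(foo, ks)     # only each small `k` crosses process boundaries
```

In a real program the constructor would more likely read the model from a file that every worker can reach.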

Finally, you have a couple of other options as well that are more explicit about the data movement:

# remotecall this function on all your workers and you can keep feeding it new
# jobs to work on through the `input` channel, but that big matrix only gets
# moved once when you first do the `remotecall`
function remote_processing_loop(input::RemoteChannel, output::RemoteChannel,
                                big_matrix_i_only_want_to_move_once)
    while true
        x = take!(input)
        x === nothing && break
        put!(output, process(x, big_matrix_i_only_want_to_move_once))
    end
end

# alternatively you can define a worker module and store the complex types that
# you don't want to move frequently as global variables
module Worker

function transfer_big_matrix(new_big_matrix)
    global big_matrix = new_big_matrix
end

function process(x)
    process(x, big_matrix)
end

function process(x, big_matrix)
    # do the data processing here
end

end
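For the channel-based loop, a driver on the master process might look roughly like the sketch below. It assumes Julia 1.x with `Distributed`; `process`, the small `big` matrix, and the channel sizes are hypothetical placeholders. Unlike the loop shown earlier, each job carries its index so that results can be matched back to inputs:

```julia
using Distributed
nprocs() == 1 && addprocs(2)

@everywhere begin
    # Hypothetical stand-in for the real computation.
    process(x, big) = sum(big * x)

    function remote_processing_loop(input, output, big)
        while true
            job = take!(input)
            job === nothing && break          # `nothing` is the stop signal
            idx, x = job
            put!(output, (idx, process(x, big)))
        end
    end
end

big = rand(200, 3)                            # placeholder for the large object
ks  = [rand(3) for _ in 1:20]

# Buffers are large enough that no put! on the master ever blocks.
input  = RemoteChannel(() -> Channel{Any}(length(ks) + nworkers()))
output = RemoteChannel(() -> Channel{Any}(length(ks)))

# `big` is serialized once per worker, as an argument of this single call.
loops = [remotecall(remote_processing_loop, p, input, output, big) for p in workers()]

foreach(i -> put!(input, (i, ks[i])), eachindex(ks))
foreach(_ -> put!(input, nothing), workers())  # one stop signal per worker

results = Vector{Float64}(undef, length(ks))
for _ in eachindex(ks)
    idx, r = take!(output)
    results[idx] = r
end
foreach(wait, loops)                          # make sure every loop has exited
```

Idle workers pull the next job from the shared channel, which gives the same dynamic load balancing as the `nextidx()` scheme in the question.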

I do have some other matrix multiplications in the foo function.

nm is indeed the same throughout the iteration. However, the index idx is different for each iteration. Is CachingPool smart enough to figure out that nm is not changing while idx is?

I like this explicit data movement, since I believe I have more control.

Being more explicit about the data movement is generally my preference as well, but it sounds like you might be able to get away with the CachingPool here.

Something like this might work for you:

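# capture nm in a closure; an anonymous function over a local binding
# is sent to the workers together with its captured data
foo = let nm = nm
    k -> bar(nm, k)
end
pool = CachingPool(workers())
queue = collect(1:length(results)) # indices still to be processed
@sync for worker in workers()
    @async while !isempty(queue)
        idx = pop!(queue)
        results[idx] = remotecall_fetch(foo, pool, ks[idx])
    end
end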

I noticed you fixed the matmul multithreading problem in a recent commit. Thank you @stevengj.

Note that this only ever affected the “generic” matmul routine used as a fallback for user-defined number and array types. If you are using ordinary sparse or dense arrays of 32-bit or 64-bit real or complex floating-point values, thread safety was fine even before my commit.
