using Distributed

@everywhere function foo(nm, k)
    # do something
    return result
end

n = 100
ks = rand(3, n)
nm = rand(10000, 10000)
s = 0.0
np = nprocs()
results = Vector{Any}(undef, n)
i = 1
nextidx() = (idx = i; i += 1; idx)
@sync begin
    for p = 1:np
        if p != myid() || np == 1
            @async begin
                while true
                    idx = nextidx()
                    if idx > n
                        break
                    end
                    # pass the idx-th column of ks as one k-point
                    results[idx] = remotecall_fetch(foo, p, nm, ks[:, idx])
                end
            end
        end
    end
end
The problem is that each time a task starts, the large object nm is moved from the local process to the remote process, which wastes time on serialization and communication. For the simple example above, a SharedArray could be used. However, in my real program, a rather complex type is assigned to nm. In that case, how can I avoid this overhead?
Welcome to the wonderful world of load-balancing distributed-memory data structures, where there are no easy answers. Specifically, nothing useful can be said if all we know is that your type is “quite complex”.
using StaticArrays

struct NoTBModel{T}
    norbits::Int64
    lat::SMatrix{3,3,Float64,9}
    rlat::SMatrix{3,3,Float64,9}
    hoppings::Dict{SVector{3,Int64},T}
    overlaps::Dict{SVector{3,Int64},T}
    positions::Dict{SVector{3,Int64},SVector{3,T}}
end
where T is typically SparseMatrixCSC{Float64,Int64}.
As for the suggestion to use multithreading, I am reluctant to do so because of possible problems with matrix multiplication. This issue (https://github.com/JuliaLang/julia/issues/22581) says it is not thread safe.
The suggestion of using a CachingPool is a good one.
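In its simplest form, that might look like the following (a sketch, assuming the foo, nm, and ks from the original post): the closure capturing nm is serialized to each worker only once and cached there for subsequent calls.

using Distributed

pool = CachingPool(workers())
# the pool ships the closure (and the captured nm) to each worker once,
# then reuses the cached copy for every later call on that worker
results = pmap(k -> foo(nm, k), pool, eachcol(ks))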
If you do want to use a custom pmap (that doesn't support caching pools), then you might consider defining a global const on each worker and closing over it for use with your custom pmap.
# instantiate `nm` on all workers as the global const `g_nm`
for p in workers()
    remotecall_wait(Core.eval, p, Main, :(const g_nm = $nm))
end

# create a closure over the worker-side global to pass to the custom pmap;
# `g_nm` is resolved on the worker when the closure runs there
f(k) = foo(g_nm, k)
res = my_pmap(f, ks)
I don’t think it’s a particularly good solution, but it works for some use cases.
See also ParallelDataTransfer, which provides helper functions for transferring data between worker processes.
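For instance (a sketch, assuming ParallelDataTransfer.jl's sendto helper, which assigns the given variable in Main on each worker):

using Distributed, ParallelDataTransfer

# one-time transfer: defines the global `nm` in Main on every worker
sendto(workers(), nm = nm)

After that, worker-side code can refer to nm directly, much like the g_nm pattern above.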
Thank you. This seems to be a promising solution. However, I am not sure I understand the CachingPool. Since different processes perform different jobs, no two closures are identical. In that case, what exactly does the CachingPool store?
Do you mean that nm is different on each iteration? For a CachingPool to be useful, I believe you need to be re-using a particular value of nm across all workers: you capture it in a closure and then repeatedly call that closure.
If you can construct nm on the worker processes themselves, that is usually a good call (e.g. maybe only one field of nm changes on each iteration, so you only have to transfer that field — see the sketch below).
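For example, with the NoTBModel above, something along these lines might work (a hypothetical sketch; update_model! and new_hoppings are illustrative names, and g_nm must be a plain, non-const global here so it can be reassigned):

# keep a model on each worker and rebuild it from a small delta,
# assuming only the `hoppings` field changes between iterations
@everywhere function update_model!(new_hoppings)
    global g_nm = NoTBModel(g_nm.norbits, g_nm.lat, g_nm.rlat,
                            new_hoppings, g_nm.overlaps, g_nm.positions)
    return nothing
end

for p in workers()
    remotecall_wait(update_model!, p, new_hoppings)  # only the Dict moves
end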
Finally, you have a couple of other options that are more explicit about the data movement:
# remotecall this function on all your workers and you can keep feeding it new
# jobs to work on through the `input` channel, but that big matrix only gets
# moved once, when you first do the `remotecall`
function remote_processing_loop(input::RemoteChannel, output::RemoteChannel,
                                big_matrix_i_only_want_to_move_once)
    while true
        x = take!(input)
        x === nothing && break  # `nothing` is the shutdown signal
        put!(output, process(x, big_matrix_i_only_want_to_move_once))
    end
end
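A driver for this loop might look roughly like the following (a sketch, assuming remote_processing_loop and process are already defined on the workers, e.g. via @everywhere; the channel capacities and the nothing sentinel are assumptions, and results arrive in completion order, so each job carries its index):

using Distributed

input  = RemoteChannel(() -> Channel{Any}(n))
output = RemoteChannel(() -> Channel{Any}(n))

# the big matrix moves once per worker, at remotecall time
for p in workers()
    remotecall(remote_processing_loop, p, input, output, nm)
end

for idx in 1:n
    put!(input, (idx, ks[:, idx]))  # `process` would unpack (idx, k) pairs
end
raw = [take!(output) for _ in 1:n]  # completion order, not index order
for _ in workers()
    put!(input, nothing)            # stop each worker loop
end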
# alternatively you can define a worker module and store the complex types that
# you don't want to move frequently as global variables
module Worker

function transfer_big_matrix(new_big_matrix)
    global big_matrix = new_big_matrix
end

function process(x)
    process(x, big_matrix)
end

function process(x, big_matrix)
    # do the data processing here
end

end
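Usage might then look like this (a sketch; worker.jl is a hypothetical file holding the module above):

@everywhere include("worker.jl")  # load `module Worker` on every process

# move the big object once per worker
for p in workers()
    remotecall_wait(Worker.transfer_big_matrix, p, nm)
end

# afterwards only the small per-job values travel
results = pmap(Worker.process, eachcol(ks))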
nm is indeed the same throughout the iteration. However, the loop index idx is different on every iteration. Is CachingPool smart enough to figure out that nm is not changing while idx is?
I like this explicit data movement, since I believe it gives me more control.
Being more explicit about the data movement is generally my preference as well, but it sounds like you might be able to get away with the CachingPool here.
Something like this might work for you:
foo(k) = bar(nm, k)  # capture nm in a closure; `bar` does the actual work
pool = CachingPool(workers())
queue = collect(1:n)
results = Vector{Any}(undef, n)
@sync for worker in workers()
    @async while !isempty(queue)
        idx = pop!(queue)
        # the pool caches the closure (and thus nm) on each worker
        # after the first call; only ks[:, idx] moves per job
        results[idx] = remotecall_fetch(foo, pool, ks[:, idx])
    end
end
Note that this only ever affected the “generic” matmul routine used as a fallback for user-defined number and array types. If you are using ordinary sparse or dense arrays of 32-bit or 64-bit real or complex floating-point values, thread safety was fine even before my commit.