I have a vector chains whose elements are a mutable struct called Chain, and a function f!(chain::Chain) which mutates its argument. I want to distribute each element of chains to a worker, have each worker hand back its updated chain, and then combine those into a new chains vector.
Currently, I’m using a function as follows to distribute the work.
function g(chains::Vector{Chain})
    futures = []
    @assert nworkers() == length(chains)
    for i in 1:nworkers()
        push!(futures, @spawnat workers()[i] f!(chains[i]))
    end
    return [fetch(future) for future in futures]
end
Currently f! also returns chain in addition to mutating its input, but I worry that this might consume extra memory on the worker, since I'm not sure how variable assignment works on the worker (i.e. I worry that a new variable is created holding f!'s return value, in addition to the mutated input).
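As a sanity check on the worker side: since f! returns the very object it was given, the return itself should not allocate a second Chain there; the copy that does happen is the serialization back to the main process when the result is fetched. A quick way to verify the identity part, using a hypothetical helper same_object (note this runs f! once on a copy of chains[1] shipped to the worker):

@everywhere same_object(c::Chain) = f!(c) === c   # true: f! returns the object it was given
@assert remotecall_fetch(same_object, first(workers()), chains[1])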
I want a way to create a variable chain on each worker that gets mutated and that I can fetch, so that I don't have to let f! return any value. Then I could do something like chains = [@fetchfrom i chain for i in workers()].
Is this possible in any way? I tried adding chain = chains[i] inside the for loop and feeding chain to f!, but it doesn't show up on the worker, I presume because it is in a local scope. If I declare it to be a global and fetch it, it gets reassigned on the worker because its value keeps changing in the for loop.
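One pattern that should achieve this is to bind the global explicitly on each worker through small helper functions defined @everywhere. A rough sketch, assuming Chain and f! are already defined on all workers; set_chain!, run_chain! and get_chain are made-up helper names, not part of any package:

@everywhere set_chain!(c) = (global chain = c; nothing)   # binds Main.chain on whichever worker runs it
@everywhere run_chain!() = (f!(chain); nothing)           # mutates that worker-local chain in place
@everywhere get_chain() = chain                           # reads the worker-local chain

function g(chains::Vector{Chain})
    @assert length(chains) == nworkers()
    for (i, w) in enumerate(workers())
        remotecall_wait(set_chain!, w, chains[i])  # one copy of chains[i] travels to worker w
    end
    @sync for w in workers()
        @spawnat w run_chain!()
    end
    # fetching still serializes each chain back, i.e. the main process gets copies
    return [remotecall_fetch(get_chain, w) for w in workers()]
end

The caveat is in the last line: anything fetched from a worker is deserialized into a fresh object on the main process, so the workers' copies and the main process's copies coexist until one side drops its references.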
I also tried a different approach, pushing the chains through a RemoteChannel:

@everywhere function h!(channel::RemoteChannel)
    chain = take!(channel)
    f!(chain)
    put!(channel, chain)
    chain = nothing # I added this in the hopes that the worker doesn't retain chain in memory after put!
end
function g(chains::Vector{Chain})
    @assert length(chains) == nworkers()
    channel = RemoteChannel(() -> Channel{Chain}(length(chains)), 1)
    for chain in chains
        put!(channel, chain)
    end
    @sync for i in 1:nworkers()
        @spawnat workers()[i] h!(channel)
    end
    chains = []
    for i in 1:nworkers()
        push!(chains, take!(channel))
    end
    return chains
end
However, even after calling the garbage collector, there seems to be roughly double the number of chains in memory: each worker still holds memory worth one chain, as far as I can tell from my system monitor, and the main process also has all the chains separately in memory.
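To check this from inside Julia rather than the system monitor, something along these lines should give a per-process picture (a rough sketch; Sys.maxrss and Base.gc_live_bytes exist, but the numbers are only indicative):

using Distributed
for p in procs()
    rss  = remotecall_fetch(() -> Sys.maxrss(), p) / 2^20            # OS-level resident set size, MiB
    live = remotecall_fetch(() -> Base.gc_live_bytes(), p) / 2^20    # bytes the GC currently considers live, MiB
    println("pid $p: maxrss ≈ $(round(rss, digits = 1)) MiB, gc_live ≈ $(round(live, digits = 1)) MiB")
end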
Thanks for this. It really helped, but I still need to figure out how to pass data from the workers to the main process without duplicating it in memory, since I'm close to the memory limit. I guess I could write it to disk?
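If disk is acceptable, a minimal sketch of that idea (file names are made up, and a shared filesystem is assumed) would be for each worker to serialize its chain and drop its in-memory copy, with the main process reading the files back afterwards:

using Distributed, Serialization
@everywhere using Serialization

# Hypothetical helper: write this worker's local chain to disk, then release it.
@everywhere function dump_chain!(path)
    serialize(path, chain)
    global chain = nothing
    GC.gc()
    return nothing
end

@sync for w in workers()
    @spawnat w dump_chain!("chain_$(w).jls")
end
chains = [deserialize("chain_$(w).jls") for w in workers()]   # read back one at a time on the main process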
So after I do the following (sendto as provided by ParallelDataTransfer.jl)

for i in 1:nworkers()
    sendto(i+1, chain=chains[i])
end
for i in 1:nworkers()
    @spawnat i+1 f!(chain)
end
I do have exactly what I want: each worker has a chain it sampled independently and in parallel. However, I would like to pass it to the main process without having it duplicated in memory. I could potentially copy them one by one from each worker and then kill the worker, but I don't like this approach.
Edit: I tried setting chain to nothing at each worker and then running @everywhere GC.gc() but this did not clear memory as I had expected it to.
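A middle ground might be to pull the chains back one at a time and release each worker's copy as soon as it has been fetched, so at most one extra chain is in flight at any moment. A sketch, reusing the worker-local chain global and two made-up helpers; note that even after GC.gc() the OS-level footprint of a process may not shrink immediately, which could explain what the system monitor shows:

@everywhere get_chain() = chain
@everywhere drop_chain!() = (global chain = nothing; GC.gc(); nothing)

chains = Vector{Chain}(undef, nworkers())
for (i, w) in enumerate(workers())
    chains[i] = remotecall_fetch(get_chain, w)   # copies worker w's chain to the main process
    remotecall_wait(drop_chain!, w)              # then release it on the worker
end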
Yes, it would be simpler indeed, but for some reason it's about 50 percent slower, so I wanted to see if I could gain time without sacrificing memory.
Does your code allocate a lot? If yes, either get rid of the allocations, or Distributed might be better. If not, threading should be better. In 1.10 the GC should be multi-threaded, which might help, but I have not tested that.
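For reference, the threaded variant being compared here would be along these lines (a sketch, assuming Chain and f! as above and that Julia was started with several threads, e.g. julia -t 4):

# Each task mutates its own Chain in place; nothing is copied between processes,
# but all chains live in the one process's memory.
function g_threaded!(chains::Vector{Chain})
    Threads.@threads for i in eachindex(chains)
        f!(chains[i])
    end
    return chains
end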