I have a vector chains whose elements are instances of a mutable struct called Chain, and a function f!(chain::Chain) which alters chain. I want to distribute each element of chains to a worker so that each worker returns a new chain to me, which I would later combine into the new chains.
Currently, I’m using a function as follows to distribute the work.
function g(chains::Vector{Chain})
    futures = []
    @assert nworkers() == length(chains)
    for i in 1:nworkers()
        push!(futures, @spawnat workers()[i] f!(chains[i]))
    end
    return [fetch(future) for future in futures]
end
Currently f! also returns chain in addition to altering the input, but I worry that might consume extra memory in the worker, since I’m not sure how variable assignment works in the worker (i.e. I worry that a new variable is being created containing the returned value of f! in addition to the altered input).
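On the worry about returning chain from f!: within a single process, returning a mutable struct hands back a reference to the same object rather than a copy. The following is a minimal sketch of that point, assuming a hypothetical Chain with a single samples field and a stand-in f! (neither is taken from the original code).

```julia
# Hypothetical stand-in for the real Chain type.
mutable struct Chain
    samples::Vector{Float64}
end

# Stand-in for the real f!: mutates its argument and returns it.
function f!(chain::Chain)
    push!(chain.samples, 1.0)
    return chain
end

c = Chain(Float64[])
returned = f!(c)

# `===` tests object identity: true means no second Chain was created.
same_object = returned === c
```

The copies come from elsewhere: @spawnat serializes chains[i] to the worker, and fetch serializes the result back, so the main process holds both the original and the fetched chain until the original becomes unreachable.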
I want to have a way to create a variable chain in each worker that is altered and that I can fetch, so that I don’t have to let f! return any value. Then I could do something like chains = [@fetchfrom i chain for i in workers()].
Is this possible in any way? I tried adding chain = chains[i] in the for loop and feeding chain to f!, but it doesn’t show up in the worker, I presume because it is in a local scope. If I declare it to be a global and I fetch it, it reassigns the value in the worker since its value changes in the for loop.
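One way to get a per-worker global that is mutated in place and fetched later is to evaluate an assignment in Main on each worker. The sketch below is only an illustration under stated assumptions: Chain and f! are hypothetical stand-ins, and two local workers are added just to make it self-contained.

```julia
using Distributed
addprocs(2)  # assumption: two local workers, only for illustration

# Hypothetical stand-ins for the real Chain and f!.
@everywhere mutable struct Chain
    samples::Vector{Float64}
end
@everywhere function f!(chain::Chain)
    chain.samples .*= 2
    return nothing
end

chains = [Chain([Float64(i)]) for i in 1:nworkers()]

# Define a global named `chain` in Main on each worker.
for (w, c) in zip(workers(), chains)
    remotecall_wait(Core.eval, w, Main, Expr(:(=), :chain, c))
end

# Mutate each worker's global in place; f! returns nothing.
@everywhere workers() f!(chain)

# Fetch each worker's global by evaluating the symbol `chain` there.
new_chains = [remotecall_fetch(Core.eval, w, Main, :chain) for w in workers()]
```

Note that the fetch still deserializes a fresh copy into the main process, and the worker keeps its own copy until its global is reassigned and collected.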
@everywhere function h!(channel::RemoteChannel)
    chain = take!(channel)
    put!(channel, chain)
    chain = nothing # I added this in the hopes that the worker doesn't retain chain in memory after put!
end
function g(chains::Vector{Chain})
    @assert length(chains) == nworkers()
    channel = RemoteChannel(()->Channel{Chain}(length(chains)), 1)
    for chain in chains
        put!(channel, chain)
    end
    @sync for i in 1:nworkers()
        @spawnat workers()[i] h!(channel)
    end
    chains = []
    for i in 1:nworkers()
        push!(chains, take!(channel))
    end
    return chains
end
However, even after calling the garbage collector, there seems to be double the amount of chains in memory. Each worker still has one chain's worth of memory occupied, as I can see on my system monitor, and the main process also has all the chains separately in memory.
Thanks for this. It really helped but I still need to figure out how to pass data from the workers to the main process without duplicating it in memory since I’m close to memory limit. I guess I can write it to disk?
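For the write-to-disk idea, the Serialization standard library can store a struct in a file and read it back. The sketch below runs in a single process for simplicity and uses a hypothetical Chain type and a temporary path; in a distributed setting it assumes the worker and the main process can both see the file.

```julia
using Serialization

# Hypothetical stand-in for the real Chain type.
mutable struct Chain
    samples::Vector{Float64}
end

chain = Chain([1.0, 2.0, 3.0])

# On the worker side: write the chain to a file (a temporary path here).
path = tempname()
serialize(path, chain)

# On the main-process side: read it back as a new, independent object.
restored = deserialize(path)
rm(path)
```

This only moves the data; the worker would still have to drop its own reference before its copy can be collected, and the serialized format is not guaranteed to be readable across Julia versions.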
So after I do
for i in 1:nworkers()
    sendto(i+1, chain=chains[i])
end
for i in 1:nworkers()
    @spawnat i+1 f!(chain)
end
I do have exactly what I want: each worker has a chain it sampled independently and in parallel. However, I would like to pass it to the main process without having it duplicated in memory. I could potentially copy them one by one from each worker and then kill the worker, but I don’t like this approach.
Edit: I tried setting chain to nothing at each worker and then running @everywhere GC.gc() but this did not clear memory as I had expected it to.
Yes it would be simpler indeed, but for some reason it’s about 50 percent slower, so I wanted to see if I could gain in time without sacrificing memory.
Does your code allocate a lot? If yes, either get rid of the allocations, or Distributed might be better. If not, threading should be better. In 1.10 the GC should be multithreaded, which might help, but I have not tested that.
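For comparison, a threaded version mutates the chains in shared memory, so nothing is serialized or duplicated. This is a rough sketch with a hypothetical Chain and a stand-in f!; it also shows one way to check how much a single call allocates.

```julia
# Hypothetical stand-ins for the real Chain and f!.
mutable struct Chain
    samples::Vector{Float64}
end
function f!(chain::Chain)
    chain.samples .*= 2
    return nothing
end

chains = [Chain([Float64(i)]) for i in 1:4]

# Each iteration mutates a different element, so no locking is needed here.
Threads.@threads for i in eachindex(chains)
    f!(chains[i])
end

# Rough allocation check on a separate object; the first call is a warm-up
# so that compilation is not counted.
probe = Chain([1.0])
f!(probe)
bytes = @allocated f!(probe)
```

Start Julia with several threads (for example with the -t flag) for the loop to actually run in parallel; with a single thread it still runs correctly, just sequentially.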