Creating global variable in a worker

I have a vector chains whose elements are a mutable struct called Chain, and a function f!(chain::Chain) which alters chain. I want to distribute each element of chains to a worker so that the worker can return a new chain to me, which I would later combine into the new chains.
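For concreteness, a minimal stand-in for these definitions (the actual struct and sampler are not shown here, so the field and the mutation below are hypothetical):

using Distributed
# addprocs(4)   # assume some workers have already been added

@everywhere mutable struct Chain
    samples::Vector{Float64}   # hypothetical field; the real struct is not shown
end

@everywhere function f!(chain::Chain)
    chain.samples .+= randn(length(chain.samples))   # placeholder mutation
    return chain
end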

Currently, I’m using the following function to distribute the work.

function g(chains::Vector{Chain})
    futures = []
    @assert nworkers() == length(chains)
    for i in 1:nworkers()
        push!(futures, @spawnat workers()[i] f!(chains[i]))   # run f! on worker i
    end
    return [fetch(future) for future in futures]   # fetch copies each result back
end

Currently f! also returns chain in addition to altering its input, but I worry that this might consume extra memory on the worker, since I’m not sure how variable assignment works in the worker (i.e. I worry that a new variable is created to hold the return value of f!, in addition to the altered input).
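Using the stand-in definitions above, a quick aliasing check suggests the return itself creates no second chain on the worker; an extra copy would only appear when fetch deserializes the return value back on the calling process:

c = Chain(zeros(3))
f!(c) === c   # true: f! returns the very same object it mutated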

I want to have a way to create a variable chain in each worker that is altered and that I can fetch, so that I don’t have to let f! return any value. Then I could do something like chains = [@fetchfrom i chain for i in workers()].

Is this possible in any way? I tried adding chain = chains[i] inside the for loop and feeding chain to f!, but it doesn’t show up in the worker, I presume because it is in a local scope. If I declare it as a global and fetch it, I get the reassigned value, since the global is rebound on every iteration of the for loop.
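One way to get a genuine per-worker global that sidesteps the rebinding problem is to capture each value in a loop-local variable and assign the worker’s Main.chain remotely. A sketch (untested; assumes f! is defined with @everywhere):

for (i, w) in enumerate(workers())
    c = chains[i]            # fresh local binding on every iteration
    remotecall_wait(w) do
        global chain = c     # binds Main.chain on worker w
    end
end
@everywhere workers() f!(chain)                    # each worker mutates its own global
chains = [@fetchfrom w chain for w in workers()]   # fetch them back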

Try using a RemoteChannel for communication.

This was my first thought.

I’ve changed the function as follows:

@everywhere function h!(channel::RemoteChannel)
    chain = take!(channel)   # take one chain off the shared channel
    f!(chain)                # mutate it on this worker
    put!(channel, chain)     # push the mutated chain back
    chain = nothing          # added in the hope that the worker doesn't retain chain in memory after put!
end

function g(chains::Vector{Chain})
    @assert length(chains) == nworkers()
    # The second argument pins the channel's backing store to process 1.
    channel = RemoteChannel(() -> Channel{Chain}(length(chains)), 1)
    for chain in chains
        put!(channel, chain)   # each put! stores a serialized copy on process 1
    end
    @sync for i in 1:nworkers()   # @sync blocks until every @spawnat task finishes
        @spawnat workers()[i] h!(channel)
    end
    results = Chain[]
    for i in 1:nworkers()
        push!(results, take!(channel))
    end
    return results
end

However, even after calling the garbage collector, there seems to be double the number of chains in memory: each worker still has memory worth one chain occupied, as I can see in my system monitor, and the main process also holds all the chains separately.

Have you seen ParallelDataTransfer.jl?

Thanks for this. It really helped, but I still need to figure out how to pass data from the workers to the main process without duplicating it in memory, since I’m close to the memory limit. I guess I could write it to disk?
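A sketch of the disk route using the Serialization stdlib (the file-name scheme here is made up):

@everywhere using Serialization

@sync for w in workers()
    @spawnat w begin
        global chain
        serialize("chain_$(myid()).jls", chain)   # hypothetical file name
        chain = nothing                           # drop the worker's reference
        GC.gc()
    end
end
chains = [deserialize("chain_$w.jls") for w in workers()]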

So after I do

for i in 1:nworkers()
    sendto(i+1, chain=chains[i])   # bind chains[i] as the global `chain` on worker i+1
end

for i in 1:nworkers()
    @spawnat i+1 f!(chain)   # each worker mutates its own global chain
end

I do have exactly what I want: each worker has a chain it sampled independently and in parallel. However, I would like to pass the chains back to the main process without duplicating them in memory. I could copy them one by one from each worker and then kill the worker, but I don’t like this approach.

Edit: I tried setting chain to nothing on each worker and then running @everywhere GC.gc(), but this did not clear the memory as I had expected it to.
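For reference, the "copy one by one, then kill the worker" approach could look like this (a sketch; at most one extra copy is alive at any moment, and removing the worker is what actually releases its memory back to the OS):

chains = Chain[]
for w in workers()
    push!(chains, @fetchfrom w chain)   # copy this worker's chain back to the master
    rmprocs(w)                          # remove the worker, freeing its copy
end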

Sorry, no idea how to do this. Would it not be better to use multi-threading in this case?

Yes, it would indeed be simpler, but for some reason it’s about 50 percent slower, so I wanted to see if I could gain time without sacrificing memory.

Do you have hyperthreading enabled? You may want to use ThreadPinning.jl. I’ve got that in my startup.jl, so it’s always pinning threads to cores.
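For reference, the pinning itself is a one-liner with ThreadPinning.jl:

using ThreadPinning
pinthreads(:cores)   # pin each Julia thread to a separate physical core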

Does your code allocate a lot? If yes, either get rid of the allocations, or Distributed might be better. If not, threading should be better. In 1.10 the GC is multi-threaded, which might help, but I have not tested that.
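Two quick ways to act on this (a sketch; it uses the stand-in f! from above, and the --gcthreads flag is from the Julia 1.10 release notes):

c = Chain(zeros(10_000))
@time f!(c)   # the reported "allocations" count shows whether f! allocates heavily

# On Julia 1.10+, the GC can use parallel mark threads, e.g.:
#   julia --threads=8 --gcthreads=4 script.jl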