Understanding distributed memory garbage collection

Hello,
In running some large-scale parallel computations, a colleague of mine has run into problems with memory leakage. To be more specific, the amount of memory used by the offending program gradually increases over the course of the computation until (for large enough problems) all system memory is used up and the program crashes. The memory required by the program should remain relatively constant after an initial spin-up period.

We've found the source of the problem but don't understand what's going on. I suspect that the observed behaviour is due to a bug in how Futures are marked as eligible for garbage collection, but it may be that I just don't understand what's happening. We've reduced the problem to the following MWE, with one version exhibiting the "memory leak" and the other not doing so. I apologize that the MWE is a bit long.

The basic idea is that we have a master function that distributes subproblem computations to remote worker processes and then collects the results on the master process. Each worker may host multiple subproblems. There is some input data on the master process that all subproblems will need and that must be communicated from the master process to the workers. Let's first define a couple of functions:

function workerFunction( sigmaRef::Union{Future,RemoteChannel} )

   # Fetch the shared input data referenced by sigmaRef and do a trivial computation on it.
   sigma = fetch(sigmaRef)
   println(sum(sigma))

   return true
end

function leakingMasterFunction( n1::Int, n2::Int )

   sigma      = ones(n1)      # input data that every subproblem needs
   nw         = nworkers()
   workerList = workers()
   sigmaRef   = Array{Future}(nw)

   @sync begin
       for i in 1:nw
           p = workerList[i]
           @async begin

              # Send sigma to worker p once; sigmaRef[i] is a Future referencing the remote copy.
              sigmaRef[i] = remotecall_wait(identity,p,sigma)

              # Run n2 subproblems on worker p, each reusing the same remote copy of sigma.
              for idx = 1:n2
                 b = remotecall_fetch(workerFunction,p,sigmaRef[i])
              end  # idx

           end  # @async begin
       end  # p
   end  # @sync begin

end  # func

Now, assume we’ve launched Julia with two or more workers and loaded the above functions. If we, from the REPL, execute

julia> nPerWorker = 2

julia> while true; leakingMasterFunction(5000000, nPerWorker); end

For nPerWorker > 1 this causes the memory used by Julia to increase until the system runs out of memory and the code crashes. With nPerWorker = 1 the infinite loop runs indefinitely with relatively constant memory usage. Running finalize on the elements of sigmaRef doesn't help.
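To be explicit about what I mean by running finalize: I tried something along the lines of the sketch below, in which leakingMasterFunction is assumed to be modified to end with return sigmaRef so that the Futures are reachable from the caller (that return statement is not in the code above).

nPerWorker = 2
while true
    refs = leakingMasterFunction(5000000, nPerWorker)   # assumed variant that returns sigmaRef
    foreach(finalize, refs)                             # explicitly finalize every Future on the master
end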

However, if we take the following slightly different approach, in which RemoteChannels are used to communicate the input to the workers, we see no memory leak:

# Helper functions for non-leaking version of master function
function initRemoteChannel(func::Union{Function,Type}, pid::Int64, args...; kwargs...)
  return RemoteChannel(()->initChannel(func,args,kwargs), pid)
end

function initChannel(func::Union{Function,Type},args::Tuple,kwargs::Array)
  obj = func(args...; kwargs...)
  chan = Channel{typeof(obj)}(1)
  put!(chan,obj)
  return chan
end

function nonLeakingMasterFunction( n1::Int, n2::Int )

   sigma      = ones(n1)
   nw         = nworkers()
   workerList = workers()
   sigmaRef   = Array{RemoteChannel}(nw)
   
   @sync begin
       for i in 1:nw
           p = workerList[i]
           @async begin

              sigmaRef[i] = initRemoteChannel(identity,p,sigma)
              
              for idx = 1:n2
                 b = remotecall_fetch(workerFunction,p,sigmaRef[i])
              end  # idx

           end  # @async begin
       end  # p
   end  # @sync begin

end  # func

The initRemoteChannel(func, p, args...; kwargs...) function, called on the master process, returns a RemoteChannel that refers to a Channel living on worker p; the output of func is computed on worker p and stored in that Channel. So, in summary, if we use the approach in leakingMasterFunction to distribute sigma to the workers we get memory leakage, and if we use the approach in nonLeakingMasterFunction there's no leak.
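As a minimal usage sketch (the worker id 2 and the anonymous function passed to remotecall_fetch are illustrative assumptions, not part of the code above):

sigma = ones(10)
ref   = initRemoteChannel(identity, 2, sigma)          # a Channel holding a copy of sigma now lives on worker 2
s     = remotecall_fetch(r -> sum(fetch(r)), 2, ref)   # worker 2 fetches from its local Channel and sums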

As far as I can understand from the parallel computing docs in the Julia manual, the value referenced by a Future should be eligible for garbage collection once all references to it have either been fetched or marked for garbage collection themselves. So why does the above code cause the system to run out of memory? Is this a bug or am I missing something about how Futures work?
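To make my expectation concrete, here is roughly the lifecycle I have in mind (the worker id and array size are arbitrary; this is a sketch of my mental model, not code from the program above):

f = remotecall(identity, 2, ones(5000000))   # the large array is now stored on worker 2
v = fetch(f)       # the value is copied back to the master; the remote store should now be releasable
f = nothing        # drop the last reference to the Future on the master
GC.gc()            # after this I would expect worker 2 to be free to reclaim the array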

Many thanks for any help, Patrick

Edit: I should add that I’ve read https://github.com/JuliaLang/julia/issues/12099. I understand that if an unfetched Future isn’t actually garbage collected then the process holding the value referenced by that Future might not get notified that the value can be garbage collected. As I mentioned above, explicitly calling finalize doesn’t help matters so I don’t think this is the issue but I’d be happy if someone could explain to me why I’m wrong.


This is a bug.

Edit: Until it is fixed, a workaround (applicable in the context of this MWE only) is to finalize the ref in the worker function as soon as it has been fetched.

function workerFunction( sigmaRef::Union{Future,RemoteChannel} )

   sigma = fetch(sigmaRef)
   finalize(sigmaRef)                 # <---- Finalize after fetching in the worker func.
   println(sum(sigma))

   return true
end

I have tested the workaround on 0.6.2.

Would you be able to open an issue on GitHub for this?


Thanks very much @amit.murthy for the reply and the suggested workaround. I’ll open an issue shortly. Following up, is there a downside to just using RemoteChannels in the way it’s done in nonLeakingMasterFunction in the original post? It seems to work in the MWE and in the actual code that motivated all this.

An issue has been created in the main Julia repo:

https://github.com/JuliaLang/julia/issues/25847

Would this also be the explanation of slowly growing memory for a @distributed for loop?

This is the smallest example I could come up with that shows my problem: the real function that I'm running in a @distributed for loop causes the Distributed module to grow very large in memory over time, and I can't seem to do anything to clear it.

The actual function reads a pixel from an HDF5 file on the workers, processes it, and then writes the result to a separate file, so there should be no lingering memory in the workers (roughly the pattern sketched below). But when I run InteractiveUtils.varinfo(), the Distributed module keeps getting bigger and I don't know how to check why.
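For context, the real per-worker work has roughly the following shape (file names, dataset layout, and the processing step are placeholders, not my actual code); the minimal session below drops HDF5 entirely:

using Distributed
@everywhere using HDF5

# Rough shape of the real per-pixel workload (all names are placeholders).
@everywhere function process_pixel(i)
    val = h5open("input.h5", "r") do f
        f["pixels"][i, :]                 # read one pixel's worth of data
    end
    h5open("output_$(myid()).h5", "cw") do f
        f["pixel_$i"] = sum(val)          # stand-in for the real processing and output
    end
    return nothing
end

@sync @distributed for i = 1:5000
    process_pixel(i)
end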

julia> @everywhere function testmem()
           a = rand(1000, 1000, 10)
           return mean(a)
       end

julia> @everywhere using Statistics

julia> @fetchfrom 2 InteractiveUtils.varinfo()
  name             size summary
  ––––––––––– ––––––––– –––––––––––––––
  Base                  Module
  Core                  Module
  Distributed 1.340 MiB Module
  Main                  Module
  testmem       0 bytes typeof(testmem)

julia> @time @sync @distributed for i=1:5000
           testmem()
       end
 48.727456 seconds (168.32 k allocations: 8.229 MiB)
Task (done) @0x00007fe53c531ae0


julia> @fetchfrom 2 InteractiveUtils.varinfo()
  name             size summary
  ––––––––––– ––––––––– –––––––––––––––
  Base                  Module
  Core                  Module
  Distributed 1.360 MiB Module
  Main                  Module
  testmem       0 bytes typeof(testmem)

even GC.gc() doesn’t help:

julia> @everywhere GC.gc()

julia> @fetchfrom 2 InteractiveUtils.varinfo()
  name             size summary
  ––––––––––– ––––––––– –––––––––––––––
  Base                  Module
  Core                  Module
  Distributed 1.361 MiB Module
  Main                  Module
  testmem       0 bytes typeof(testmem)

Actually, I think my problem is more related to an HDF5 bug that's been around for a while: https://github.com/JuliaIO/HDF5.jl/issues/349