Multithreaded IOBuffer memory leak?

See #49545.

Basically, this piece of code leads to runaway memory usage that I can’t get rid of anymore:

function explode(lck, buffer, path, message; kwargs...)
    lock(buffer) do
        println(buffer, message)
        open(path, append = true) do f
            write(f, take!(buffer))
        end
    end
end

let
    lck = ReentrantLock()
    buffer = IOBuffer()
    path = "test.log"
    while true
        Threads.@spawn begin
            explode(lck, buffer, path, "blabla")
        end
    end
end

Does anyone know what is going on, and how to fix it?

I assume it’s lock(lck), not lock(buffer).

I replaced the while true with for _ in 1:2000000.
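
With those two changes, the snippet I ran looks roughly like this:

function explode(lck, buffer, path, message; kwargs...)
    lock(lck) do                      # lock the ReentrantLock, not the IOBuffer
        println(buffer, message)
        open(path, append = true) do f
            write(f, take!(buffer))
        end
    end
end

let
    lck = ReentrantLock()
    buffer = IOBuffer()
    path = "test.log"
    for _ in 1:2000000                # bounded loop instead of `while true`
        Threads.@spawn explode(lck, buffer, path, "blabla")
    end
end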

With 24 threads, or with 4 threads, the program allocates about 125 GB in a few seconds and returns to the prompt, i.e. all 2 million tasks have been spawned. It then runs the tasks in the background and finishes them in about 2 minutes. Then I can run GC.gc() 3-4 times to get memory usage down to approx 7 GB. I can repeat the let block, wait until things calm down, run gc(), and I always end up with approx 7 GB of memory in use. Curiously, a single GC.gc() is not sufficient; I need 3-4 calls to free all the memory.
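
For reference, the manual cleanup I do after the tasks have finished looks roughly like this (exactly how the remaining usage is measured, e.g. Base.gc_live_bytes() or the OS, doesn't matter much):

# a single GC.gc() is not enough; 3-4 calls in a row free the rest
for _ in 1:4
    GC.gc()
end
Base.gc_live_bytes() / 2^20   # MiB the GC still considers live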

That is, I can’t reproduce your problem.

Of course, with the while true, we never finish spawning new tasks, and we spawn much faster than they finish, so the overhead of each pending task will necessarily keep memory usage high.

Very interesting. The while true block was simply a quick way to make it visible: in my real scenario this happens over the course of a week, during which the process slowly but surely allocates 25 GB and never frees it as different threads log messages.

In the real code there is plenty of time where nothing is happening, so things could be cleaned up. Also, I don’t think it makes sense that you need 3-4 manual GC calls for the cleanup to happen, and it also doesn’t make sense that it keeps 7 GB lying around, imo…

So to me it seems you are in fact reproducing the issue.

EDIT: Indeed the initial code was essentially just throwing errors!

I’ve done some more digging, running the following code, which should not suffer from the fork bomb caused by while true and is thus closer to my real-world case:

# Push Julia-side memory stats (in MiB) into the dict
function meminfo_julia!(d)
    push!(d["GC Live"], Base.gc_live_bytes() / 2^20)
    push!(d["Max. RSS"], Sys.maxrss() / 2^20)
end

# Push OS-side memory stats (in MiB) read from procfs into the dict
function meminfo_procfs(d)
    pid = getpid()
    smaps = "/proc/$pid/smaps_rollup"
    if !isfile(smaps)
        error("`$smaps` not found. Maybe you are using an OS without procfs support or with an old kernel.")
    end

    rss = pss = 0
    for line in eachline(smaps)
        s = split(line)
        if s[1] == "Rss:"
            rss += parse(Int64, s[2])
        elseif s[1] == "Pss:"
            pss += parse(Int64, s[2])
        end
    end

    push!(d["RSS"], rss / 2^10)
    push!(d["PSS"], pss / 2^10)
end

function explode(buffer = IOBuffer())
    lck = ReentrantLock()
    path = "test.log"
    @sync for _ in 1:1000000
        Threads.@spawn begin
            lock(lck) do
                println(buffer, "blabla")
                open(path, append=true) do f
                    write(f, take!(buffer))
                end
            end
        end
    end
end

memdict = Dict("GC Live" => Float64[],
               "Max. RSS" => Float64[],
               "RSS" => Float64[],
               "PSS" => Float64[])
t = Threads.@spawn let
    buffer = IOBuffer()
    for i = 1:200
        explode(buffer)
        meminfo_julia!(memdict)
        meminfo_procfs(memdict)
    end
end
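
Once the task above has finished, the plot below is just memdict plotted, roughly like this (Plots.jl here is only an assumption for illustration, any plotting package works):

using Plots   # plotting package is an assumption, not part of the measurement code

wait(t)       # let the measurement task finish first
plot(collect(values(memdict)),
     labels = permutedims(collect(keys(memdict))),
     xlabel = "iteration of explode", ylabel = "memory (MiB)")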

Which takes ages, but produces the following plot:

[plot: GC Live, Max. RSS, RSS, and PSS over the 200 iterations]

So while the memory consumption seems to stabilize around 6 GB, that is not quite true: it keeps trending upwards, slowly but surely. I think this is exactly the leak I’m seeing in my real-world case.

I’d love to get some more insight into what might be going on here.

I’ve done some more digging, and I think I have isolated the issue in the above piece of code to the write(f, take!(buffer)) part. It can be recreated with far fewer threads and simply a larger message, like:

const MSG = String(rand(UInt8, Int(10e6)))
function explode(buffer = IOBuffer())
    lck = ReentrantLock()
    out = ""
    @sync for _ in 1:2
        Threads.@spawn begin
            lock(lck) do
                println(buffer, MSG)
                out *= String(take!(buffer))
            end
        end
    end
    
    return out
end

I have done the above test in a loop which leads to:
[plot of memory usage over repeated calls to explode]

Which part of this is unexpected? The IOBuffer you’re creating is never reset, so it continues using more and more memory as you write more and more data to it.

I guess it’s surprising that take! here doesn’t actually resize the IOBuffer to a smaller size, contrary to its documentation :thinking:

take! resets the buffer to its original state, and you can even try length(buffer.data) after the String(take!(buffer)) to find it’s empty. It being empty, and the String not being referenced anymore, it should be garbage collected.

Even after calling GC.gc() multiple times, it doesn’t change. To me this is (1) very unexpected given the documentation, and (2) very much looks like a memory leak.

What the surface-level API tells you is a bit different from what the implementation actually does. Internally, IOBuffer is backed by a Vector which it resize!s on take! (check the implementation with @edit take!(IOBuffer())). What you’re essentially saying is that continually resize!ing a vector leads to a memory leak.
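
You can poke at the backing Vector directly to see this, e.g. (buffer.data is an internal field, and the exact behaviour can differ between Julia versions):

buf = IOBuffer()
write(buf, rand(UInt8, 10_000_000))   # grow the internal backing Vector
@show length(buf.data)                # backing store size before take!
data = take!(buf)
@show length(data)
@show length(buf.data)                # backing store size afterwards (version-dependent)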

No, as you can see in the code snippet, this happens even when using a new buffer every time. Did you try running it? (Try with -t 3 at least; at -t 2 it’s all good.)

No, I haven’t run it, because I don’t know how you’ve taken your measurement. In the original code here you are reusing the same IOBuffer, and I don’t know whether that changed in your most recent version.

This is the actual snippet. I just ran it on 1.9-rc4 and master with julia -t 3 --startup-file=no memleak.jl and it leads to the issue. On 1.8.5 it seems to work fine.
memleak.jl (353 Bytes)

Edit: sorry, I meant 1.9.0-rc2 of course.

Is that also measured with the suite from above (and how)? The code as linked doesn’t produce a graph and doesn’t seem to measure anything, after all.

One thing I can think of is out being shared across threads causing some form of blowup, but that ought to be reproducible even without an IOBuffer as an intermediary.
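
Something like this would be the IOBuffer-free counterpart to compare against (a sketch reusing the MSG constant from the snippet above, with a hypothetical name; I haven't run it):

function explode_nobuffer()
    lck = ReentrantLock()
    out = ""
    @sync for _ in 1:2
        Threads.@spawn begin
            lock(lck) do
                out *= MSG    # same shared-out pattern, but no IOBuffer in the middle
            end
        end
    end
    return out
end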

I can reproduce this plot from the simpler program. However, if I run 5000 iterations of explode, there seems to be a full garbage collection around iteration 4800. This causes memory use to drop to around 100 MB, but it then rises at the same rate up to iteration 6000.
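
(A sketch of the test loop I mean; the exact measurement doesn't matter much:)

for i in 1:6000
    explode()
    # print live heap size every 100 iterations
    i % 100 == 0 && println(i, ": ", Base.gc_live_bytes() / 2^20, " MiB live")
end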

I think there are a couple of things going on here. The spawned tasks return the out String. It is never fetched, but if I remember correctly, a reference will be hanging around in thread-local storage. The same thing happens with the return value from explode; I just discard it.

So, there are several references to one or two strings in the thread-local storage. These can take some time to be collected by the GC. There was a GitHub issue about this some time ago. Anyway, things appear to be OK again after the full garbage collection.
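
If it really is the task return values, one thing to test (a sketch, I haven't verified it changes anything) is to make the task body end with nothing, so the String is not kept as the task's result:

function explode(buffer = IOBuffer())
    lck = ReentrantLock()
    out = ""
    @sync for _ in 1:2
        Threads.@spawn begin
            lock(lck) do
                println(buffer, MSG)
                out *= String(take!(buffer))
            end
            nothing   # task result is now trivial instead of the concatenated String
        end
    end
    return out
end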

I have tried omitting out, and it still seems to happen. For me, invoking GC.gc() never seems to bring things down that much. In any case, I don’t think this should be happening. It’s also very weird that it depends on -t 2 vs -t 3.

Sorry, I misread before. What do you mean by returning out? The spawned tasks only modify out; the main thread returns it.

I experienced a memory leak when writing DataFrames with CSV.write with multithreading, too.