Multithreaded memory usage

I have a project where Julia's memory usage is currently too high, and we cannot use it. I have attempted to replicate the behaviour in a minimal working example. The example is oversimplified, in that you could rewrite it to work entirely in-place. This is not possible for the actual code.

using Statistics, Base.Threads

ms = [randn(ComplexF64,100*2,100*2) for i in 1:20000]

println("this should only take on the order $((Base.summarysize(ms[1])*3*nthreads()+Base.summarysize(ms))/(1024*1024*1024)) GB")

function _reduce(blocks,basesize)
    if length(blocks)<basesize
        toret = zero(blocks[1]);
        for a in blocks
            t1 = randn()*a;
            t2 = randn()*t1;
            t3 = randn()*t2;
            t4 = randn()*t3;
            t5 = randn()*t4;
            t6 = randn()*t5;
            t7 = randn()*t6;
            t8 = randn()*t7;
            t9 = randn()*t8;
            t9 .-= mean(t9)

            toret += t9
        end

        return toret
    else
        split = Int(ceil(length(blocks)/2));
        t = @Threads.spawn _reduce(blocks[1:split],basesize)
        toret = _reduce(blocks[split+1:end],basesize);
        toret+=fetch(t)
        return toret
    end
end

@show mean(_reduce(ms,length(ms)/nthreads()))

println("done")
readline()

Notice how I allocate a bunch of temporaries, but they can almost immediately be freed again. I start by printing the amount of memory that I expect to need (about 13 GB), but when I fire it up with 12 threads on a 32 GB RAM laptop, it gets OOM killed.
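For reference, a back-of-the-envelope check of that estimate: each entry of ms is a 200×200 ComplexF64 matrix at 16 bytes per element.

200 * 200 * 16             # bytes per matrix in ms: 640 kB
20_000 * 640_000 / 2^30    # the ms vector alone: ≈ 11.9 GiB

The three per-thread temporaries in the printed estimate add only about 3 × 640 kB × 12 threads ≈ 23 MB on top, so ms dominates.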

Even worse, the --heap-size-hint gets completely ignored, and when I investigate heap snapshots from my actual code (not the minimal example), they only describe a heap of 1.5 GB, despite coming from a process taking up 20 times as much.

No clue what the problem is, but you could try adding a GC.gc() call. From the docs:

help?> GC.gc()
  GC.gc([full=true])

  Perform garbage collection. The argument full determines the kind of collection: A full collection (default) sweeps all objects, which makes the next GC scan much
  slower, while an incremental collection may only sweep so-called young objects.

  │ Warning
  │
  │  Excessive use will likely lead to poor performance.

Yes, and I believe Bumper.jl was made for exactly this case; just read its docs:

Ideally a sufficiently smart compiler would notice that the temporaries allocated in the loop never escape, and allocate them on the stack automatically for you, i.e. do what that package does without needing any code changes. I suppose it could in some cases, when the temporaries are not huge. When they are too big, or their size isn't known at compile time, I guess the compiler couldn't do this. [Does C++ (or any known compiler) do something like this automatically, i.e. without such a library, which I'm sure is available there?]

[I know C++, specifically LLVM's support library, has a stack-allocated array type (SmallVector) that falls back to the heap if it grows too big. That would work for C++, and since C++ has destructors, the heap memory would even get deallocated eagerly. Since this array type isn't in the C++ standard library (why not?), I'm not sure C++ compilers would do this for you without help.]
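For completeness: in Julia, small fixed-size temporaries can already be kept off the GC heap with StaticArrays.jl. A minimal sketch; this does not apply to the 200×200 matrices in this thread, which are far too large for the stack:

using StaticArrays

function kernel()
    a = @SVector randn(8)   # stack-allocated; creates no GC-tracked object
    b = 2.0 * a             # SVector arithmetic also stays on the stack
    return sum(b)
end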

Ideally the garbage collector also wouldn't let garbage pile up too high, and would take physical RAM size into account. I think it actually does try to avoid that ceiling, but memory pressure is a global property: only the OS knows how much is allocated across all processes, and it is difficult for any one process to find out the combined RAM usage of the others (it changes constantly, so I think it isn't even attempted; even if possible, it would likely be slow, a costly syscall). Julia, or any process with a GC, can know only its own usage, and can track it exactly and inexpensively. I believe that's done.

At least, tracking allocations and deallocations within any one thread is very cheap and is done; it could be, and likely is, done for all threads separately. The aggregate total across threads is a simple sum, but I'm not sure how cheaply it can be obtained; I'd guess locks are involved. Besides, as I said, RAM usage is a global property, so I think this is unsolvable to do perfectly if you produce garbage; at best you have some good heuristics.

Even worse, the --heap-size-hint gets completely ignored

That's strange; it might have to do with the threads used. You could try running with just one to see if the hint is respected then.

The GC wasn't multithreaded, but it now is on 1.10-alpha1, so you could try that; maybe you also need to fiddle with the --gcthreads=N,M option. (The M part may not be in yet, maybe only on master, but I don't think it's relevant.) This is all quite new.
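For example (--heap-size-hint exists since 1.9, --gcthreads since 1.10):

# check whether the hint is respected single-threaded:
julia --threads=1 --heap-size-hint=8G script.jl

# on 1.10-alpha1, also try multiple GC threads:
julia --threads=12 --gcthreads=4 --heap-size-hint=16G script.jl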

The example is oversimplified, in that you could rewrite it to work entirely in-place. This is not possible for the actual code.

I think you're saying something like this will not work (though for others in a similar situation where it would work, it should help):

That would absolutely cripple the performance beyond the point of usability.

I do like this approach and I have attempted something similar before. There are a few problems which would have to be worked around (I don’t actually use arrays, and I could also try out plain alloc/malloc) but ultimately, there will be temporaries that I simply cannot control. For example, I have modified the code further:

function inner!(accum,v)
    y = randn()*v;
    y .-= mean(y)
    accum += y
end

function _reduce(blocks,basesize)
    if length(blocks)<basesize
        toret = zero(blocks[1]);

        for a in blocks
            inner!(toret,a)
        end


        return toret
    else
        split = Int(ceil(length(blocks)/2));
        t = @Threads.spawn _reduce(blocks[1:split],basesize)
        toret = _reduce(blocks[split+1:end],basesize);
        toret+=fetch(t)
        return toret
    end
end

Here my multithreaded loop does not allocate anything, but it does call functions that will allocate. This is unavoidable, because you’ll generically have to call functions from other libraries, and these in turn may allocate.

I have also drastically cut back on the number of temporaries that I allocate, and this still gets killed with 32 GB, despite passing an entirely ignored --heap-size-hint.

The problem there is that the single-threaded version allocates way less garbage, and does not hit the heap size hint.

Well, I consider being OOM-killed even more performance-crippling.
This was a suggestion for a workaround, since you seemed to need to get work done.
And you don't have to call GC.gc() on every loop iteration; perhaps sweeping every 1000th is already enough…
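Something like this in the leaf loop, i.e. only an occasional incremental collection (the 1000 is just a knob to tune):

for (i, a) in enumerate(blocks)
    inner!(toret, a)
    i % 1000 == 0 && GC.gc(false)   # incremental sweep of young garbage only
end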

I only suggested GC.gc() because of this statement of yours

The example is oversimplified, in that you could rewrite it to work entirely in-place. This is not possible for the actual code.

But now you seem to be willing to rewrite code to be not allocating. So let me comment on the new MWE:

function inner!(accum,v)
    y = randn()*v;
    y .-= mean(y)
    accum += y
end

This will allocate a new array y if v is an array. So your loop will certainly allocate; perhaps you benchmarked incorrectly? Furthermore, the function does not update accum in place, so your end result will be different from the MWE you posted first.
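To see the second point in isolation: += rebinds the local name, while .+= writes into the caller's array.

rebind!(a) = (a += ones(2); nothing)   # binds a new array to the local name only
mutate!(a) = (a .+= ones(2); nothing)  # writes through to the caller's array

x = zeros(2); rebind!(x); x            # still [0.0, 0.0]
y = zeros(2); mutate!(y); y            # [1.0, 1.0]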

This is unavoidable, because you’ll generically have to call functions from other libraries, and these in turn may allocate.

The trick here is to avoid calling library methods that allocate.
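For example, many LinearAlgebra operations have in-place variants that write into a preallocated output:

using LinearAlgebra

A = randn(100, 100); B = randn(100, 100)
C = similar(A)
mul!(C, A, B)   # fills C in place; `C = A * B` would allocate a fresh matrix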

Regarding the first MWE you gave:
I am wondering if the problem is caused by spawning tasks recursively.
Perhaps try to rewrite the _reduce method so that you partition your blocks first and then spawn only the necessary number of tasks with the partitioned data.
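A sketch of that shape, where _leaf_reduce is a placeholder name for your serial inner loop and basesize is assumed to be an Int:

function reduce_chunked(blocks, basesize::Int)
    nchunks = cld(length(blocks), basesize)
    tasks = map(1:nchunks) do c
        lo = (c - 1) * basesize + 1
        hi = min(c * basesize, length(blocks))
        Threads.@spawn _leaf_reduce(view(blocks, lo:hi))   # one task per chunk, no recursion
    end
    return sum(fetch.(tasks))
end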

It is, but the alternative at the moment is to abandon Julia entirely.

To be clear - I am willing to make my own code non-allocating, but I am not willing to give up on other libraries that may allocate. That leads to the minimal example where the for loop itself does not allocate, but the function it calls does. Reimplementing everything (all underlying libraries) from scratch to make it non-allocating is not a tenable solution.

Perhaps try to rewrite the _reduce method so that you partition your blocks first and then spawn only the necessary number of threads with the partitioned data.

This I can try, and I will also experiment with further shrinking the number of allocations. I think I can open a bug report on GitHub, given that --heap-size-hint is certainly being ignored in the case of multiple threads.

haha good catch, it should’ve been something like:

using Statistics, Base.Threads

ms = [randn(ComplexF64,100*2,100*2) for i in 1:20000]

println("this should only take on the order $((Base.summarysize(ms[1])*3*nthreads()+Base.summarysize(ms))/(1024*1024*1024)) GB")

function inner!(accum,v)
    x = randn()*v;
    y = randn()*x;
    y .-= mean(y)
    accum .+= y
end

function _reduce(blocks,basesize)
    if length(blocks)<basesize
        toret = zero(blocks[1]);

        for a in blocks
            inner!(toret,a)
        end


        return toret
    else
        split = Int(ceil(length(blocks)/2));
        t = @Threads.spawn _reduce(blocks[1:split],basesize)
        toret = _reduce(blocks[split+1:end],basesize);
        toret+=fetch(t)
        return toret
    end
end

@show mean(_reduce(ms,length(ms)/nthreads()))

println("done")
readline()

That would be regrettable, and I think not needed. I believe this GC issue with threading is known, and being worked on, and maybe actually solved already in 1.10? Did you try it?

That's exactly what I wanted to hear, i.e. it's not a problem when single-threaded. You could try monitoring max memory use to see if some pattern emerges with e.g. 1, 2, 3, 4, 8, or the default (max) number of threads, on 1.9 vs 1.10 (with defaults it's probably ok, or you may need to fiddle with --gcthreads). I'm not sure if threading was always a problem; you could also consider testing, or using, e.g. 1.6.

Yes, and I dislike adding such calls, since ideally the GC itself would figure out the best times to collect automatically. But you could try adding GC.gc(false). I believe it's much lighter, since it only sweeps "so-called young objects", i.e., I would think, the garbage you just created. full=true is the default when you call it explicitly, and the docs' warning that "Excessive use will likely lead to poor performance" applies to that, maybe exclusively. Either way, I don't really like sprinkling code with either, since it shouldn't be needed… but it's a workaround until the GC is fixed and 1.10 ships for production (I hesitate to recommend an alpha or master, though I usually use master with good effect; hopefully the 1.10 release isn't too far off).

If you do add GC.gc, you could try it after each known allocation in the loop (or use Bumper.jl, and then it's not needed there), and after each call to a library that (potentially) allocates.

If you're thinking of going fully manual, then you could consider this package, which is an indirect dependency of Bumper.jl:

I think Bumper.jl would in most cases be preferable. While I may have implied it allocates on the stack, it actually allocates from its own pool, which lives on the heap (it uses the above library, which indirectly uses Libc.malloc; I don't think anyone needs to use that directly any more, even though it's in Julia's standard library, since these packages are more user-friendly).

Even if Bumper uses the heap (it allocates there only once, or was it once per task?), in effect it's as fast as if you had allocated in the stack frame itself. It's a package-managed stack, in effect.
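For concreteness, here is a sketch of what the inner kernel could look like with Bumper.jl's @no_escape/@alloc API (treat the details as an assumption and check its README):

using Bumper, Statistics

function inner_bump!(accum, v)
    @no_escape begin                        # scratch is reclaimed when the block exits
        x = @alloc(ComplexF64, size(v)...)  # bump-allocated, never seen by the GC
        x .= randn() .* v
        x .-= mean(x)
        accum .+= x
    end
    return accum
end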

I think the issue being worked on is that the garbage collector currently has to freeze all threads and then sweep through all objects in a single-threaded fashion. That will be sped up by making it multithreaded, but I believe this is different from my issue, where the garbage collector doesn't even kick in. I have tried the latest master, and can confirm it does not solve the issue.

I have modified my code as much as possible to remove more allocations (not yet using Bumper, but I'll definitely try it out). It still uses too much, but it may be possible to reach somewhat relevant system sizes and get out a paper.

I have tried to reproduce the problem. To me it seems that the GC works fine, but the culprit is the slicing of blocks; i.e., you might want to try adding views to avoid copying here:

t = @Threads.spawn @views _reduce(blocks[1:split],basesize)
toret = @views _reduce(blocks[split+1:end],basesize)

I tried my luck too and also optimized the initial MWE a bit:

using Statistics, Base.Threads

@info "Setting up data"
@time ms = [randn(ComplexF64,100*2,100*2) for i in 1:20000]
println("this should only take on the order $((Base.summarysize(ms[1])*3*nthreads()+Base.summarysize(ms))/(1024*1024*1024)) GB")

@info "Reducing"
@time begin

function _reduce(blocks,basesize)
  blockrng = 1:basesize:length(blocks)

  tasks = map(blockrng) do start
    @Threads.spawn begin
      # take a view over the whole chunk (a scalar index here would make each
      # task process just one block)
      vblocks = view(blocks, start:min(start+basesize-1, length(blocks)))
      toret = zero(first(vblocks))
      buffer = zero(first(vblocks))
      for a in vblocks
          # explicit dots so randn() is evaluated once per statement, not once
          # per element (which is what `@. buffer = randn() * a` would do)
          buffer .= randn() .* a
          buffer .= randn() .* buffer
          buffer .= randn() .* buffer
          buffer .= randn() .* buffer
          buffer .= randn() .* buffer
          buffer .= randn() .* buffer
          buffer .= randn() .* buffer
          buffer .= randn() .* buffer
          buffer .-= mean(buffer)
          toret .+= buffer
      end
      return toret
    end
  end
  return fetch.(tasks)
end

sz = Int(ceil(length(ms)/nthreads()))
result = _reduce(ms,sz)
mean(result)

end

Running with julia -t 6 ./mwe.jl yields

[ Info: Setting up data
 13.155810 seconds (140.16 k allocations: 11.928 GiB, 28.92% gc time, 0.33% compilation time)
this should only take on the order 11.932552568614483 GB
[ Info: Reducing
  0.926700 seconds (1.37 M allocations: 97.557 MiB, 8.19% gc time, 320.72% compilation time)
julia> versioninfo()
Julia Version 1.9.1
Commit 147bdf428cd (2023-06-07 08:27 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: 8 × 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-14.0.6 (ORCJIT, tigerlake)
  Threads: 1 on 8 virtual cores
Environment:
  LD_PRELOAD = /usr/lib64/libstdc++.so.6

EDIT: I had a typo in the threaded loop that made every thread loop over all blocks. Fixing that made the Reducing time go down from ~50 s to ~1 s.

Have you considered multi-processing instead of multi-threading?

Why would that help? Multi-processing requires copying data, whereas threads can just share it…

Several processes can share a SharedArray. And perhaps multiprocessing is less vulnerable to the sort of mysteriously poor memory handling that the OP describes.
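A minimal sketch of that pattern (worker count and sizes are arbitrary here):

using Distributed
addprocs(4)
@everywhere using SharedArrays

s = SharedArray{Float64}(1000)     # backed by OS shared memory, visible to all workers
@sync @distributed for i in 1:1000
    s[i] = i^2                     # each worker writes its own slice, no copying
end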


This is not at all a GC issue.

The code just finished in a few seconds on my 16 GB RAM laptop with nthreads = 12 (and a browser running with 20+ tabs open). It made a dent in swap but nothing dangerous (without profiling, the big memory hit was related to the allocation of the big ms at the beginning).

The details can be discussed later (lots of tasks are fired up from the beginning that allocate memory and are processed in parallel; the GC cannot free memory that is still in scope and in use).

Just replace your @show mean(_reduce(ms,length(ms)/nthreads())) with the following:

v = @view ms[1:end]
@show mean(_reduce(v,length(ms)/nthreads()))

There is still room for improvement related to reducing memory per task - but again, this is not a GC issue, and no amount of GC.gc() calls will save you.

Later edit: I think the OP is correct in saying that @view is not doing big stuff here. However, something is happening, because the @view solution works on my end without an OOM event, while the original OOM-kills my machine.

No, I was always under the impression that this is slow when the tasks require communication (I need to solve an eigenvalue problem, and the processes will have to synchronize after every evaluation of the linear map), and that it requires even more memory. I might nevertheless look into it, because it would probably sidestep the GC issue.

Have you tried this? It did nothing on my system (1.9.0, Ubuntu).

But that defeats the purpose. Yes, it's possible to make the minimal example work entirely in-place (as I mentioned in my first post), and then things work just fine.

I run a piece of code which allocates memory that can directly be reclaimed by the GC; instead, Julia runs out of memory. That is very much a GC issue. It's apparently a well-known issue, even acknowledged by Stefan Karpinski here: I didn't even know julia GC had issues. Care to elaborate? | Hacker News.

That does nothing on my end, and it also wouldn't make sense if it did. A vector of arrays is a vector of pointers. You can copy ms as much as you want (which is what the slicing operation does) without noticeably increasing memory usage, since you only copy the pointers to the data, not the underlying data.
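A quick way to see this:

ms = [randn(ComplexF64, 200, 200) for _ in 1:3]
slice = ms[1:2]        # copies the outer vector: two pointers, a few bytes
slice[1] === ms[1]     # true: both entries refer to the same 640 kB matrix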

The following piece of code gets killed after successfully allocating ms.

using Statistics, Base.Threads

ms = [randn(ComplexF64,100*2,100*2) for i in 1:20000]

println("this should only take on the order $((Base.summarysize(ms[1])*3*nthreads()+Base.summarysize(ms))/(1024*1024*1024)) GB")

function inner!(accum,v)
    x = randn()*v;
    y = randn()*x;
    y .-= mean(y)
    accum .+= y
end

function _reduce(blocks,basesize)
    if length(blocks)<basesize
        toret = zero(blocks[1]);

        for a in blocks
            inner!(toret,a)
        end


        return toret
    else
        split = Int(ceil(length(blocks)/2));
        t = @Threads.spawn @views _reduce(blocks[1:split],basesize)
        toret = @views _reduce(blocks[split+1:end],basesize);
        toret+=fetch(t)
        return toret
    end
end

v = @views ms[1:end]
@show mean(_reduce(v,length(ms)/nthreads()))

println("done")
readline()

How curious, then, that adding a GC.gc(false) call in the inner! function does actually make the code run perfectly fine.
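That is, presumably something like this, per the earlier suggestion:

function inner!(accum, v)
    x = randn() * v
    y = randn() * x
    y .-= mean(y)
    accum .+= y
    GC.gc(false)   # incremental collection after every call: crude, but keeps the heap in check
end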

Maybe you are using a different Julia version, or maybe a different OS. I'm on Ubuntu, Julia 1.9.0, and I cannot run this MWE.

Interesting, and I also ran that on 16 GB + 12 threads: Ubuntu 22.04, Julia 1.9.2.

Did you consider this approach as a low-memory impact alternative?

using BenchmarkTools, Statistics

function reduce_channel(n)
    streamer = Channel{Matrix{ComplexF64}}(100)
    reducer = Channel{Matrix{ComplexF64}}(Inf)
    # generator
    Threads.@spawn begin
        for i in 1:n
            put!(streamer,randn(ComplexF64,100*2,100*2))
        end        
    end
    # reducer
    r = Threads.@spawn begin
        toret = zeros(ComplexF64,100*2,100*2)
        i = n
        for trt in reducer
            toret .+= trt    # in-place: avoids allocating a new matrix per reduced block
            i -= 1
            i == 0 && break
        end  
        close(streamer)
        toret      
    end
    Threads.@threads for i = 1:Threads.nthreads()
        for s in streamer
            t1 = randn()*s;
            t2 = randn()*t1;
            t3 = randn()*t2;
            t4 = randn()*t3;
            t5 = randn()*t4;
            t6 = randn()*t5;
            t7 = randn()*t6;
            t8 = randn()*t7;
            t9 = randn()*t8;
            t9 .-=mean(t9)
            put!(reducer,t9)
        end 
    end 
    fetch(r)      
end

#@info "mean: " mean(reduce_channel(20000))
@time mean(reduce_channel(20000))

println("done")
readline()

# @time mean(reduce_channel(20000))
# 20.899889 seconds 
# (1.36 M allocations: 131.192 GiB, 5.26% gc time, 33.18% compilation time)

# original version (with @time): 64.424058 seconds 
# (1.25 M allocations: 119.293 GiB, 69.83% gc time, 11.68% compilation time: 7% of which was recompilation)

I hope I didn't insert bugs that impact correctness.

If you don't need to create the big ms object in advance, this keeps things steady memory-wise. But even if you do need ms in advance, this will keep your memory use about the same while still getting the parallelism gain.

Also, you can still create your splits if you want (in my scenario, the number of splits equals the length of your ms), and you can easily change that to find the best compromise between speed and memory.

I am fairly sure the OOM problem you observe can be avoided by changing the _reduce function to not spawn tasks recursively, but instead chunk the work first and then spawn all tasks at once (like I did in my previous example). Earlier you said you were willing to try this. Did it help?

I tested this again with the latest MWE you provided. Spawning recursively puts me at around ~30 GB when I run with 6 or 12 threads. Using the map approach from my other example above, I only use ~11 GB, which agrees with what the debug message predicts. From the bit of testing I did, I also conclude that the non-recursive version finishes much faster, since it spawns all tasks at once and they can immediately start their work, whereas in the recursive version task spawning is delayed quite a lot (tested by inserting a println call before @spawn).

None of this answers what causes your OOM crashes, or the excessive memory use I see when running the recursive version; nor whether it is a GC bug, a problem with @spawn, or some shared state/data race that we are all not seeing. (Regarding the latter, there can be weird bugs with shared state due to Julia's scoping rules, cf. Inconsistent results when using Threads.@threads in a loop - #2 by fatteneder.)



This seems about to be merged, and will be backported to 1.10 (it uses ideas from an interesting linked paper):

I would monitor when this gets merged, and then try the subsequent master, as I think it may be relevant and keep the heap size in check.
