Memory leak in allocation-intensive multi-threaded code

I’m trying to run multiple threads to construct a random matrix that satisfies some constraints. The goal is to have them run in parallel, and as soon as one task finds a matrix, return it and stop the remaining tasks.

However, I run into a pickle when I try to implement this, since the function that tries to construct the matrix does a bunch of linear algebra, and this seems to be causing my memory to explode. I’ve been able to reproduce the issue with the MWE below. Ironically, if I use a logger to print progress, the memory remains in check. I’m guessing all these busy threads are blocking the GC.

using Logging
s = 100_000_000
logger = ConsoleLogger(stderr, Logging.Info);
global_logger(logger);

function construct_matrix!(channel)
    X = rand(1_000_000, 10)
    result = X' * X 
    u = rand()
    if u < 1e-6
        put!(channel, result)
    end
end

function get_matrix(s)
    channel = Channel{Matrix}(1)
    dec = Threads.Atomic{Int}(s)
    function new_task()
        construct_matrix!(channel)
        Threads.atomic_sub!(dec, 1)
    end
    for i in 1:s
        if i % 100_000 == 0                                                                           
            @debug "$i threads launched"
        end
        if isready(channel)
            return fetch(channel)
        end
        Threads.@spawn new_task()
    end 
    while dec[] > 0 
        if isready(channel)
            return fetch(channel)
        end
    end
    return nothing
end

get_matrix(s)

If I set logger = ConsoleLogger(stderr, Logging.Debug), the memory remains somewhat in check and the function returns relatively quickly, whereas the code above crashes before getting the chance to return.

X is about 80 MiB in memory, so while I’m running with 8 threads, each construct_matrix! call should only need to allocate about 80 MiB, and I would expect less than 1 GiB to be occupied by running these tasks in parallel.

Also, as far as I understand, since channel only fits one matrix, it would block all other tasks from proceeding as soon as a single matrix is found, so I shouldn’t have a situation where thousands of tasks have 10x10 matrices waiting to be put.
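As a sanity check on that assumption, here is a minimal toy sketch (separate from the MWE, using a small vector instead of a matrix) of how put! blocks on a full size-1 channel:

```julia
ch = Channel{Int}(1)
put!(ch, 1)                      # fills the one-slot buffer
t = Threads.@spawn put!(ch, 2)   # blocks: the channel is already full
sleep(0.2)
@show istaskdone(t)              # false — the task is parked on put!
take!(ch)                        # frees the slot...
wait(t)                          # ...and the pending put! completes
@show istaskdone(t)              # true
```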

Any advice would be greatly appreciated as a parallel implementation would speed up by code by quite a lot. I don’t want to use Distributed since these calls would be running from workers themselves and I just don’t want workers calling on workers.

You can always try to call the garbage collector manually, e.g.

GC.gc(full=false)

or

GC.gc(full=true)

and see if it helps…

By the way, how many threads are you launching? Most of the time it does not make sense to launch
more threads than there are cores…

I’m only launching 8 threads, the number of physical cores I have.

If I place the calls to gc inside construct_matrix!, nothing really happens. If I put them in the for and while loops of get_matrix, it really slows everything down to a crawl. I even notice that my code seems to run one thread at a time… I’m wondering if GC.safepoint() could help me out.

Ironically, even with the calls to gc in my code, the code still runs fast and fine if I set logger = ConsoleLogger(stderr, Logging.Debug).

I’m trying to understand what get_matrix is supposed to do, particularly the if isready(channel)... block. Note that fetch might not do what you expect. The docstring says

fetch(c::Channel)

  Waits for and returns (without removing) the first available item from the Channel. Note: fetch is unsupported on an unbuffered (0-size)
  Channel.

You probably want take! instead (note the exclamation mark which is a convention for when the argument is modified).
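A quick illustration of the difference:

```julia
ch = Channel{Int}(2)
put!(ch, 42)
@show fetch(ch)    # 42 — the item is *not* removed
@show isready(ch)  # true: still one item buffered
@show take!(ch)    # 42 — now the item is removed
@show isready(ch)  # false
```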

I added some logging to see what’s going on. This seems to work, except that the while loop at the end will of course not terminate.

using Logging
s = 100_000_000
logger = ConsoleLogger(stderr, Logging.Info);
global_logger(logger);

function construct_matrix!(channel)
    X = rand(1_000_000, 10)
    result = X' * X
    @info Threads.threadid() result
    u = rand()
    if u < 1e-6
        put!(channel, result)
    end
end

function get_matrix(s)
    channel = Channel{Matrix}(1)
    dec = Threads.Atomic{Int}(s)
    function new_task()
        construct_matrix!(channel)
        Threads.atomic_sub!(dec, 1)
    end
    for i in 1:s
        if i % 100_000 == 0                                                                           
            @debug "$i threads launched"
        end
        if isready(channel)
            @info sum(take!(channel))
        else
            @info "Channel is empty"
        end
        @info "Spawning new_task"
        Threads.@spawn new_task()
    end 
    while dec[] > 0 
        if isready(channel)
            @info sum(take!(channel))
        end
    end
    return nothing 
end

get_matrix(s)

The reason I’m using fetch instead of take! is that I don’t want to proceed with the tasks when the channel has one matrix in it. If I use take!, it would empty the channel, allowing other tasks to keep putting.

Also, I’m not sure why the while loop wouldn’t terminate. new_task should decrement dec until it reaches zero if no matrices were found.

The tricky thing is that if you remove the logging, even this version ends up with exploding memory.

EDIT: actually a lot of tasks seem to be spawned before the first result is being put into the channel.

My suggestion would be to make it explicit where the code should wait on the channel. As it is, that seems to be left entirely to the scheduler, and that might explain why allocations can grow: if many tasks are waiting to put something into the channel, each of them still has to keep its matrix in memory. In fact, if I’m reasoning correctly about your code, all tasks could have been spawned before any of them put its result into the channel, thereby terminating the loop. At that point dec[] == 0 and there is nobody to fetch from the channel. If the first task then puts something into the channel, s-1 tasks are allocating memory and waiting forever to put their results as well.

If you tell us what you want to accomplish, we might be able to come up with better code to do that.

What I’m trying to accomplish is an efficient way to call a function like construct_matrix! that has a small probability of actually completing the task, on the order of 10^-6. I can run them sequentially, but it would be better to run a threaded algorithm which runs as many tasks as possible in parallel. As soon as one task finds a matrix, stop the other tasks and return that single matrix.

I thought about doing something like running 8 tasks at a time, checking if they succeeded, then launching 8 separate tasks, and this works. But I fear I lose some efficiency because I would like to launch a new task immediately after one has completed as long as no matrix was already found.
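That batch idea could be sketched roughly like this (construct_matrix here is a hypothetical variant that returns the matrix on success and nothing otherwise; the sizes and the success probability are toy values, not the real ones):

```julia
# Hypothetical success-returning variant of the trial (toy sizes).
function construct_matrix()
    X = rand(1_000, 10)
    return rand() < 1e-3 ? X' * X : nothing
end

function get_matrix_batched(s; batch = Threads.nthreads())
    for _ in 1:cld(s, batch)
        # Run one batch of trials in parallel and wait for all of them.
        results = fetch.([Threads.@spawn construct_matrix() for _ in 1:batch])
        idx = findfirst(!isnothing, results)
        idx === nothing || return results[idx]
    end
    return nothing
end
```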

My idea with the threaded algorithm was that the scheduler would keep launching a new task as soon as a thread is free. When a matrix is found, the channel becomes full so that the tasks wait on the channel. I thought that only 8 tasks maximum would be waiting at any given moment to put on the channel, since that is the number of threads I have, so I expected the total memory usage to be negligible. Indeed it is if I run the logger.

dec[] should only be zero if all the tasks are done, since it decrements after running construct_matrix!, which should mean that no matrix was found after s trials. The only reason that part of the code is there is because I don’t want it to run indefinitely.

Now if the first task puts something into the channel, s-1 tasks are now allocating memory and waiting forever to put their result as well.

I only have 8 threads running (i.e. I run julia -t 8), so I would have thought that there would be only 8 tasks allocating memory, while the other tasks are waiting to be spawned. Again, if the channel is full, I am done and do not want to run any new threads.

Thanks for the explanation. It’s important to note that having 8 threads does not mean that you can only have 8 tasks. You can have code that spawns an unbounded number of tasks, each allocating memory and then waiting. So your threads keep switching to new tasks until you hit an OOM and Julia is killed by the OS.
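A small sketch of that point (toy allocation sizes, not the MWE): even with a one-slot channel, nothing bounds how many tasks get spawned and park on put!, each pinning its allocation:

```julia
ch = Channel{Vector{Float64}}(1)   # one-slot buffer, as in the MWE
tasks = [Threads.@spawn put!(ch, rand(10_000)) for _ in 1:1_000]
sleep(0.5)
# Exactly one put! fits in the buffer; the other 999 tasks are alive and
# blocked, each keeping its 10_000-element vector reachable, regardless
# of how many OS threads Julia was started with.
@show count(istaskdone, tasks)     # 1
```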

I’ll try to get back later to try to propose a solution.

I would probably @spawn N worker tasks (where N is the number of cores) and one “work generator” task. Each worker task would take! from a “work” channel a description of the matrix it tries to generate; when finished, it would put! to a “results” channel either a request for more work or, if it succeeded, the matrix itself.

The work generator task would put! N descriptions of work in the work channel, then take! from the results channel. If it takes a request for more work, it put!s more work; otherwise, if it got the result it wanted, it kills the other tasks and proceeds.

Does that make sense?
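Concretely, that design might look something like the sketch below. All the names, the toy trial size, and the 1e-3 success probability are mine, standing in for the real construct_matrix!; the real version would put the actual trial inside worker:

```julia
# Worker-pool sketch: nworkers tasks pull trial ids from `work` and push
# outcomes to `results`; the coordinator refills `work` after each failed
# trial until a matrix is found or the budget of s trials is exhausted.
function find_matrix(s; nworkers = Threads.nthreads())
    work    = Channel{Int}(nworkers)                        # trial ids to run
    results = Channel{Union{Nothing,Matrix{Float64}}}(nworkers)

    function worker()
        for _ in work                   # take! until `work` is closed and empty
            X = rand(1_000, 10)         # toy-sized trial
            put!(results, rand() < 1e-3 ? X' * X : nothing)
        end
    end
    workers = [Threads.@spawn worker() for _ in 1:nworkers]

    next = 1
    for _ in 1:min(nworkers, s)         # prime the pool with one trial per worker
        put!(work, next); next += 1
    end
    found = nothing
    for _ in 1:s                        # one result comes back per trial sent
        r = take!(results)
        if r !== nothing
            found = r                   # success: stop handing out work
            break
        elseif next <= s
            put!(work, next); next += 1 # failure: replace it with a new trial
        end
    end
    close(work)                         # lets idle workers fall out of their loop
    while any(!istaskdone, workers)     # drain stragglers blocked on put!
        isready(results) ? take!(results) : yield()
    end
    return found
end
```

The key property is that at most nworkers trials are ever in flight, so at most nworkers matrices are alive at once, however large s is.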