Memory leak with distributed code

Hello everyone,

I’m working on an algorithm that computes 8 corpus-linguistic measures capturing the association between words and, in each iteration, merges the pair of words that scores highest. I’m writing the algorithm as a package (I can share the repo if needed). I’m fairly new to Julia, and coming from R/Python it took me quite a while to optimize my code. I managed to get the speed I was hoping for using distributed computing and mainly DataFrames. However, I’m experiencing a severe memory leak: the code is supposed to run for thousands of iterations, but my RAM (39 GB) gets saturated after 5–6 iterations and VS Code crashes. I made sure to overwrite the “big” variables (the ones in global scope). I also tried triggering the GC manually, which only marginally reduces the leak.
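For reference, by triggering the GC manually I mean forcing a full collection, ideally on every process rather than only on the master, since each worker has its own GC. A minimal sketch:

using Distributed
# force a full collection on the master and on every worker process;
# GC.gc() on the master alone does not free memory held by the workers
@everywhere GC.gc(true)

The code is rather complex and I’m not sure I can produce an MWE, but these are the main functions: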

using Pkg, Distributed; addprocs(12);
@everywhere using Pkg, Distributed;
@everywhere Pkg.activate(".");
using mMerge; @everywhere using mMerge;
using DataFrames; @everywhere using DataFrames;
using SharedArrays; @everywhere using SharedArrays;
using Chain;
using CSV;

# Get stats
function get_stats(df::DataFrame, freq_threshold::Int64)

    # Add frequency using the provided frequency threshold
    df, freqs_dict = add_freqs(df, freq_threshold)
    # Get proportions
    grouped_props, corpus_props = get_proportions(df)

    hdp = get_hdp(corpus_props)
    df = unique(select(df, Between(:W1, :FREQb)))
    cands = Tuple.(eachrow(df[:, Not(:FREQ, :FREQa, :FREQb)]))
    dp = SharedArray{Float64}(length(cands))
    ent1 = SharedArray{Float64}(length(cands))
    ent2 = SharedArray{Float64}(length(cands))
    assoc1 = SharedArray{Float64}(length(cands))
    assoc2 = SharedArray{Float64}(length(cands))

    # Compute the measures for each candidate in parallel on the workers
    @sync @distributed for i in eachindex(cands)
        dp[i] = get_dp(freqs_dict, corpus_props, grouped_props, cands[i][3], hdp)
        ent1[i] = get_slot_entropy(df, cands[i][1], cands[i][2], 1)
        ent2[i] = get_slot_entropy(df, cands[i][1], cands[i][2], 2)
        assoc1[i], assoc2[i] = get_association(df, cands[i][1], cands[i][2])
    end

    # Normalize if needed and add to the DF
    df.FREQ = normalize(log10.(df.FREQ))
    df.FREQa = 1 .- normalize(log10.(df.FREQa))
    df.FREQb = 1 .- normalize(log10.(df.FREQb))
    df.DP = dp # normalized within the function
    df.ENT1 = normalize(ent1)
    df.ENT2 = normalize(ent2)
    df.ASSOC1 = assoc1
    df.ASSOC2 = assoc2

    # Euclidean distance
    @chain df begin
        transform(Not(:MWEidx, :W1, :W2) .=> ByRow(x -> x .^ 2), renamecols=false)
        transform(AsTable(Not(:MWEidx, :W1, :W2)) .=> ByRow(x -> sqrt(sum(x))) => :EUCL)
        df.EUCL = _.EUCL
    end

    return sort(df, :EUCL, rev=true)

end
    
    
# merge_corpus!
function merge_corpus!(corpus::DataFrame, mwe::Tuple{Int64, Int64})
    word1, word2 = mwe
    cand_idxes = findall(corpus.WORDidx .== word1)

    for idx in cand_idxes

        # Check if the next row contains word2
        if corpus[idx + 1, :].WORDidx == word2
            new_mwe = corpus[idx, :].WORD * "_" * corpus[idx + 1, :].WORD

            # Replace word
            corpus[idx, :].WORD = new_mwe
            # Delete next row
            deleteat!(corpus, idx + 1)
            # Shift the remaining candidate indexes
            cand_idxes .-= 1
        end
    end # for loop

    return corpus

end
    
# multi_merge
function multi_merge(corpus_path::String, iterations::Int64, freq_threshold::Int64)
    # Read the corpus as a DataFrame
    corpus = CSV.read(corpus_path, DataFrame, ntasks=1) # avoid using more than 1 thread

    # Initialize the DataFrame so it is still accessible after the loop
    df = DataFrame()
    for i in 1:iterations
        corpus, df = process_corpus(corpus)
        df = get_stats(df, freq_threshold)
        mwe = (df[1, 1], df[1, 2])
        merge_corpus!(corpus, mwe)
        GC.gc(true)
        # ccall(:malloc_trim, Int32, (Int32,), 0)
    end

    return corpus, df

end
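For completeness, the entry point is called along these lines (the path and parameter values here are just placeholders):

corpus, df = multi_merge("corpus.csv", 10_000, 5)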

I hope this is not too much code. The initial corpus has about a million words, i.e. the corpus DataFrame has ~1 million rows. I’m using Julia 1.9.3 on Kubuntu 22.04.

Fixed by upgrading to Julia 1.10.0-beta2, which fixes the GC issue. Looking forward to the release.


I can confirm similar behaviour with a @remotecall_wait construct: an outer serial loop around an inner parallel loop. It goes away after updating Julia from v1.9.2 to v1.10.0-beta3.

My construct is

@sync for SOMETHING
   @remotecall_wait operate on s in SOMETHING
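A runnable sketch of the pattern (using the remotecall_wait function; operate and the data below are placeholders, not my real code):

using Distributed
addprocs(4)

@everywhere operate(s) = sum(abs2, s)   # placeholder for the real per-element work

data = [rand(1_000) for _ in 1:8]

for iter in 1:5                                  # outer serial loop
    @sync for (i, s) in enumerate(data)          # inner parallel loop
        w = workers()[mod1(i, nworkers())]       # pick a worker round-robin
        @async remotecall_wait(operate, w, s)    # wait for the remote call on that worker
    end
end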