Hello everyone,
I’m working on an algorithm that computes 8 corpus-linguistic measures capturing the association between words and then, at each iteration, merges the words that score highest. I’m writing the algorithm as a package (I can share the repo if needed). I’m fairly new to Julia, and coming from R/Python it took me quite a while to optimize my code. Using distributed computing and mainly DataFrames, I managed to reach the speed I was hoping for.

However, I’m running into a severe memory leak. The code is supposed to run for thousands of iterations, but my RAM (39 GB) gets saturated after 5-6 iterations and VS Code crashes. I made sure to overwrite the “big” variables (which live in the global scope), and I also tried triggering the GC manually, which only marginally reduces the memory growth. The code is rather complex and I’m not sure I can produce an MWE, but these are the main functions:
# Set up 12 worker processes and load the project environment and packages on all of them
using Pkg, Distributed; addprocs(12);
@everywhere using Pkg, Distributed;
@everywhere Pkg.activate(".");
using mMerge; @everywhere using mMerge;
using DataFrames; @everywhere using DataFrames;
using SharedArrays; @everywhere using SharedArrays;
using Chain;
using CSV;
# get_stats: compute the 8 measures for every candidate and rank them
function get_stats(df::DataFrame, freq_threshold::Int64)
    # Add frequencies using the provided frequency threshold
    df, freqs_dict = add_freqs(df, freq_threshold)
    # Get proportions
    grouped_props, corpus_props = get_proportions(df)
    hdp = get_hdp(corpus_props)
    # One row per unique candidate
    df = unique(select(df, Between(:W1, :FREQb)))
    cands = Tuple.(eachrow(df[:, Not(:FREQ, :FREQa, :FREQb)]))
    # One SharedArray per measure, filled by the workers
    dp = SharedArray{Float64}(length(cands))
    ent1 = SharedArray{Float64}(length(cands))
    ent2 = SharedArray{Float64}(length(cands))
    assoc1 = SharedArray{Float64}(length(cands))
    assoc2 = SharedArray{Float64}(length(cands))
    @sync @distributed for i in eachindex(cands)
        dp[i] = get_dp(freqs_dict, corpus_props, grouped_props, cands[i][3], hdp)
        ent1[i] = get_slot_entropy(df, cands[i][1], cands[i][2], 1)
        ent2[i] = get_slot_entropy(df, cands[i][1], cands[i][2], 2)
        assoc1[i], assoc2[i] = get_association(df, cands[i][1], cands[i][2])
    end
    # Normalize if needed and add to the DataFrame
    df.FREQ = normalize(log10.(df.FREQ))
    df.FREQa = 1 .- normalize(log10.(df.FREQa))
    df.FREQb = 1 .- normalize(log10.(df.FREQb))
    df.DP = dp # normalized within the function
    df.ENT1 = normalize(ent1)
    df.ENT2 = normalize(ent2)
    df.ASSOC1 = assoc1
    df.ASSOC2 = assoc2
    # Euclidean distance over all measure columns, added to df as :EUCL
    @chain df begin
        transform(Not(:MWEidx, :W1, :W2) .=> ByRow(x -> x .^ 2), renamecols=false)
        transform(AsTable(Not(:MWEidx, :W1, :W2)) .=> ByRow(x -> sqrt(sum(x))) => :EUCL)
        df.EUCL = _.EUCL
    end
    return sort(df, :EUCL, rev=true)
end
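Since I can’t easily extract a proper MWE, here is a stripped-down toy version of the allocation pattern in get_stats: the linguistic measures are replaced by dummy computations, and it only mimics the per-call SharedArray allocations and the distributed loop, nothing else.
# Toy reproduction of the allocation pattern only (dummy work instead of the real measures)
using Distributed, SharedArrays
addprocs(12)
@everywhere using SharedArrays

function toy_stats(n::Int)
    # Five fresh SharedArrays per call, as in get_stats
    arrays = [SharedArray{Float64}(n) for _ in 1:5]
    @sync @distributed for i in 1:n
        for a in arrays
            a[i] = rand()      # stand-in for get_dp / get_slot_entropy / get_association
        end
    end
    return sum(sum, arrays)    # make sure the results are used
end

for it in 1:1000
    toy_stats(1_000_000)
    GC.gc(true)                # same manual GC call as in multi_merge
end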
# merge_corpus!: merge every occurrence of word1 immediately followed by word2 into one token
function merge_corpus!(corpus::DataFrame, mwe::Tuple{Int64, Int64})
    word1, word2 = mwe
    cand_idxes = findall(corpus.WORDidx .== word1)
    for idx in cand_idxes
        # Check that there is a next row and that it contains word2
        if idx < nrow(corpus) && corpus[idx + 1, :].WORDidx == word2
            new_mwe = corpus[idx, :].WORD * "_" * corpus[idx + 1, :].WORD
            # Replace word1 by the merged token
            corpus[idx, :].WORD = new_mwe
            # Delete the row holding word2
            deleteat!(corpus, idx + 1)
            # The deletion shifts all later rows up by one, so adjust the remaining candidate indexes
            cand_idxes .-= 1
        end
    end # for loop
    return corpus
end
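To make the merging step concrete, here is a tiny worked example on made-up data:
using DataFrames
corpus = DataFrame(WORDidx = [7, 3, 9, 3, 9],
                   WORD    = ["the", "New", "York", "New", "York"])
merge_corpus!(corpus, (3, 9))
# corpus now has 3 rows: WORD = ["the", "New_York", "New_York"];
# each row that followed a match was deleted and the merged token keeps word1's WORDidx.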
# multi_merge: main loop, reads the corpus and iteratively scores and merges candidates
function multi_merge(corpus_path::String,
                     iterations::Int64, freq_threshold::Int64)
    # Read the corpus as a DataFrame
    corpus = CSV.read(corpus_path,
                      DataFrame,
                      ntasks=1) # avoids using more than 1 thread
    # Initialize df here so it is still in scope after the loop
    df = DataFrame()
    for i in 1:iterations
        corpus, df = process_corpus(corpus)
        df = get_stats(df, freq_threshold)
        # The top-ranked candidate pair is in the first row of the sorted df
        mwe = (df[1, 1], df[1, 2])
        merge_corpus!(corpus, mwe)
        # Manual GC call; it only helps marginally
        GC.gc(true)
        # ccall(:malloc_trim, Int32, (Int32,), 0)
    end
    return corpus, df
end
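For completeness, the entry point is called at the top level roughly like this (the path and the two numbers are just placeholders):
corpus, df = multi_merge("data/corpus.csv", 5000, 10)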
I hope this is not too much code. The initial corpus has about a million words, i.e. the corpus DataFrame has ~1 million rows. I’m using Julia 1.9.3 on Kubuntu 22.04.
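In case it helps to pin down where the memory goes, this is the kind of per-iteration check I could drop into the multi_merge loop (pure diagnostics, not part of the package; the helper name is made up, and since gc_live_bytes only covers one process the workers are queried via remotecall_fetch):
# Rough memory report per iteration (master, workers, and free system RAM, in GB)
function report_memory(iter::Int)
    master_gb  = Base.gc_live_bytes() / 1e9
    workers_gb = sum(remotecall_fetch(Base.gc_live_bytes, w) for w in workers()) / 1e9
    free_gb    = Sys.free_memory() / 1e9
    @info "memory after iteration $iter" master_gb workers_gb free_gb
end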