I am using this function to combine a batch of Arrow files into a single Arrow file with record batches:
using Arrow, Tables

function combinebatches(path::String, file::String, batches)
    # Collect the paths of the numbered chunk files: "$(file)1.arrow" .. "$(file)$(batches).arrow"
    files = Array{String,1}()
    for i = 1:batches
        push!(files, joinpath(path, "$(file)$(i).arrow"))
    end
    # Lazily wrap the files so that each one is written as its own record batch
    arrow_parts = Tables.partitioner(Arrow.Table, files)
    open(joinpath(path, "$(file)_batched.arrow"), "w") do io
        Arrow.write(io, arrow_parts, compress=:zstd)
    end
    # Delete the chunk files once the combined file has been written
    for i = 1:batches
        rm(joinpath(path, "$(file)$(i).arrow"))
    end
    return nothing
end #combinebatches
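For reference, I call it on a set of numbered chunk files like this (the directory and file name here are made up for illustration):

# Hypothetical call: merges "demog1.arrow" .. "demog25.arrow" in /data/chunks
# into /data/chunks/demog_batched.arrow and then deletes the 25 chunk files.
combinebatches("/data/chunks", "demog", 25)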
Saving about 25 uncompressed Arrow files with an average size of 1.8 GB each required around 50 GB of RAM. Since I am partitioning my data precisely to save RAM, this high memory usage (more than 50 GB in this case) is a problem. I was expecting that saving the record batches would require no more than the size of the largest single file (or perhaps the number of threads times the file size).
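For what it is worth, the lower-memory alternative I had in mind is to write the first chunk and then append the remaining chunks one at a time. This is only an untested sketch: it assumes Arrow.append is available in my Arrow.jl version, that appending requires the output to be in the streaming format (hence writing the first chunk to an open IO rather than a file path), and I have not checked how :zstd compression interacts with appending.

using Arrow

# Untested sketch: combine the chunks one at a time via Arrow.append, so that
# only a single chunk's Arrow.Table is referenced per iteration.
function combinebatches_append(path::String, file::String, batches)
    out = joinpath(path, "$(file)_batched.arrow")
    # Writing to an IO uses the Arrow streaming format, which (unlike the
    # file format with a footer) should allow appending later.
    open(out, "w") do io
        Arrow.write(io, Arrow.Table(joinpath(path, "$(file)1.arrow")), compress=:zstd)
    end
    for i = 2:batches
        # Append each remaining chunk as its own record batch.
        # (Assumption: compression of appended batches follows the existing
        # file; this needs to be verified.)
        Arrow.append(out, Arrow.Table(joinpath(path, "$(file)$(i).arrow")))
    end
    return nothing
end

If appending really does keep only one chunk referenced at a time, peak memory should stay close to the size of the largest chunk, which is roughly what I was expecting from the partitioned write as well.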
This is certainly unexpected as you’ve described things. A few things come to mind that we should look into:
- Although we’ve made some efforts to ensure that writing an Arrow.Table out avoids making copies, it’s possible we’re missing something here and data is getting unnecessarily converted or materialized.
- We’ve seen a potentially related issue in CSV.jl where large chunks of memory seem to get “stuck” and not reclaimed by the GC; that bug is still under investigation (a quick check like the sketch below would help tell where the memory is going).
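If you get a chance, a rough check like the following would help tell whether the memory is tracked by Julia’s GC heap or held elsewhere. The chunk paths and output file below are placeholders, so adapt them to your setup:

using Arrow, Tables

# Rough diagnostic sketch: report GC-tracked bytes and the process max RSS
# before and after the combining write, with a forced collection in between.
function report_memory(label)
    GC.gc()
    println(label, ": gc_live_bytes = ", Base.gc_live_bytes() / 2^30, " GiB",
            ", maxrss = ", Sys.maxrss() / 2^30, " GiB")
end

chunkfiles = ["chunk$(i).arrow" for i = 1:25]   # placeholder paths
report_memory("before write")
open("combined.arrow", "w") do io
    Arrow.write(io, Tables.partitioner(Arrow.Table, chunkfiles), compress=:zstd)
end
report_memory("after write")

If gc_live_bytes drops back down after the write but maxrss stays high, that would point at memory being held outside the GC heap (or simply at a high peak during the write) rather than at objects the GC is failing to reclaim.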
That would be great. Although I am processing close to a billion records in my DataFrames-based workflow (longitudinal population studies: a record for each day in the life of some 150 000 individuals), I have managed to keep my RAM usage generally below 64 GB using Arrow’s excellent partitioning features, coupled with DataFrames’ great performance, and reduced the processing time from 8.5 days with a Java-based workflow to 1 day with Julia.
I’m happy to know that I am not doing something stupid in my code, and happy to test any changes to my workflow if that would help.