I have an AWS Glue table made up of parquet files. Unfortunately, the upstream process creates files that are too small for optimal querying from Athena/Redshift, so I am looking to consolidate them into files closer to 300 MB to 1 GB. The rub is that reading all of the source files and then writing them back out as a single file blows up my memory.
I am looking for a way to stream the row groups from each source file into a single target file without having to materialize it all at once.
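For concreteness, the fully-materialized version I'm trying to get away from looks roughly like this (a rough sketch; the file names are placeholders, and I'm leaning on `Parquet2.Dataset` plus `Tables.columntable` to pull each file into memory):

using Parquet2, Tables, TypedTables

# Placeholder list of small source files; the real list comes from the Glue table's S3 prefix.
source_files = ["part-0001.parquet", "part-0002.parquet"]

# Materialize every source file as an in-memory Table, then concatenate and rewrite.
# This works, but it holds all of the data in memory at once.
parts = [TypedTables.Table(Tables.columntable(Parquet2.Dataset(fp))) for fp in source_files]
Parquet2.writefile("consolidated.parquet", vcat(parts...))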
My first stab at it looks like this:
"""
TableStacker is a light-weight Table.jl wrapper that allows quick stacking of many Tables
with the same schema.
# Constructor
TableStacker(; size_hint::Int64)
Create a new TableStaker.
## Keyword Args
`size_hint::Int64` : Allocate Vectors of `size_hint` size for this stack
"""
struct TableStacker <: Tables.AbstractColumns
    offsets::Vector{Int64}
    tables::Vector
end
function TableStacker(; size_hint::Int64)
    ts = TableStacker(Int64[], Any[])
    sizehint!(get_offsets(ts), size_hint)
    sizehint!(get_tables(ts), size_hint)
    return ts
end
"""
Base.push!(ts::TableStacker, new_table)
Add a table to the stack
"""
function Base.push!(ts::TableStacker, new_table)
    offsets = get_offsets(ts)
    # running row count: rows already stacked plus the rows in this table
    acc_length = isempty(offsets) ? length(new_table) : length(new_table) + last(offsets)
    push!(offsets, acc_length)
    push!(get_tables(ts), new_table)
    return ts
end
"""
A column wrapper type for a TableStacker
"""
struct ColumnStack{T,E}
table_stack::T
column_name::Symbol
end
# Accessors
get_offsets(t::TableStacker) = getfield(t, :offsets)
get_tables(t::TableStacker) = getfield(t, :tables)
# Tables.jl interface
Tables.columns(t::TableStacker) = t
Tables.schema(t::TableStacker) = Tables.schema(first(get_tables(t)))
Tables.partitions(t::TableStacker) = get_tables(t)
function Base.length(t::TableStacker)
    offsets = get_offsets(t)
    return isempty(offsets) ? 0 : last(offsets)
end
Base.getindex(t::TableStacker, name::Symbol) = Tables.getcolumn(t, name)
Tables.getcolumn(t::TableStacker, name::Symbol) = ColumnStack{typeof(t),Tables.columntype(Tables.schema(first(get_tables(t))), name)}(t, name)
Tables.columnnames(t::TableStacker) = Tables.columnnames(first(get_tables(t)))
Base.length(c::ColumnStack) = length(c.table_stack)
Base.lastindex(c::ColumnStack) = length(c)
function Base.getindex(c::ColumnStack{<:Any,E}, i) where {E}
    offsets = get_offsets(c.table_stack)
    # find which stacked table the global row index `i` falls into
    offset_i = searchsortedfirst(offsets, i)
    if offset_i > length(offsets)
        throw(BoundsError(c, i))
    end
    tbl = get_tables(c.table_stack)[offset_i]
    # subtract the rows that come before this table to get the local index
    offset = offset_i == 1 ? 0 : offsets[offset_i-1]
    adjusted_i = i - offset
    return getproperty(tbl, c.column_name)[adjusted_i]::E
end
"""
dump_stack!(stack, fp)
Dump a TableStacker into a parquet file
"""
function dump_stack!(stack, fp)
@info "dumping stack" destination = fp partitions = length(get_tables(stack))
writefile(fp, stack)
stack = nothing
GC.gc()
@info "finished dump"
end
Basically, TableStacker collects row groups wrapped in a TypedTable and then returns them as its partitions. Parquet2 then writes those partitions out as the row groups of the new parquet file. In the process of doing that, all of the underlying row groups get materialized, and since the row groups are still referenced by the TableStacker, they can’t get GCed until the whole process is done.
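For reference, the driver loop looks roughly like this (a simplified sketch; the file names and `size_hint` are placeholders, with each `Parquet2.Dataset` iterated for its row groups):

using Parquet2, Tables, TypedTables

# Placeholder file names; the real ones come from the Glue table's S3 prefix.
source_files = ["part-0001.parquet", "part-0002.parquet"]

stack = TableStacker(size_hint = 1024)
for fp in source_files
    ds = Parquet2.Dataset(fp)
    # wrap each row group in a TypedTables.Table and push it onto the stack,
    # so it comes back out as one row group (partition) of the output file
    for rg in ds
        push!(stack, TypedTables.Table(rg))
    end
end
dump_stack!(stack, "consolidated.parquet")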
I’d appreciate any new strategies you can think of!