I have an AWS Glue table made up of Parquet files. Unfortunately, the upstream process creates files that are too small for efficient querying from Athena/Redshift, so I am looking to consolidate them into files closer to 300 MB to 1 GB. The rub is that reading all the source files as tables and then writing them back out to a single file is blowing up my memory.
I am looking for a way to stream the row groups from each source file into a single target file without having to materialize it all at once.
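For the planning side, a greedy pass over the source file sizes is enough to decide which small files go into which output file. The sketch below is hypothetical: `plan_batches` is my own name, not part of any package. It assumes on-disk size is a usable proxy for output size, which is only approximate because Parquet compression and encoding depend on the data.

```julia
# Hypothetical helper: group source files into batches whose total size stays
# at or below `target_bytes`, preserving the original file order.
function plan_batches(sizes::AbstractVector{<:Integer}, target_bytes::Integer)
    batches = Vector{Vector{Int}}()   # each batch holds indices into `sizes`
    current = Int[]
    current_bytes = 0
    for (i, sz) in pairs(sizes)
        # Close the current batch if adding this file would overshoot the target
        # (a single oversized file still ends up in a batch of its own).
        if !isempty(current) && current_bytes + sz > target_bytes
            push!(batches, current)
            current = Int[]
            current_bytes = 0
        end
        push!(current, i)
        current_bytes += sz
    end
    isempty(current) || push!(batches, current)
    return batches
end

const MB = 1024^2
sizes = [120MB, 90MB, 200MB, 50MB, 700MB, 30MB]
batches = plan_batches(sizes, 300MB)   # [[1, 2], [3, 4], [5], [6]]
```

For local copies, `sizes` could come from `filesize.(paths)`. For objects in S3, the sizes would come from a bucket listing instead. The last batch can end up undersized, so it may be worth merging it into the previous batch.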
My first stab at it looks like this:
```julia
using Tables
using Parquet2: writefile

"""
TableStacker is a light-weight Tables.jl wrapper that allows quick stacking of many tables
with the same schema.

# Constructor

    TableStacker(; size_hint::Int64)

Create a new TableStacker.

## Keyword Args

`size_hint::Int64` : Allocate Vectors of `size_hint` size for this stack
"""
struct TableStacker <: Tables.AbstractColumns
    offsets::Vector{Int64}  # cumulative row count after each pushed table
    tables::Vector{Any}     # the stacked tables (row groups wrapped in TypedTables)

    function TableStacker(; size_hint::Int64)
        ts = new(Int64[], Any[])
        sizehint!(getfield(ts, :offsets), size_hint)
        sizehint!(getfield(ts, :tables), size_hint)
        return ts
    end
end

"""
    Base.push!(ts::TableStacker, new_table)

Add a table to the stack.
"""
function Base.push!(ts::TableStacker, new_table)
    offsets = get_offsets(ts)
    # Store a running total so a global row index can be found by binary search
    acc_length = isempty(offsets) ? length(new_table) : length(new_table) + last(offsets)
    push!(offsets, acc_length)
    push!(get_tables(ts), new_table)
    return ts
end

"""
A column wrapper type for a TableStacker
"""
struct ColumnStack{T,E}
    table_stack::T
    column_name::Symbol
end

# Accessors: getfield is required because Tables.AbstractColumns
# overloads getproperty to look up columns by name
get_offsets(t::TableStacker) = getfield(t, :offsets)
get_tables(t::TableStacker) = getfield(t, :tables)

# Tables.jl Interface
Tables.istable(::Type{TableStacker}) = true
Tables.columnaccess(::Type{TableStacker}) = true
Tables.columns(t::TableStacker) = t
Tables.schema(t::TableStacker) = Tables.schema(first(get_tables(t)))
Tables.partitions(t::TableStacker) = get_tables(t)

function Base.length(t::TableStacker)
    offsets = get_offsets(t)
    return isempty(offsets) ? 0 : last(offsets)
end

Base.getindex(t::TableStacker, name::Symbol) = Tables.getcolumn(t, name)
Tables.getcolumn(t::TableStacker, name::Symbol) =
    ColumnStack{typeof(t),Tables.columntype(Tables.schema(t), name)}(t, name)
Tables.columnnames(t::TableStacker) = Tables.columnnames(first(get_tables(t)))

Base.length(c::ColumnStack) = length(c.table_stack)
Base.lastindex(c::ColumnStack) = length(c)

function Base.getindex(c::ColumnStack{<:Any,E}, i::Integer) where {E}
    offsets = get_offsets(c.table_stack)
    # The first cumulative count >= i identifies the table that holds row i
    offset_i = searchsortedfirst(offsets, i)
    if i < 1 || offset_i > length(offsets)
        throw(BoundsError(c, i))
    end
    tbl = get_tables(c.table_stack)[offset_i]
    # Number of rows contained in all earlier tables
    offset = offset_i == 1 ? 0 : offsets[offset_i - 1]
    return getproperty(tbl, c.column_name)[i - offset]::E
end

"""
    dump_stack!(stack, fp)

Dump a TableStacker into a parquet file.
"""
function dump_stack!(stack, fp)
    @info "dumping stack" destination = fp partitions = length(get_tables(stack))
    writefile(fp, stack)
    # Note: this only rebinds the local variable; the caller's reference
    # still keeps every stacked table reachable
    stack = nothing
    @info "finished dump"
end
```
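The lookup in `ColumnStack`'s `getindex` depends on `offsets` holding cumulative row counts. Below is a standalone sketch of that mapping that uses a plain vector of row counts in place of the stacked tables. `locate` is a hypothetical helper written only for illustration.

```julia
# Cumulative row counts for three stacked tables with 3, 2 and 4 rows,
# equivalent to what TableStacker's push! accumulates in `offsets`.
row_counts = [3, 2, 4]
offsets = cumsum(row_counts)            # [3, 5, 9]

# Map a global row index to (table index, row index within that table).
function locate(offsets::AbstractVector{<:Integer}, i::Integer)
    total = isempty(offsets) ? 0 : last(offsets)
    (1 <= i <= total) || throw(BoundsError(offsets, i))
    t = searchsortedfirst(offsets, i)   # first table whose running total reaches i
    local_i = t == 1 ? i : i - offsets[t - 1]
    return t, local_i
end

locate(offsets, 4)   # (2, 1): the first row of the second table
```

Because `offsets` is sorted, each lookup is a binary search, O(log k) in the number of stacked tables.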
Basically, TableStacker collects row groups wrapped in a TypedTable and then returns them as its partitions. Parquet2 then writes those partitions out as the row groups of the new parquet file. In the process of doing that, all of the underlying row groups get materialized, and since the row groups are still referenced by the TableStacker, they can’t get GCed until the whole process is done.
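One strategy I'd like to test is making the partitions lazy. With lazy partitions, a row group is read only when the writer asks for it, and nothing else keeps it reachable afterwards. The toy sketch below uses only Base and hypothetical stand-ins (`load_rowgroup` and `write_partitions`; no real Parquet I/O). It compares the peak number of row groups that are loaded but not yet written when the partitions come from an eager `Vector` versus `Iterators.map`.

```julia
# Hypothetical stand-in for reading one row group from disk; it counts how many
# row groups have been loaded so far.
const LOADED = Ref(0)
load_rowgroup(i) = (LOADED[] += 1; collect(1:i))

# Hypothetical stand-in "writer": consumes partitions one at a time and records
# the peak number of row groups that were loaded but not yet written.
function write_partitions(parts)
    written = 0
    peak_pending = 0
    for p in parts
        peak_pending = max(peak_pending, LOADED[] - written)
        # a real writer would serialize `p` here and then drop it
        written += 1
    end
    return peak_pending
end

# Eager: every row group is loaded (and stays reachable) before writing starts.
LOADED[] = 0
eager_peak = write_partitions([load_rowgroup(i) for i in 1:4])   # 4

# Lazy: each row group is loaded only when the writer pulls the next partition.
LOADED[] = 0
lazy_peak = write_partitions(Iterators.map(load_rowgroup, 1:4))  # 1
```

For the real pipeline, Tables.jl provides `Tables.partitioner(f, itr)`, which wraps each element of `itr` so that `f` is called only when that partition is consumed. That could replace the `Vector{Any}` that `TableStacker` hands back from `Tables.partitions`. I haven't verified that `Parquet2.writefile` releases each row group after writing it instead of buffering the whole file.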
I’d appreciate any new strategies you can think of!