Write Large Parquet to S3

I have an AWS Glue table made up of parquet files. Unfortunately, the upstream process creates files that are too small for optimal querying from Athena/Redshift, so I am looking to consolidate them into files closer to 300 MB to 1 GB. The rub is that reading all the tables and then writing them back out to a single file blows up my memory.

I am looking for a way to stream the row groups from each source file into a single target file without having to materialize it all at once.

My first stab at it looks like this:

"""
TableStacker is a light-weight Table.jl wrapper that allows quick stacking of many Tables
with the same schema. 

# Constructor
TableStacker(; size_hint::Int64)
Create a new TableStaker.

## Keyword Args
`size_hint::Int64` : Allocate Vectors of `size_hint` size for this stack
"""
struct TableStacker <: Tables.AbstractColumns
    offsets::Vector{Int64}
    tables::Vector
end
function TableStacker(; size_hint::Int64)
    ts = TableStacker(Int64[], Any[])
    sizehint!(get_offsets(ts), size_hint)
    sizehint!(get_tables(ts), size_hint)
    return ts
end

"""
Base.push!(ts::TableStacker, new_table)
Add a table to the stack

"""
function Base.push!(ts::TableStacker, new_table)
    offsets = get_offsets(ts)
    # Running total of rows: previous cumulative length plus the rows in this table
    acc_length = isempty(offsets) ? length(new_table) : length(new_table) + last(offsets)
    push!(offsets, acc_length)
    push!(get_tables(ts), new_table)
    return ts
end

"""
A column wrapper type for a TableStacker
"""
struct ColumnStack{T,E}
    table_stack::T
    column_name::Symbol
end

# Accessors
get_offsets(t::TableStacker) = getfield(t, :offsets)
get_tables(t::TableStacker) = getfield(t, :tables)

# Tables.jl Interface
Tables.columns(t::TableStacker) = t
Tables.schema(t::TableStacker) = Tables.schema(first(get_tables(t)))
Tables.partitions(t::TableStacker) = get_tables(t)
function Base.length(t::TableStacker)
    offsets = get_offsets(t)
    return length(offsets) == 0 ? 0 : last(offsets)
end
Base.getindex(t::TableStacker, name::Symbol) = Tables.getcolumn(t, name)
Tables.getcolumn(t::TableStacker, name::Symbol) = ColumnStack{typeof(t),Tables.columntype(Tables.schema(t), name)}(t, name)
# Route integer column access through the column names rather than the struct's fields
Tables.getcolumn(t::TableStacker, i::Int) = Tables.getcolumn(t, Tables.columnnames(t)[i])
Tables.columnnames(t::TableStacker) = Tables.columnnames(first(get_tables(t)))

Base.length(c::ColumnStack) = length(c.table_stack)
Base.lastindex(c::ColumnStack) = length(c)
function Base.getindex(c::ColumnStack{<:Any,E}, i) where {E}
    offsets = get_offsets(c.table_stack)
    # Find which stacked table contains the global row index i
    offset_i = searchsortedfirst(offsets, i)
    if offset_i > length(offsets)
        throw(BoundsError(c, i))
    end
    tbl = get_tables(c.table_stack)[offset_i]

    # Translate the global index into an index local to that table
    offset = offset_i == 1 ? 0 : offsets[offset_i-1]
    adjusted_i = i - offset

    return getproperty(tbl, c.column_name)[adjusted_i]::E
end

"""
dump_stack!(stack, fp)
Dump a TableStacker into a parquet file
"""
function dump_stack!(stack, fp)
    @info "dumping stack" destination = fp partitions = length(get_tables(stack))
    Parquet2.writefile(fp, stack)
    # Note: rebinding the local `stack` does not release the caller's reference
    stack = nothing
    GC.gc()
    @info "finished dump"
end

Basically, TableStacker collects row groups wrapped in a TypedTable and then returns them as its partitions. Parquet2 then writes those partitions out as the row groups of the new parquet file. In the process of doing that, all of the underlying row groups get materialized, and since the row groups are still referenced by the TableStacker, they can’t get GCed until the whole process is done.
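For reference, here is roughly how I am feeding the stack. The file paths are placeholders, and I am assuming here that a Parquet2.Dataset can be iterated as a collection of row groups, each of which is a Tables.jl-compatible table:

using Parquet2, Tables, TypedTables

source_files = ["small/part-0001.parquet", "small/part-0002.parquet"]  # placeholder paths

stack = TableStacker(; size_hint = 128)  # rough guess at the total number of row groups
for fp in source_files
    ds = Parquet2.Dataset(fp)
    for rg in ds  # iterate the file's row groups
        # materialize the row group's columns and wrap them in a TypedTable
        push!(stack, Table(Tables.columntable(rg)))
    end
end

dump_stack!(stack, "consolidated.parquet")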

I’d appreciate any new strategies you can think of!


I don’t quite understand exactly the context. Is it that you have lots of small parquet files and want to create a single parquet file from all of them? If so, this is basically a single line in DuckDB (which has a Julia interface).
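Something along these lines should do it. This is untested here, the paths are placeholders, and the ROW_GROUP_SIZE option (rows per row group) may need tuning or can be omitted entirely:

using DuckDB, DBInterface

con = DBInterface.connect(DuckDB.DB, ":memory:")

# Read every small file via a glob and write a single consolidated parquet file.
DBInterface.execute(con, """
    COPY (SELECT * FROM read_parquet('small/*.parquet'))
    TO 'consolidated.parquet' (FORMAT PARQUET, ROW_GROUP_SIZE 1000000);
""")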


You have the problem right. I can try with DuckDB, but my biggest problem right now is keeping memory down while constructing the target file. I had assumed that DuckDB would still materialize all the source tables, like my naive code does.

I’ll report back!

If all you are doing is streaming from one file to the other, I believe it should not be an issue.
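For S3 paths you would also need the httpfs extension and S3 credentials set on the connection, something like the following (the s3_* settings are DuckDB configuration options; the values are placeholders):

# Continuing with the `con` from the COPY example above
DBInterface.execute(con, "INSTALL httpfs;")
DBInterface.execute(con, "LOAD httpfs;")
DBInterface.execute(con, "SET s3_region='us-east-1';")
DBInterface.execute(con, "SET s3_access_key_id='<access key>';")
DBInterface.execute(con, "SET s3_secret_access_key='<secret key>';")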

DBInterface.execute(con, "INSTALL httpfs;")

Yields:

ERROR: Execute of query "INSTALL httpfs;" failed: HTTP Error: Failed to download extension "httpfs" at URL "http://extensions.duckdb.org/v0.8.1/windows_amd64_mingw/httpfs.duckdb_extension.gz"
Extension "httpfs" is an existing extension.

Are you using a development build? In this case, extensions might not (yet) be uploaded.
Stacktrace:
 [1] execute(stmt::DuckDB.Stmt, params::NamedTuple{(), Tuple{}})
   @ DuckDB C:\Users\mrufsvold\.julia\packages\DuckDB\I8oJ8\src\result.jl:745
 [2] execute
   @ C:\Users\mrufsvold\.julia\packages\DuckDB\I8oJ8\src\result.jl:848 [inlined]
 [3] execute
   @ C:\Users\mrufsvold\.julia\packages\DBInterface\1Gmxx\src\DBInterface.jl:130 [inlined]
 [4] #execute#2
   @ C:\Users\mrufsvold\.julia\packages\DBInterface\1Gmxx\src\DBInterface.jl:152 [inlined]
 [5] execute(conn::DuckDB.DB, sql::String)
   @ DBInterface C:\Users\mrufsvold\.julia\packages\DBInterface\1Gmxx\src\DBInterface.jl:152
 [6] top-level scope
   @ c:\Users\mrufsvold\Projects\DIL-price-transparency-scraper\TableCompressor.jl\DuckDBAttempt.jl:19

I opened an issue with DuckDB, but maybe you have a theory?

Yikes. Looks like this was a problem before, too.


If the Internet brings someone here in the future: there is also a writeiterable function in Parquet2 that is not in the docs, but its docstring indicates it is stable and public. It is a very good solution for streaming row groups.
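A minimal sketch of that approach, assuming writeiterable accepts a destination plus an iterator of tables and writes each element out as its own row group (file names are placeholders):

using Parquet2, Tables

source_files = ["part-0001.parquet", "part-0002.parquet"]  # placeholders

# Lazy generator: row groups are materialized one at a time as the writer
# consumes them, so the whole dataset never has to be in memory at once.
row_groups = (Tables.columntable(rg) for fp in source_files for rg in Parquet2.Dataset(fp))

Parquet2.writeiterable("consolidated.parquet", row_groups)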