Write Large Parquet to S3

I have an AWS Glue table made up of Parquet files. Unfortunately, the upstream process creates files that are too small for optimal querying from Athena/Redshift, so I am looking to consolidate them into files closer to 300 MB to 1 GB. The rub is that reading all of the source tables and then writing them back out to a single file is blowing up my memory.

I am looking for a way to stream the row groups from each source file into a single target file without having to materialize it all at once.

My first stab at it looks like this:

using Tables
using Parquet2: writefile

"""
TableStacker is a lightweight Tables.jl wrapper that allows quick stacking of many tables
with the same schema.

# Constructor
    TableStacker(; size_hint::Int64)
Create a new TableStacker.

## Keyword Args
`size_hint::Int64` : Reserve capacity for `size_hint` tables in this stack
"""
struct TableStacker <: Tables.AbstractColumns
    offsets::Vector{Int64}  # cumulative row counts, one entry per stacked table
    tables::Vector{Any}     # the stacked tables, in insertion order
end
function TableStacker(; size_hint::Int64)
    ts = TableStacker(Int64[], Any[])
    sizehint!(get_offsets(ts), size_hint)
    sizehint!(get_tables(ts), size_hint)
    return ts
end

"""
    Base.push!(ts::TableStacker, new_table)
Add a table to the stack
"""
function Base.push!(ts::TableStacker, new_table)
    offsets = get_offsets(ts)
    # Store cumulative row counts so a global row index can be mapped back to a table
    acc_length = isempty(offsets) ? length(new_table) : length(new_table) + last(offsets)
    push!(offsets, acc_length)
    push!(get_tables(ts), new_table)
    return ts
end

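"""
A column wrapper type for a TableStacker
"""
struct ColumnStack{T,E}
    table_stack::T       # the TableStacker this column belongs to
    column_name::Symbol  # name of the column being viewed
end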

# Accessors
get_offsets(t::TableStacker) = getfield(t, :offsets)
get_tables(t::TableStacker) = getfield(t, :tables)

# Tables.jl Interface
Tables.columns(t::TableStacker) = t
Tables.schema(t::TableStacker) = Tables.schema(first(get_tables(t)))
Tables.partitions(t::TableStacker) = get_tables(t)
function Base.length(t::TableStacker)
    offsets = get_offsets(t)
    return length(offsets) == 0 ? 0 : last(offsets)
end
Base.getindex(t::TableStacker, name::Symbol) = Tables.getcolumn(t, name)
Tables.getcolumn(t::TableStacker, name::Symbol) = ColumnStack{typeof(t),Tables.columntype(first(get_tables(t)), name)}(t, name)
Tables.columnnames(t::TableStacker) = Tables.columnnames(first(get_tables(t)))

Base.length(c::ColumnStack) = length(c.table_stack)
Base.lastindex(c::ColumnStack) = length(c)
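function Base.getindex(c::ColumnStack{<:Any,E}, i) where {E}
    offsets = get_offsets(c.table_stack)
    # Index of the first table whose cumulative row count reaches i
    offset_i = searchsortedfirst(offsets, i)
    if offset_i > length(offsets)
        throw(BoundsError(c, i))
    end
    # TableStacker is a Tables.AbstractColumns, so `.tables` would look up a column;
    # go through the accessor instead
    tbl = get_tables(c.table_stack)[offset_i]

    # Number of rows held by all earlier tables
    offset = offset_i == 1 ? 0 : offsets[offset_i - 1]
    adjusted_i = i - offset

    return getproperty(tbl, c.column_name)[adjusted_i]::E
end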

"""
    dump_stack!(stack, fp)
Dump a TableStacker into a parquet file
"""
function dump_stack!(stack, fp)
    @info "dumping stack" destination = fp partitions = length(get_tables(stack))
    writefile(fp, stack)
    # Only clears the local binding; the caller's reference still keeps the tables alive
    stack = nothing
    @info "finished dump"
end

Basically, TableStacker collects row groups wrapped in a TypedTable and then returns them as its partitions. Parquet2 then writes those partitions out as the row groups of the new parquet file. In the process of doing that, all of the underlying row groups get materialized, and since the row groups are still referenced by the TableStacker, they can’t get GCed until the whole process is done.
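For comparison, here is a minimal sketch of a lazy alternative to holding every row group in a vector. It relies on `Tables.partitioner(f, itr)` from Tables.jl, which only calls `f` when a partition is actually consumed. `load_row_group`, `LOADS`, and `count_rows` are hypothetical stand-ins for reading one row group and for a writer that handles one partition at a time. Whether Parquet2 releases each partition once it has been written is an assumption I have not verified.

```julia
using Tables

# Counts how many times the (hypothetical) loader has run.
const LOADS = Ref(0)

# Hypothetical loader: stands in for reading one row group from a source file.
function load_row_group(i)
    LOADS[] += 1
    return (id = collect((10i + 1):(10i + 3)), source = fill(i, 3))
end

# Lazy partitions: load_row_group runs only when a partition is materialized.
lazy_stack = Tables.partitioner(load_row_group, 1:4)

# A consumer that handles one partition at a time, as a row-group writer might.
function count_rows(tbl)
    total = 0
    for part in Tables.partitions(tbl)
        cols = Tables.columns(part)  # this call triggers the load
        total += length(Tables.getcolumn(cols, :id))
        # nothing keeps `cols` alive after this iteration, so it can be collected
    end
    return total
end

total = count_rows(lazy_stack)
```

The difference from TableStacker is that no collection of materialized tables exists at any point, so only the partition currently being processed has to stay in memory.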

I’d appreciate any new strategies you can think of!

I don’t quite understand the context. Is it that you have lots of small parquet files and want to create a single parquet file from all of them? If so, this is basically a single line in DuckDB (which has a Julia interface).
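A rough sketch of what that could look like through DuckDB.jl, assuming local paths. The helper name `consolidate_parquet` and the example file names are hypothetical; `read_parquet` with a glob and `COPY ... TO ... (FORMAT PARQUET)` are standard DuckDB SQL. Reading from or writing to S3 would additionally need the `httpfs` extension.

```julia
using DuckDB, DBInterface

# Hypothetical helper: merge every parquet file matching `src_glob` into `dest`.
# The paths are interpolated directly into the SQL, so they must not contain
# single quotes.
function consolidate_parquet(src_glob::AbstractString, dest::AbstractString)
    con = DBInterface.connect(DuckDB.DB, ":memory:")
    try
        DBInterface.execute(con, """
            COPY (SELECT * FROM read_parquet('$src_glob'))
            TO '$dest' (FORMAT PARQUET)
        """)
    finally
        DBInterface.close!(con)
    end
    return dest
end
```

Usage would be something like `consolidate_parquet("small_files/*.parquet", "combined.parquet")`.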

You have the problem right. I can try with DuckDB, but my biggest problem right now is keeping memory down from constructing the target file. I had assumed that DuckDB would still materialize all the source tables like my naive code does.

I’ll report back!

If all you are doing is streaming from one file to the other, I believe it should not be an issue.

DBInterface.execute(con, "INSTALL httpfs;")


ERROR: Execute of query "INSTALL httpfs;" failed: HTTP Error: Failed to download extension "httpfs" at URL "http://extensions.duckdb.org/v0.8.1/windows_amd64_mingw/httpfs.duckdb_extension.gz"
Extension "httpfs" is an existing extension.

Are you using a development build? In this case, extensions might not (yet) be uploaded.
 [1] execute(stmt::DuckDB.Stmt, params::NamedTuple{(), Tuple{}})
   @ DuckDB C:\Users\mrufsvold\.julia\packages\DuckDB\I8oJ8\src\result.jl:745
 [2] execute
   @ C:\Users\mrufsvold\.julia\packages\DuckDB\I8oJ8\src\result.jl:848 [inlined]
 [3] execute
   @ C:\Users\mrufsvold\.julia\packages\DBInterface\1Gmxx\src\DBInterface.jl:130 [inlined]
 [4] #execute#2
   @ C:\Users\mrufsvold\.julia\packages\DBInterface\1Gmxx\src\DBInterface.jl:152 [inlined]
 [5] execute(conn::DuckDB.DB, sql::String)
   @ DBInterface C:\Users\mrufsvold\.julia\packages\DBInterface\1Gmxx\src\DBInterface.jl:152
 [6] top-level scope
   @ c:\Users\mrufsvold\Projects\DIL-price-transparency-scraper\TableCompressor.jl\DuckDBAttempt.jl:19

I opened an issue with DuckDB, but maybe you have a theory?

Yikes. Was a problem before too

If the Internet brings someone here in the future, there is also a writeiterable function in Parquet2 that is not in the docs, but has a docstring that indicates it is stable and public. This is a very good solution for streaming row groups.