I have a pipeline that concatenates parquet tables from S3 and writes them back out at the end. It goes something like:
- Read in the metadata for a bunch of parquet files in a partition
- Get their RowGroups (the unit of storage in a parquet file)
- Concatenate the RowGroups into larger tables
- Write out the larger tables as RowGroups in new, consolidated parquet files
Detailed Summary of the Pipeline
- Start with a `Channel` which produces partition prefixes ("directories" containing the parquet files)
- For each partition, get a list of files
- Chunk the list into batches of roughly a fixed total size in MB
- For each batch of files, get the metadata for all the RowGroups (this is the unit of storage inside a parquet file)
- Sort this batch of RowGroups based on some criteria
- Bin these row groups into much larger RowGroups (this is the important step which makes querying the output faster than the original files)
- For each bin, read in the RowGroups and vertically concatenate them into a new table
- Bin this stream of tables into groups of 3 to be written out to parquet files
- For each bin of tables, write to an IOBuffer
- For each IOBuffer, write back to S3
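For concreteness, the stages above can be sketched as bounded `Channel`s feeding each other. All function names (`list_files`, `chunk_by_size`, etc.) and buffer sizes here are placeholders, not my actual code:

```julia
# Hypothetical sketch: each stage is a Channel whose buffer size bounds
# how far ahead of the next stage it can run.
partitions = Channel{String}(32) do ch
    for p in partition_prefixes          # step 1: partition prefixes
        put!(ch, p)
    end
end

batches = Channel{Vector{String}}(8) do ch
    for p in partitions
        files = list_files(p)            # step 2: list files in partition
        for b in chunk_by_size(files, 256)  # step 3: ~256 MB batches
            put!(ch, b)
        end
    end
end
# ...and so on: metadata -> sorted RowGroups -> bins -> tables -> IOBuffers -> S3
```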
Steps 1 and 2 are relatively lightweight, both in terms of memory and time. However, storing the file buffers and such for the RowGroups does take a non-trivial amount of memory.
Step 3 requires me to actually read whole tables in, manipulate them, and write them out to large IOBuffers. This is where the bulk of the compute and memory go.
Step 4 is fairly trivial. It does take some time though as it is writing out 200-500MB per file.
I naively tried Transducers.jl (just a bunch of `MapCat`), but it was too aggressive, and I ran out of memory. I couldn't figure out how to ask it to stop processing upstream stuff to wait for downstream to finish.
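To show the kind of composition I mean (simplified, with placeholder functions), the Transducers version was roughly:

```julia
using Transducers

# Sketch of an all-MapCat pipeline. foldxt parallelizes eagerly with no
# backpressure between stages, which is where memory blew up for me.
xf = MapCat(list_files) |>           # partition -> files
     MapCat(chunk_by_size) |>        # files -> batches
     MapCat(batch_to_rowgroups) |>   # batch -> RowGroup metadata
     Map(consolidate_and_write)      # bin -> consolidated parquet file
foldxt(right, xf, partition_prefixes)
```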
I am currently using a bunch of `Channel`s and `Threads.foreach` with limits on the number of tasks available to each step. But the limits I've picked are purely by intuition. I have no idea if some processes are starved for input or backed up waiting for downstream stuff to open up.
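The heavy step currently looks something like this (names hypothetical; the `ntasks` and channel buffer size are the intuition-picked limits I mean):

```julia
# A small output buffer makes step 3 block when the writers fall behind,
# which is the only backpressure I have right now.
tables = Channel{Any}(4)

# bins is a Channel of RowGroup bins produced upstream; ntasks caps
# how many heavy read/concatenate tasks run at once.
Threads.foreach(bins; ntasks = 6) do bin
    put!(tables, concat_rowgroups(bin))   # step 3: read + vertically concat
end
```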
Ideally, I’d be able to use just enough resources to ensure step 3 always has an input available, and dynamically ramp up the number of instances of step 3 workers so that I soak up as much of the resources as possible.
Keep everything single threaded except Step 3.
- Leave max compute and memory available to the biggest task
- Startup is slow because each worker will need to wait for steps 1 and 2 to complete sequentially
- Could still potentially run out of memory if too many workers are allocated to step 3
Run the whole pipeline sequentially but run it multiple times in parallel on separate partitions.
- No waiting
- There will be periods where all workers are on simple tasks and resources are underutilized, and periods where they are all working on a hard task and resources are overutilized.
What strategies would you apply to this kind of problem where a process has large swings in resource consumption?
Am I missing something with Transducers? It seems like it should be The Way to have the system automatically manage parallel pipelines.