Repartitioning 2 TB of CSV into Parquet files

Good morning from Colombia.

I’m a beginner in big data/data science, and I’m trying to do the following task:

We have 2 TB of CSV from one table. We want to try a SQL layer to query that data.
Currently the data is stored in Timescale, but Timescale doesn't compress the data, and the SSD
space used is growing at a fast pace. So a partner and I are trying Azure Data Lake, with a SQL
layer on top, to test whether the query performance is acceptable and whether the price is better or worse.
The two SQL layers we want to try are Dremio and Azure Data Flow Analytics. But the problem is that the
CSV files are sometimes very, very large (100 GB) and sometimes very tiny (10 KB). We want to
repartition the data first, and then write it to Parquet.

To do the task, we tried:

1) Using Pandas on a very large machine (64 cores, more than 450 GB of RAM). The problem was
   that Pandas doesn't scale to large machines.
2) Using Azure Data Factory Data Flow; the data flow cluster (really a Spark cluster) crashed with a
   system error (?). So, aborted.

Finally, the CSVs are internally sorted by the key we want to use as the partition key. So maybe we can use Julia to read the CSVs as a stream and write the partitioned data to Parquet files. Is that a good idea? Do you see problems with that approach?

P.S.: Sorry for my English. I hope you can understand the problem.

1 Like

Depends on how you want to partition it: into multiple files by columns, or by rows. Take a look at CSV.jl. There are ways of reading parts of a file in at a time.

1 Like

I want to partition the CSV by rows. So, is it possible to write Parquet files in Julia?

Using the CSV.jl package, you can use the CSV.Rows(file) structure to iterate over rows efficiently; it shouldn’t matter how big the file is, since CSV.Rows streams the rows instead of loading the whole file.
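A minimal sketch of that kind of streaming iteration (this assumes CSV.jl is installed; the file path and the idea of touching each row are just placeholders for whatever per-row work you need):

```julia
using CSV

# Stream rows from a potentially huge CSV without materializing it in memory.
# By default CSV.Rows parses every field lazily as a string, which keeps the
# per-row cost and the memory footprint small and constant.
function count_rows(path)
    n = 0
    for row in CSV.Rows(path)
        # here you would inspect fields by column name, e.g. row.key
        n += 1
    end
    return n
end
```

The same loop shape works for a 10 KB file and a 100 GB file; only the wall-clock time changes.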

Unfortunately, the Parquet.jl package doesn’t support writing parquet files yet. The Feather.jl package, however, is able to write parquet-like binary files that can be very efficient to re-read, so you might check that out if it might work for you.
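For illustration, a sketch of the Feather round trip (assuming Feather.jl and DataFrames.jl are available; the table and column names here are made up):

```julia
using DataFrames, Feather

# A tiny demonstration table; in the real job each partition's rows
# would come from the CSV stream.
df = DataFrame(key = ["a", "a", "b"], value = [1, 2, 3])

path = tempname() * ".feather"
Feather.write(path, df)    # column-oriented binary file on disk
df2 = Feather.read(path)   # cheap to re-read later
```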

Hopefully that helps a little.

4 Likes

This seems like a reasonable[1] approach, though. Can you elaborate in which way Pandas “doesn’t scale”?

[1] Yes, there’s probably a smarter/cheaper way, but if this is a one-off, your time is probably more expensive than the hardware.

In my use case:

  1. When I’m reading CSV, the reading doesn’t use multiple cores; the same goes for grouping (for partitioning) and sorting (I sort for better compression).
  2. I parallelize using a Pool, but:
    2.1) The CSVs are sometimes very large, and reading many CSVs at the same time consumes too much memory.
    2.2) I can read the CSVs in chunks, but then the process is very slow because we merge the final partitions with existing Parquet files. I’m not an expert; maybe I’m doing something wrong here.

Is it possible to write the CSV as a stream, but without an iterator?

I’m thinking of something like:

  1. Read the CSV as a stream.
  2. For each row:
    2.1) Check whether a CSV file has already been created for the partition key.
    2.2) If the file exists: append the row to it.
    2.3) If not: create the file, record it in a dictionary as the file assigned to that partition key, and append the row.

Reading and writing as a stream, without grouping the rows, using only a dictionary of partition keys => CSV files. Is that possible?
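The steps above can be sketched in base Julia, keeping one open output file per partition key (this assumes the key is the first column and that the number of distinct keys is small enough to keep all handles open at once):

```julia
# Stream an input CSV and append each row to a per-key output file.
# `eachline` reads lazily, so memory use stays constant regardless of file size.
function partition_csv(inpath, outdir)
    handles = Dict{String,IOStream}()           # partition key => open file
    open(inpath) do io
        header = readline(io)
        for line in eachline(io)
            key = first(split(line, ','))       # assumed: key is column 1
            h = get!(handles, key) do           # create the file on first sight
                f = open(joinpath(outdir, "part_$key.csv"), "w")
                println(f, header)              # repeat the header per partition
                f
            end
            println(h, line)
        end
    end
    foreach(close, values(handles))
end
```

Since your CSVs are already sorted by the key, you could simplify further: keep only one handle open and close it whenever the key changes.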

Once CSV.jl updates so that CSV.Rows accepts types, I will register this branch, which has a CSV chunk reader that wraps some of CSV.jl’s functionality (so thanks to CSV.jl).

1 Like

Would DataFrameDBs be of any help?


1 Like

If you are considering using a SQL layer, I would recommend ClickHouse (https://clickhouse.tech/). DataFrameDBs is still under development, and I would not recommend it to someone who needs to analyze data right now. But yes, it was for such cases that I started to develop it.

3 Likes

In case it might help someone: this would also be super simple to achieve with Apache Spark.

Thank you. I’m the original poster of the question (my original account is lost). I tried ClickHouse months ago, and I can confirm that it is an excellent tool for playing with medium data (some terabytes of info) in an easy way.

Really, Pandas and Vaex are no competition for ClickHouse when aggregating and playing with data at this scale. On a single machine I can aggregate, filter, etc. with very fast performance. I tried Spark too, but Spark is not a simple solution (from the point of view of deployment and operations), and Azure HDInsight is expensive (compared to a single machine with ClickHouse, which can be installed on the developer machine to develop the solution in a very simple way).

It’s sad that ClickHouse doesn’t work in Google Colab.

2 Likes

Thanks for the notification. I’m hoping Julia can offer a solution to medium-data problems at some point. I think ClickHouse uses a very good architectural approach in that sense. Maybe you are interested in this info:

https://clickhouse.tech/docs/en/development/architecture/#block

and

https://clickhouse.tech/docs/en/development/architecture/#block-streams

Note that in ClickHouse the size of the block (the chunk) is independent of the data source, so whether I have a 1 TB CSV file, Parquet, or whatever, the chunks are always the same size. That size is chosen to work very efficiently with the vectorized operations the rest of the execution pipeline uses, and to consume little memory.

Have you tried diskframe.com?

Nope, I haven’t tried diskframe. Does diskframe have performance and a workflow comparable to Dask?

I would say in the same ballpark. But I am often surprised at how unoptimized Dask is at certain tasks.

A cheap way of doing this would be to chunk the data via Linux’s split command, such that each chunk fits into memory. You can write your own split, but the Linux CLI tools tend to be quite good and fast for these tasks!

For each chunk, read with CSV, and break up the data into memory-mapped column stores: Feather or JuliaDB file stores.

Pick a consistent naming convention for each file, e.g. “chunk_00001”. Then voilà. Now you can do 1-to-1 transforms/joins across cores on chunks and be 4-16x better than Pandas :P.
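If you'd rather stay in Julia than shell out to split, here is a rough sketch of that chunking step with the “chunk_00001” naming convention (the chunk size is arbitrary, and the CLI split will usually be faster):

```julia
# Split a large text file into fixed-size row chunks named
# chunk_00001, chunk_00002, ... inside `outdir`.
function split_by_rows(inpath, outdir; rows_per_chunk = 1_000_000)
    i, n, out = 0, 0, nothing
    for line in eachline(inpath)
        # open a new chunk file when none is open or the current one is full
        if out === nothing || n == rows_per_chunk
            out === nothing || close(out)
            i += 1; n = 0
            out = open(joinpath(outdir, "chunk_" * lpad(i, 5, '0')), "w")
        end
        println(out, line)
        n += 1
    end
    out === nothing || close(out)
end
```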

But yeah, with Apache Spark you should be able to do something very similar to this with Parquet files. My advice is to run like hell away from PySpark and use real Spark. But Spark has its own issues…

This all of course really depends on your chunking strategy… Doing it simply by rows is not very handy for most cases. But! One can write very simple emitter functions in base Julia to break up data by some other criterion really easily…

I’ve been working on a simple package for something not directly related but maybe helpful if you are doing lookups: https://github.com/caseykneale/LockandKeyLookups.jl

Yeah, I do that; the issue is that Windows users can’t use it.

Try https://github.com/xiaodaigh/DataConvenience.jl

CSV Chunk Reader
You can read a CSV in chunks and apply logic to each chunk. The type of each column is inferred by CSV.read.

using DataConvenience: CsvChunkIterator
for chunk in CsvChunkIterator(filepath)
  # chunk is a DataFrame
  # do something to chunk
end

The chunk iterator uses CSV.read parameters. The user can pass in type and types to dictate the types of each column e.g.

# read all column as String
for chunk in CsvChunkIterator(filepath, type=String)
  # chunk is a DataFrame where each column is String
  # do something to chunk
end
2 Likes

So crazy; I had to write one of these myself ~2 years ago. You’re all taking me back down Julia memory lane, and reminding me why I left Python in the first place…

There is a little package for ClickHouse in Julia.

2 Likes