How well is Apache Arrow’s zero-copy methodology supported?

I made a large Arrow file that doesn’t fit into the RAM of my PC. The file is about 13 GB and 170 million rows. I wanted to test how well I can analyze “big data” with Julia’s Arrow.jl package.

After reading the file (at=Arrow.Table("big.arrow")), I am able to do statistics on it, for example: using Statistics; mean(at.total_amount) (where total_amount is a column in the table). This is fast and doesn’t fill the RAM, so it is probably done in a zero-copy fashion.

However, queries take a lot of time and fill memory, such as the lines below:

using Tables
using Query
using DataFrames
Tables.datavaluerows(at)|>@map({_.passenger_count,_.total_amount})|>@filter(_.total_amount>100)|>DataFrame

Is it not possible to run queries in a zero-copy fashion, or am I doing something wrong?

The versions I have:
[69666777] Arrow v1.1.0 https://github.com/apache/arrow:julia/Arrow#apache-arrow-3.0.0
[a93c6f00] DataFrames v0.22.7
[1a8c2f83] Query v1.0.0
[bd369af6] Tables v1.4.2

You probably hit implementation details of @map and Tables.datavaluerows, not of the Arrow implementation. It is possible that these operations were not optimized to work in zero-copy mode.

That code builds a DataFrame. Does DataFrame support out-of-core storage?

Most likely not. So try to re-implement your problem without using a DataFrame.

Update:
Perhaps this is not the problem, as long as the result fits in memory.
In that case the allocations happen in Query.jl.

Kind of. DataFrame can store Arrow files that are read in lazily. But it’s very eager with copying if you aren’t careful.


Yes, the issue here is that when you do Tables.datavaluerows(at), then you’re taking the arrow memory that is zero-copy in the Arrow.Table type, and materializing each row as a NamedTuple. Currently, that’s part of the design of Query.jl in that it requires strictly NamedTuples as the “row” objects of tables.

As others have mentioned, if you move your analysis to DataFrames.jl, the arrow columns will be zero-copy by default when you do df = DataFrame(at). If you then do select or filter operations on the DataFrame, those should avoid any excessive materialization.

In the Tables.jl interface, when you call Tables.rows(at), it provides an efficient “row view” iterator that avoids full allocation of all the values. So you could do this operation in the “Tables.jl” way using TableOperations.jl like:

at |> TableOperations.select(:passenger_count, :total_amount) |> TableOperations.filter(x -> x.total_amount > 100) |> DataFrame

That will still end up allocating new columns in the final DataFrame for the output, but I assume you’re aware that’s expected.
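As a minimal sketch of the row-view iteration described above, the loop below walks Tables.rows and reads one column per row without building a NamedTuple for each row. The small NamedTuple of vectors is a hypothetical stand-in for the Arrow.Table, and count_and_sum is an illustrative helper, not part of any package.

```julia
using Tables

# Hypothetical stand-in for the Arrow.Table: any Tables.jl-compatible source works here.
tbl = (passenger_count = [1, 2, 1, 4], total_amount = [50.0, 120.5, 300.0, 99.9])

# Illustrative helper: count and sum the rows whose total_amount exceeds a threshold.
function count_and_sum(table, threshold)
    n = 0
    s = 0.0
    for row in Tables.rows(table)
        # Each row is a lightweight view; only the requested value is read.
        amt = Tables.getcolumn(row, :total_amount)
        if amt > threshold
            n += 1
            s += amt
        end
    end
    return n, s
end

n, s = count_and_sum(tbl, 100)
println("rows over threshold: ", n, ", sum: ", s)
```

With a real Arrow.Table in place of tbl, the same loop should only touch the memory-mapped total_amount column, although that has not been measured here.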


I confirm, df = DataFrame(at) fills RAM and doesn’t seem to finish at all. So that is not a solution.

However, TableOperations works, thank you @quinnj for pointing that out. The line
at |> TableOperations.select(:passenger_count, :total_amount) |> TableOperations.filter(x -> x.total_amount > 100) |> Tables.columntable
runs in 9 seconds (13 GB and 170 million rows) on a PC with an NVMe SSD drive.

In my view, it is remarkable that a dataset can be bigger than RAM or GPU memory and one can still use a high-level API. This is not possible with pyarrow (which would require reading the data into a DataFrame, making it dependent on the amount of RAM) or RAPIDS’s cuDF (which is limited by GPU memory). Nice work!

However, TableOperations seems to lack some key methods for data wrangling/munging, like join and groupby, or perhaps I didn’t find a way to do them.


What do you mean you can “confirm df = DataFrame(at) fills RAM and doesn’t seem to finish at all”? What Jacob said above was that there’s no copying when you do df = DataFrame(at). This has changed back and forth a bit in recent versions of Arrow I believe, so make sure you’re on the latest version. You can also try explicitly passing DataFrame(at, copycols = false).

Yes, there was a recent bug in DataFrames (introduced by myself) that led to doing a full copy of the data when you did something like DataFrame(at), so you’ll just want to make sure you’re on version 0.22.7 or 1.0.0, either contains the bugfix.

Yes, TableOperations.jl doesn’t have a full suite of data processing functions. It’s been built up “on demand” as people have requested additional functionality, with a focus on lazy evaluation scenarios. It would certainly be interesting to explore whether all the fast join functionality recently done in DataFrames could be generalized in TableOperations.jl.


Just to check then - would you expect

df = DataFrame(at)

filter!(:total_amount => (>)(100), df)

where at is a lazily loaded Arrow.Table to be as efficient as the more verbose TableOperations code above?

Maybe not:

julia> at = DataFrame(Arrow.Table(arrow_table_path));

julia> filter!(:Week => (<=)(Date(2016, 11, 14)), at)
MethodError: no method matching deleteat!(::Arrow.Primitive{Date, Vector{Arrow.Date{Arrow.Flatbuf.DateUnitModule.DAY, Int32}}}, ::Vector{Int64})

Yes, as you discovered, arrow data (and their Arrow.jl representations) is immutable, so you’d need to do filter instead of filter!, since we can’t modify the input arrays/columns.

Yep, that works. What will be materialized at that point? When I just did this, my system was at 12 out of 24GB of RAM used, and the filter operation pushed me into swap (i.e. used up all the available 12GB and then continued to run with swap for some time thereafter). The arrow table here is 5.5GB on disk (28m rows, 34 columns).

Edited to add: It also eats up all my memory when starting from a fresh Julia session (where I have around 18GB spare)

df=DataFrame(at, copycols = false)

seems to work when I have Arrow version 1.0.0. Then I can filter with

filter(:total_amount => (>)(100), df)

without eating up a lot of RAM (it seems to take a few GB though?). My laptop did the filtering in ~20s on the data I mentioned above. Not yet tested with other Arrow versions.

Anyhow, this is great! Thank you for the help. I’ll test other wrangling methods.

That’s interesting… I just got the following:

julia> @time filter(:Week => (<=)(Date(2016, 11, 14)), at);
166.401539 seconds (486.87 M allocations: 20.606 GiB, 71.35% gc time, 0.62% compilation time)

For the table described above (which seems to be smaller than the table you’re working on!), with Arrow 1.4 and DataFrames 1.0

The result of each column should be the same type that you get if you called copy(column), just with fewer items since, in theory, a bunch will be filtered out. But do note that calling filter will still retain all columns in a DataFrame, just without the rows being filtered out. I don’t think I’d be too surprised if the result ended up being largish in memory, because at that point, we’re comparing the overall in-memory footprint of the Julia objects vs. the arrow memory footprint. I’d guess the Julia footprint is slightly larger in some cases, since the arrow format has been designed specifically to play nice with memory.

The other thing to think about is that there’s probably still a reference to the original arrow table, so doing the filter operation is allocating all those columns in addition to the existing Arrow.Table footprint, though it should be mmapped, so shouldn’t have quite the same memory pressure issues.

All that said, we could certainly have issues/bugs still! So if you still find things seem to be really off, feel free to file an issue with some kind of reproduction and we can help take a look to see if something unexpected is going on.
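To make the footprint comparison above concrete, here is a rough sketch using only Base Julia. It compares the raw character payload of a string column with the size of the same data held as a Vector{String}, where every element is a separate heap object behind a pointer. The Arrow format instead stores string data as one contiguous byte buffer plus offsets, so its footprint stays much closer to the payload. The column contents are made up for illustration, and this is only one possible contributor to the memory use reported in this thread.

```julia
# Hypothetical column of short, distinct strings (made-up data).
strs = [string("id", i) for i in 1:100_000]

# Raw character data only: roughly what a contiguous byte buffer would hold.
payload_bytes = sum(sizeof, strs)

# The pointer array plus every individual String object it references.
julia_bytes = Base.summarysize(strs)

println("payload:        ", payload_bytes, " bytes")
println("Vector{String}: ", julia_bytes, " bytes")
println("ratio:          ", round(julia_bytes / payload_bytes; digits = 2))
```

The shorter the strings, the larger the ratio, since the per-object overhead is fixed.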

I’d love to file issues, although my problem is that generally I’m working on proprietary data. Also, I struggle to get a good sense of what I should expect, i.e. what could be considered “really off”.

If you’ll indulge me, maybe take the operation I’m doing here. I’ve got this 28m-by-34 table, and am filtering on a single column. Column types are mainly strings, dates, and a few floats and ints. I then do the filter operation above, which removes around 7m of the 28m rows. What should I expect for this? Should the memory footprint be that of a 21m-by-34 table fully materialized?

For context, I can do

julia> @time at = @view at[at.Week .<= Date(2016, 11, 14), :];
0.574345 seconds (399.45 k allocations: 185.803 MiB, 29.78% gc time, 44.69% compilation time)

on the same table. So here it appears I have successfully filtered out everything while allocating hardly any memory. If I then do

@time at = copy(at);
215.217447 seconds (486.05 M allocations: 20.402 GiB, 79.84% gc time, 0.46% compilation time)

again all my memory is eaten up (17GB that I had spare when calling copy) and most of the time of the operation is spent in swap. The table in the view was only 21/28 the size of the original table, so I would have expected it to be ~4GB on disk. Why would it take 17GB + plenty of swap to copy this table? And why was this copy operation one minute slower than the direct filter above, if filter materializes the same in the end?

Sorry if I’m not being super helpful, I just really struggle to develop a good mental model of Arrow + DataFrames performance and which operations should and shouldn’t be fast. Overall I find the progress that’s been made on this front mindblowing in terms of what it allows me to do without resorting to low-level trickery or usage of much more complicated out-of-memory distributed tools, it’s just that sometimes randomly things eat up my memory or take forever and I can’t figure out what’s going on.

FWIW, subset(at, :Week => (<=)(Date(2016, 11, 14)), view=true) is an alternative to @view at[at.Week .<= Date(2016, 11, 14), :] which should be equivalent.
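The difference between a view and a copy discussed above can be sketched on a plain vector with Base Julia alone. This is a simplified analogy for a single column, not how DataFrames or Arrow.jl implement it: the view stores only the selected indices and keeps pointing at the parent data, while indexing materializes a new, independent array.

```julia
col = collect(1:10)        # stand-in for one column of the table
mask = col .<= 7           # row selection, like at.Week .<= some_date

v = view(col, mask)        # lazy: stores the selected indices, shares col's data
c = col[mask]              # eager: allocates a new Vector holding the selected values

println(parent(v) === col) # the view still references the original column
println(length(v), " ", length(c))

col[1] = 100               # mutate the parent
println(v[1])              # the view sees the change
println(c[1])              # the copy does not
```

Calling copy on a view of a whole table has to do the eager step for every column at once, which is where the allocations show up.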


What do you think: should we have a Julia script that generates a synthetic dataset that everyone can test on their own machine (with different IO throughputs and processor architectures)? Perhaps new versions could be tested against the synthetic dataset before release?
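A rough sketch of what such a script could look like, assuming Arrow.jl and Tables.jl are installed. The column names mirror the taxi data in this thread, but the value ranges, chunk sizes, and the helpers synthetic_chunk and write_synthetic are all made up for illustration. The data is written as several record batches through Tables.partitioner, so that the whole table never has to be built in memory at once.

```julia
using Arrow, Tables, Random

# Hypothetical helper: one chunk of taxi-like rows as a column table.
function synthetic_chunk(rng, nrows)
    return (
        vendor_id = rand(rng, 1:2, nrows),
        passenger_count = rand(rng, 1:6, nrows),
        total_amount = round.(200 .* rand(rng, nrows); digits = 2),
    )
end

# Hypothetical helper: write `nchunks` record batches to `path`.
# Each chunk gets its own seeded generator so the file is reproducible.
function write_synthetic(path; nchunks = 4, rows_per_chunk = 1_000, seed = 1)
    parts = Tables.partitioner(1:nchunks) do i
        synthetic_chunk(MersenneTwister(seed + i), rows_per_chunk)
    end
    Arrow.write(path, parts)
    return path
end

path = write_synthetic(joinpath(mktempdir(), "synthetic.arrow"))
at = Arrow.Table(path)
println("rows written: ", length(at.total_amount))
```

Raising nchunks and rows_per_chunk should scale the file beyond RAM size, although Arrow.write may hold more than one chunk in memory at a time while writing.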

@quinnj , are you able to explain this:
when I run

at=Arrow.Table("big.arrow")
let t
    t=at |> TableOperations.select(:vendor_id, :passenger_count, :total_amount) |> TableOperations.filter(x -> x.total_amount > 10) |> Tables.columntable
end

with my big taxi dataset, the code eats up ~5GB of RAM and that memory isn’t freed after the let block (as I assumed it would be, based on the Julia docs). The memory is freed only after I kill the Julia process. Explicitly calling garbage collection doesn’t seem to help.

Not an expert on Julia, but could there be a memory leak that isn’t under control of Julia?

The versions I have:
[69666777] Arrow v1.4.1
[a93c6f00] DataFrames v1.0.1
[1313f7d8] DataFramesMeta v0.6.1
[ab02a1b2] TableOperations v1.0.0
[bd369af6] Tables v1.4.2