Arrow stream usage clarification

I’m trying to use Arrow’s Stream (see Stream API Reference · Arrow.jl) to get an efficient out-of-RAM tabular data iterator.

However, iterating over the Arrow stream appears to perform only a single iteration, in which the entire table is read.

For example, here’s the test data creation:

using DataFrames
using Tables
using Arrow
using Random
using BenchmarkTools

Random.seed!(123)
nobs = 4_000_000
nfeats = 100
path = "data/test.arrow"
x = rand(nobs, nfeats)
df = DataFrame(x, :auto)
Arrow.write(path, df)

Now, when iterating over the Stream, the full table gets materialized as a DataFrame in a single iteration:

for x in Arrow.Stream(path)
    df = DataFrame(x)
    sz = size(df)
    println("size: $sz")
end
  size: (4000000, 100)

My wish was for Arrow’s Stream to behave like itr = Iterators.partition(stream, batch_size), so that each iteration would return a table/DataFrame with batch_size rows.

The docs read: This allows iterating over extremely large "arrow tables" in chunks represented as record batches. which seems to point in that direction. I would appreciate knowing how to achieve such batch processing.

Arrow.Stream reads data in record batches. Therefore you need to write data in record batches to a file first. Here is a simple example of splitting your data into two batches:

p = Tables.partitioner([view(df, 1:2000000, :), view(df, 2000001:4000000, :)])
Arrow.write(path, p)
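To see the effect end to end, here is a minimal sketch on a small table (the file name `chunked.arrow` is just a placeholder): once the data is written through Tables.partitioner, Arrow.Stream yields one table per record batch instead of one table for the whole file.

```julia
using Arrow
using DataFrames
using Tables

# Write a 10-row table as two 5-row record batches.
df = DataFrame(a = 1:10, b = rand(10))
parts = Tables.partitioner([view(df, 1:5, :), view(df, 6:10, :)])
Arrow.write("chunked.arrow", parts)

# The stream now iterates batch by batch.
for tbl in Arrow.Stream("chunked.arrow")
    println(size(DataFrame(tbl)))  # prints (5, 2) twice
end
```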

Also note that DataFrames.jl can work with larger than RAM Arrow.jl files since it is using memory mapping. The only case when you need partitions is when you would need to materialize this data in RAM (which typically, e.g. when doing aggregation, is not needed).
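A minimal sketch of the memory-mapped route, reusing the file written in the first post: Arrow.Table memory-maps the file, and passing copycols=false keeps the DataFrame backed by the mapped Arrow vectors rather than copying them into RAM.

```julia
using Arrow
using DataFrames
using Statistics

# Memory-map the Arrow file; no data is loaded into RAM yet.
tbl = Arrow.Table("data/test.arrow")

# copycols=false wraps the memory-mapped columns without copying.
df = DataFrame(tbl; copycols=false)

# Aggregations then stream over the mapped columns.
combine(df, :x1 => mean => :x1_mean)
```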


I have added add Iterators.partition by bkamins · Pull Request #3212 · JuliaData/DataFrames.jl · GitHub to make partitioning of DataFrame objects easier. Until the 1.5 release you can just copy the code I used there.
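With that change (a sketch, assuming DataFrames.jl ≥ 1.5), Iterators.partition works directly on a data frame, yielding row chunks of the requested size:

```julia
using DataFrames

df = DataFrame(x = 1:10)

# Each chunk is a view over up to 4 consecutive rows.
for chunk in Iterators.partition(df, 4)
    println(nrow(chunk))  # prints 4, 4, 2
end
```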

This thread is very clarifying, thanks so much!
Nevertheless, there is a case where I still cannot see the light: what happens when the source is itself an Arrow.Stream? In this case it is not obvious to me how to convert each table (in a partition) to a DataFrame and then back to a new partition… in my naive understanding something like this should work:

using Arrow
using DataFrames
using IterTools

MassivePartionedTable_Input = Arrow.Stream("inputFile.arrow")
firstPartition, restPartitions = firstrest(MassivePartionedTable_Input)
df_firstPartition = DataFrame(firstPartition)
largerTableOutput = DoesSomeThingOnThisPartition(df_firstPartition)
Arrow.write("outputFile.arrow", largerTableOutput)
for eachPartition in restPartitions
    df_eachPartition = DataFrame(eachPartition)
    largerTableOutput = DoesSomeThingOnThisPartition(df_eachPartition)
    Arrow.append("outputFile.arrow", largerTableOutput)
end

However I obtain this error:

ERROR: MethodError: no method matching append(::WindowsPath, ::DataFrame)

I am not sure how to proceed… should I convert largerTableOutput back to a table with 1 partition…

Sorry… in this case I find the documentation a bit fuzzy. I would really appreciate your ideas.

Javier

You need Arrow.write("outputFile.arrow", largerTableOutput, file=false) to allow for streaming.

However, this is not needed, you can just use Arrow.append:

MassivePartionedTable_Input = Arrow.Stream("inputFile.arrow")
for eachPartition in MassivePartionedTable_Input
    df_eachPartition = DataFrame(eachPartition)
    largerTableOutput = DoesSomeThingOnThisPartition(df_eachPartition)
    Arrow.append("outputFile.arrow", largerTableOutput)
end

Example (input has 2 partitions):

julia> s = Arrow.Stream("test.arrow")
Arrow.Stream(Arrow.ArrowBlob[Arrow.ArrowBlob(UInt8[0xff, 0xff, 0xff, 0xff, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00  …  0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00], 1, 600)], 1, nothing, Symbol[], Type[], nothing, Dict{Int64, Arrow.DictEncoding}(), Dict{Int64, Arrow.Flatbuf.Field}(), true, Base.RefValue{Union{Nothing, Symbol}}(nothing))

julia> collect(s)
2-element Vector{Any}:
 Arrow.Table with 1 rows, 2 columns, and schema:
 :a  Int64
 :b  Int64
 Arrow.Table with 1 rows, 2 columns, and schema:
 :a  Int64
 :b  Int64

julia> for x in s
           df = DataFrame(x)
           @show df
           Arrow.append("test2.arrow", df)
       end
df = 1×2 DataFrame
 Row │ a      b
     │ Int64  Int64
─────┼──────────────
   1 │     1      2
df = 1×2 DataFrame
 Row │ a      b
     │ Int64  Int64
─────┼──────────────
   1 │     2      3

julia> read("test.arrow") == read("test2.arrow")
true

Thanks a lot for your super quick reaction.

I did not know that you could also use append in this fashion! Elegant. There is however a little issue: when I apply this approach to my problem I obtain the following error:

ERROR: MethodError: no method matching append(::WindowsPath, ::DataFrame)
Closest candidates are:
  append(::IO, ::Any; metadata, colmetadata, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, convert, file) at C:\Users\jloaizar\.julia\packages\Arrow\ZlMFU\src\append.jl:70
  append(::IO, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any, ::Any) at C:\Users\jloaizar\.julia\packages\Arrow\ZlMFU\src\append.jl:105
  append(::String, ::Any; kwargs...) at C:\Users\jloaizar\.julia\packages\Arrow\ZlMFU\src\append.jl:62

May I ask what versions of Arrow and DataFrames you are using?

Javier

(@v1.8) pkg> st Arrow
  [69666777] Arrow v2.4.1

(@v1.8) pkg> st DataFrames
  [a93c6f00] DataFrames v1.4.3

But the problem seems to be the ::WindowsPath part. Are you sure you are passing a string with a file name? Can you also paste the full stack trace?

That was indeed the problem. In my code I used FilePaths… the resulting object was interpreted correctly by Arrow.write, but unfortunately Arrow.append did not know what to do with it. After a few changes it all works well now.
Below is the complete solution, as a reference for anyone experiencing the same problem:

using Arrow
using DataFrames
using IterTools
using FilePaths

OutputDir = absolute(p"outputDirectory")
OutputFile = "outputFile.arrow"
OutputPath = joinpath(OutputDir, OutputFile);

MassivePartionedTable_Input = Arrow.Stream("inputFile.arrow")
firstPartition, restPartitions = firstrest(MassivePartionedTable_Input)
df_firstPartition = DataFrame(firstPartition)
largerTableOutput = DoesSomeThingOnThisPartition(df_firstPartition)
Arrow.write(OutputPath, largerTableOutput, file=false)

for eachPartition in restPartitions
    df_eachPartition = DataFrame(eachPartition)
    largerTableOutput = DoesSomeThingOnThisPartition(df_eachPartition)
    Arrow.append(string(OutputPath), largerTableOutput)
end

Looking inside the Arrow sources I found something called Arrow.Writer. Unfortunately I could not find any example of how to use it. Do you know what the solution would be if one were to use Arrow.Writer rather than the combination of Arrow.write and Arrow.append used in this case?

Thanks again for all your help, I learned a lot today.

Sincerely
Javier

AFAICT Writer is an internal name only and should not be used by users.

(As a side note, as commented above: you do not need to use Arrow.write in your solution; using Arrow.append should be enough.)
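A minimal sketch of that append-only pipeline, using the thread’s placeholder file names and an identity transform standing in for DoesSomeThingOnThisPartition. As the example earlier in the thread showed, Arrow.append creates the output file on the first call, so no initial Arrow.write is needed:

```julia
using Arrow
using DataFrames

# Process one record batch at a time; Arrow.append creates
# "outputFile.arrow" on the first iteration if it does not exist.
for part in Arrow.Stream("inputFile.arrow")
    out = DataFrame(part)  # ...transform the chunk here...
    Arrow.append("outputFile.arrow", out)
end
```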