How to subset a stream I/O and pass it to TensorFlow.jl? (question has been updated)

Hello! I’m not sure what the best way to sort is in this case, but this is the plan: I would like to turn the block of code below into a loop (I was advised by @ChrisRackauckas not to loop through rows, and to use DataFramesMeta or Query).

using DataFrames
df = readtable("file1.csv", nrows = 1000000, skipstart = 0)
sort!(df, cols = [:Type])
writetable("sorted_file1_1.csv", df)

The idea is to read every 1 million rows from the 5.2 GB CSV file, sort that chunk by column :Type, call writetable, then repeat for the next million rows with readtable("file1.csv", nrows=1000000, skipstart=1000000).

Two variables change on each iteration: skipstart= increases by 1 million for each new chunk, and the writetable filename increments by 1 (e.g. the next file would be named sorted_file1_2.csv).

This approach is naive, because it doesn’t first determine the total number of rows in the file, break it up into approximately equal-size files, and then sort all of them.
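A rough sketch of the loop I have in mind is below (assuming the readtable/writetable API above, a hard-coded number of chunks, and that later chunks can deal with the missing header row):

using DataFrames

chunksize = 1_000_000
nchunks   = 6  # placeholder: roughly ceil(total rows / chunksize)

for k in 1:nchunks
    # Note: for k > 1, skipstart also skips past the header line, so the
    # later chunks may need header = false plus explicit column names.
    df = readtable("file1.csv", nrows = chunksize,
                   skipstart = (k - 1) * chunksize)
    sort!(df, cols = [:Type])
    writetable("sorted_file1_$(k).csv", df)
end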

After sorting, I need to read and feed into a model only the df[:Type] .== "Trade" rows, using TextLineReader with the decode_csv operation from TensorFlow.jl.

@SaschaMann @ScottPJones

If you only need a subset, why write out all the records? That seems inefficient. Also, if you sort by type, but then only keep one type, why sort?

In any case, when I deal with large files and only need a subset of the information, I use low-level routines from CSV.jl. You could read the file line by line, filter for "Trade", keep what you need.


Hi Tamas, thanks for the very good questions. Is there a way to read the 5 GB CSV file at once, or do I need to break it up into smaller files, then read each file line by line and filter for "Trade"? Let’s take it to the extreme: what if I have a 30 GB CSV file?

Just a warning, the sorting in Query.jl is not very optimized at this point. I know how to improve it, just haven’t found the time yet.

In theory a Query.jl query like this:

df = @from i in CSV.Source("file1.csv") begin
    @where i.Type=="Trade"
    @select i
    @collect DataFrame
end

should work with very large datasets, because the whole design of both DataStreams and Query is based on streaming data, i.e. this query will be executed without ever loading the whole content of file1.csv into main memory. As long as you filter out enough rows with the @where condition that the resulting DataFrame fits into main memory, this should in theory work with an arbitrarily large source CSV file. Having said that, I have never tried it; this is how I think things should work, and reality is unfortunately often quite different. If you do try it, please report back whether it works!


Thanks David for the script and tips. I’m about to try it now.

To limit @where i.Type .== "Trade" to 3000000 rows, should I do

@where i.Type .== "Trade" && i < 3000000?

Also, how do I initialize all of df without reading all of it?

Tamas, it’s bold to ask after receiving such wonderful insights from you already, but could you provide example code for your suggested CSV.jl approach?

I’m skimming through Home · CSV.jl and assuming it would take me numerous attempts to get even this short block right.

I have handled 180 GB CSV files on my laptop. The trick is to read line by line, discard what I don’t need, and process in one or two passes. Also, compressing (gzip -9) and reading from the stream saves a lot of space and makes it faster (or at least not slower). The downside is that you need to write custom code.
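For what it’s worth, a minimal sketch of that kind of pass (using plain Base I/O rather than CSV.jl’s low-level routines, and assuming a simple comma-separated file without quoted fields, with Type, Price and Volume columns named in the header) would look roughly like this:

# Stream the file line by line, keep only the "Trade" rows,
# and store just the Price and Volume columns.
function filter_trades(path)
    prices  = Float64[]
    volumes = Float64[]
    open(path) do io
        header = split(chomp(readline(io)), ',')
        itype  = findfirst(x -> x == "Type",   header)
        iprice = findfirst(x -> x == "Price",  header)
        ivol   = findfirst(x -> x == "Volume", header)
        for line in eachline(io)
            fields = split(chomp(line), ',')
            if fields[itype] == "Trade"
                push!(prices,  parse(Float64, fields[iprice]))
                push!(volumes, parse(Float64, fields[ivol]))
            end
        end
    end
    return prices, volumes
end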


Thanks Tamas. The per-line read I need is very simple: if column :Type is "Trade", store the :Price and :Volume values, and discard everything else. I understand CSV.readline should be used; I’ll take a look at its source code.

I have the CSV file in GZ format as well, but how (with what tools/package) do I read from and write to a GZ? I found GZip.jl, would that work?

That won’t work: a @where clause is applied row by row, and the i in my example refers to one complete row (represented as a NamedTuple), so you can’t compare it to a number.

What you are looking for are the @take and @skip query operators, but neither of them is implemented yet. They are on my long list of things to do.

In the meantime, the following might work:

q = @from i in CSV.Source("file.csv") begin
    @where i.Type=="Trade"
    @select {i.Price, i.Volume}
end

df = DataFrame(take(q, 3_000_000))

The query in this example is not terminated by a @collect statement, so the whole statement only constructs the query; it is not run until the query is actually iterated at some later point. A query that ends with a @collect statement, by contrast, runs right away.

So q is essentially just a query that can be iterated. The take function is from Julia Base, and it creates a new iterator that will only iterate the first 3 million elements from q. We then pass that into the DataFrame constructor, and thanks to IterableTables.jl this materializes the query results into a DataFrame.

If you want to skip some rows at the beginning, you can use the drop function from Julia Base in a similar way.
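So, for example, skipping the first million matching rows and then taking the next three million would look roughly like this (again, untested):

df = DataFrame(take(drop(q, 1_000_000), 3_000_000))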

At least in theory that query still should not load the whole file into main memory, so this might work with very large files, but again, I haven’t tested that.

Couple of other comments:

  • I selected only two of the columns in this query; I took it from your later comments that this is what you actually wanted.
  • I’m not sure what you want to do with the results of this query. Use the DataFrame right away with some other package? Or first write it out to disc? If the latter, you should actually not materialize things into a DataFrame at all, but instead write it directly to a CSV file, as described here. You would use code like CSV.write("filename.csv", IterableTables.get_datastreams_source(take(q,3_000_000))) for that.

I don’t understand that question, can you elaborate a bit?


Use the Libz package to read .gz files directly. In some tests I have done, it was only about 10% slower than reading the uncompressed files - an outstanding implementation!
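A minimal usage sketch (assuming Libz’s exported ZlibInflateInputStream and a hypothetical file1.csv.gz) looks like this:

using Libz

io = ZlibInflateInputStream(open("file1.csv.gz"))
for line in eachline(io)
    # each line here is already decompressed text
end
close(io)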


Hi David! I think this answers the question of initializing df without loading all of it into main memory. Thank you.

Yes, price and volume are all I need. Thank you!

The results of this query are supposed to be consumed by a TensorFlow.jl model in CSV format, if I’m not mistaken, according to this documentation. I haven’t even touched the topic of bridging between e.g. Query.jl and TensorFlow.jl, and am not sure whether the latter will demand that Query.jl behave a certain way.

Also, as Tamas and now @js135005 pointed out (thanks), can Query.jl work together with a GZ package to read directly from GZ files (which in this case would then have to be written back out to CSV for TensorFlow)?

John, @Tamas_Papp said working with GZ files “makes it faster”. You’re saying it makes it slower. Which one is it?

I’m not familiar with TensorFlow, so I won’t be of much help on that count, but please report back! I guess I should at some point look into integrating IterableTables.jl with TensorFlow.jl, at that point queries would automatically work as input to TensorFlow.jl.

That really depends on whether the CSV.jl package works with one of these. In general you should be able to pass an IO stream instead of a filename to CSV.Source. If Libz.jl returns an IO stream that works that way with CSV, it might all just work.
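Something along these lines might work (completely untested, and assuming CSV.Source accepts a generic IO object and that Libz’s ZlibInflateInputStream behaves like one):

using CSV, Libz, Query, DataFrames

io = ZlibInflateInputStream(open("file1.csv.gz"))

q = @from i in CSV.Source(io) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume}
end

df = DataFrame(take(q, 3_000_000))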

Okay David, here we go!

julia> q = @from i in CSV.Source("/file.csv") begin
           @where i.Type .== "Trade"
           @select {i.Price, i.Volume}
       end
ERROR: UndefVarError: CSV not defined

julia> 

Did you not using CSV?


Thanks Chris, for the millionth time.

Results for .==, https://gist.github.com/hpoit/d6a5bf14e63091e94d7dc4f80f5496c6

Results for ==, https://gist.github.com/hpoit/af9774c7eb38c07d2fbf5fbd7ec2fa36

What’s more correct, .== or ==, since i is already iterating over the row?

@davidanthoff

You’ll also have to add using DataFrames.

You should use == for the comparison, all the expressions inside a query operate on a single row at a time.

.== will actually work in the next version of Query; at that point it will be the lifted version of the == operator, returning a DataValue{Bool}, whereas == will return just a Bool. So .== will propagate missing values, whereas == is just a regular predicate that always returns either true or false.

Thanks David. Here are the new results.

That’s exactly what Libz does. It provides a very efficient stream wrapper for gzip files. It looks just like a normal IO object, so I see no reason why it shouldn’t work.

As I noted earlier, the overhead is very low, and reading .gz files directly would certainly be less expensive than decompressing them to plain files and then processing those.

John Sutherland

Hi John, since for the moment I am working with historic data (not real-time data), I’m not sure an IO stream is necessary. Or should I just build an IO stream for both historic and real-time data? Also, the historic data will come from one terminal and the real-time data from another, so there will be two different layouts as well.