How to subset a stream I/O and pass it to TensorFlow.jl? (question has been updated)

Basically, the DataStream object behaves just like any regular file. If you can read from CSV files, then you can read directly from the gzipped versions just as well. In my work, I deal with hundreds or thousands of multi-gigabyte log files, so being able to process the compressed files directly is a major advantage. I’m also working with historical data - results from very large scale computer performance load tests.

Okay, so I will work towards reading from GZ and writing to CSV for TensorFlow.jl, unless TensorFlow can also read GZ, which apparently it can. I would have to create a new reader inside TensorFlow.jl. @malmaud

I don’t know if it makes a difference to pass the IO stream to TensorFlow.jl as CSV or GZ.

David, what would passing an IO stream to CSV.Source look like?

I believe you are using Query.jl version 0.3.x, but all my examples require version 0.4.0. A simple Pkg.update() should get you the latest version of Query and all other packages you are using.

I believe you just pass it to the CSV.Source constructor like this, assuming you assigned the stream to a variable called stream:

q = @from i in CSV.Source(stream) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume}
end

Just to clarify: the stream we are talking about here, the kind you might get from some unzip package, is not some streaming data model; it is just a regular Julia IO stream. See https://docs.julialang.org/en/latest/manual/networking-and-streams.html for details.
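To make that concrete, here is a minimal Base-only sketch (my addition, not from the thread) showing that an in-memory IOBuffer is read exactly like a file handle or a decompressing Libz stream; it is all just the IO interface:

```julia
# Any IO object - an open file, a Libz decompressing stream, or an
# in-memory IOBuffer - supports the same reading interface.
buf = IOBuffer("n\n1\n2\n3\n")

header = chomp(readline(buf))   # read the header line: "n"

values = String[]
while !eof(buf)                 # loop until the stream is exhausted
    push!(values, chomp(readline(buf)))
end
```

Swapping `buf` for `open("file.csv")` or a `ZlibInflateInputStream` changes nothing downstream; that is the point of programming against `IO`.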

Depends, and you should benchmark it for your case. I am using the excellent BioJulia/Libz.jl, and my bottleneck was IO, so I got a speedup when reading data from an HDD and a network drive (with a very fast connection), and no noticeable slowdown on an SSD. So for me it was a no-brainer to use compressed files.

Great thread!

Looks like it might need more work with a Libz (BufferedStreams) stream, as CSV.jl requires the nb_available method:

julia> using CSV, Query, Libz

julia> io = Libz.ZlibInflateInputStream(open("/tmp/abc.txt.gz", "r"))
BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}(<8.0 KiB buffer, 0% filled>)

julia> q = @from i in CSV.Source(io) begin
           @where i.n >= 500
           @select {i.n}
       end
ERROR: MethodError: no method matching nb_available(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}})
Closest candidates are:
  nb_available(::Base.Filesystem.File) at filesystem.jl:162
  nb_available(::BufferStream) at stream.jl:1036
  nb_available(::IOStream) at iostream.jl:146
  ...
 in #Source#12(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}, ::CSV.Options, ::Int64, ::Int64, ::Array{DataType,1}, ::Bool, ::Bool, ::Int64, ::Int64, ::Int64, ::Bool, ::Type{T}) at /Users/josh/.julia/v0.5/CSV/src/Source.jl:59
 in (::Core.#kw#Type)(::Array{Any,1}, ::Type{CSV.Source}) at ./<missing>:0
 in #Source#11(::UInt8, ::UInt8, ::UInt8, ::String, ::Int64, ::Int64, ::Array{DataType,1}, ::Bool, ::Bool, ::Base.Dates.DateFormat, ::Int64, ::Int64, ::Int64, ::Bool, ::Type{T}, ::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}) at /Users/josh/.julia/v0.5/CSV/src/Source.jl:25
 in CSV.Source(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}) at /Users/josh/.julia/v0.5/CSV/src/Source.jl:21

Where /tmp/abc.txt (unzipped) is

n
1
2
...
999
1000

Reading from the uncompressed stream does work, though :slight_smile:

julia> q = @from i in CSV.Source(open("/tmp/abc.txt", "r")) begin
           @where i.n >= 500
           @select {i.n}
       end

julia> collect(q)
501-element Array{NamedTuples._NT_n{Nullable{Int64}},1}:
 (n = 500)
 (n = 501)
...
 (n = 999)
 (n = 1000)

Quick hack to get Zlib streaming working:

julia> using CSV, Query, Libz

julia> Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = eof(s)  # the hack!

julia> io = Libz.ZlibInflateInputStream(open("/tmp/abc.txt.gz", "r"))

julia> q = @from i in CSV.Source(io) begin
           @where i.n >= 500
           @select {i.n}
       end

julia> collect(q)
501-element Array{NamedTuples._NT_n{Nullable{Int64}},1}:
 (n = 500)
 (n = 501)
...
 (n = 999)
 (n = 1000)

Hi David, the latest results :slight_smile:.

I will take a short break and then return to all your comments below the first two sentences, and to Tamas’ and Josh’s comments.

Hi David, Tamas, and Josh, thank you all for the information. What I’m set to do then is


using CSV, DataFrames, Query, Libz

# nb_available hack for streams (Josh correct me if I'm wrong about the reason for the hack)
Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = eof(s)

# Define stream I/O
io = Libz.ZlibInflateInputStream(open("/file.csv.gz", "r"))

# Define query or subset in I/O
q = @from i in CSV.Source(io) begin
    @select i.Time_G_
    @where i.Type == "Trade"
    @select {i.Price, i.Volume}
end

# Define df and build DataFrame from I/O
df = DataFrame(take(q, 10_000_000))
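As an aside, take here is the lazy iterator cap (exported directly from Base on Julia 0.5, Iterators.take on later versions), so the DataFrame only ever materialises at most that many rows. A tiny Base-only sketch:

```julia
# Iterators.take wraps an iterator and stops after n elements, without
# ever touching the rest of the (potentially huge) source.
capped = collect(Iterators.take(1:10_000_000, 5))
```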

I just need a little help from David to get @select i.Time_G_ right to have the column Time_G_ displayed next to columns Price and Volume.

Also, Josh, how long do you think this hack will last for? How do I build an insurance policy for it?


You want just one @select statement in your query that picks all the columns you need:

q = @from i in CSV.Source(io) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume, i.Time_G_}
end

where should I place @time?

I had tried @select {i.Time_G_, i.Price, i.Volume} and that didn’t work, so I didn’t think of @select {i.Price, i.Volume, i.Time_G_}. So the rule is: place the columns that appear in @where first in the list, then any other columns after?

Latest output.

No, I think something else is mixed up; this should work with any ordering of the columns in the @select statement. Looking at your latest output, the CSV reading part seems to indicate that there are 0 rows, so you are running a query on a 0-row data set. The problem seems to be somewhere upstream of the query, i.e. in the CSV reading part.

This might be telling

julia> q = @from i in CSV.Source(io) begin
                  @where i.Type == "Trade"
                  @select {i.Price, i.Volume}
              end
ERROR: too many parameters for type _NT_
 in getiterator(::CSV.Source) at /Users/Corvus/.julia/v0.5/IterableTables/src/integrations/datastreams.jl:46
 in query(::CSV.Source) at /Users/Corvus/.julia/v0.5/Query/src/sources/source_iterable.jl:7

julia> 

I think again that something is going wrong with the CSV reading here, before the query runs (although Query should give a more helpful error message). Can you paste here what the CSV.Source(io) expression returns?

Absolutely

julia> CSV.Source(io)
CSV.Source: __IO__
    CSV.Options:
        delim: ','
        quotechar: '"'
        escapechar: '\\'
        null: ""
        dateformat: Base.Dates.DateFormat(Base.Dates.Slot[Base.Dates.DelimitedSlot{Base.Dates.Year}(Base.Dates.Year,'y',4,"-"),Base.Dates.DelimitedSlot{Base.Dates.Month}(Base.Dates.Month,'m',2,"-"),Base.Dates.DelimitedSlot{Base.Dates.Day}(Base.Dates.Day,'d',2,r"(?=\s|$)")],"","english")
Data.Schema{true}:
rows: 0	cols: 1
Columns:
 ""  Nullable{WeakRefString{UInt8}}

julia> 

Sorry, the nb_available for the Libz stream should have been !eof:

if !method_exists(nb_available, (BufferedStreams.BufferedInputStream{Libz.Source}, ))
    Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = !eof(s)
end

It is only required to let CSV.Source know (once, at the start) that it can start on the stream and that it isn’t empty or already exhausted.
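A Base-only sketch (my addition, not from the thread) of why !eof answers that one-time question:

```julia
# !eof(stream) is true exactly when there is still something to read,
# which is all CSV.Source needs to know before it starts.
buf = IOBuffer("data")
has_bytes = !eof(buf)   # true: the stream still has content
read(buf, String)       # consume everything
exhausted = eof(buf)    # true: nothing left now
```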

The method_exists check should make it a little bit more future-proof, in case an official method is later added.

Thanks Josh. Here’s the output.

David, where should I apply @time in the query definition?

You’ll need to either fix the header in the CSV file to exclude “non-identifier” characters (i.e. Time[G] -> Time), or use getfield, e.g.

q = @from i in CSV.Source(io) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume, getfield(i, Symbol("Time[G]"))}
end
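The underlying issue is that Time[G] contains brackets, so it is not a valid Julia identifier, and i.Time[G] would parse as indexing; Symbol can still name such a field. A Base-only sketch with a NamedTuple standing in for a query row (the thread’s Julia 0.5 used the NamedTuples.jl package, but the idea is the same; the data is made up):

```julia
colname = Symbol("Time[G]")      # brackets make this illegal as a bare identifier
row = NamedTuple{(colname, :Price)}(("09:30", 10.5))

time_g = getfield(row, colname)  # works even though row.Time[G] would not parse as a field access
price = row.Price                # dot access works for valid identifiers
```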

It’s actually Time_G_, which should be parsed as one identifier string just like Price and Volume?

I tried your snippet with Time[G] which didn’t work, and with Time_G_ which worked. In any case, I don’t understand the reason for this special treatment.

DataFrames is still not building.

Hmm - Look Ma, no rows!

What do you get from:

io = Libz.ZlibInflateInputStream(open("file.csv.gz", "r"))
readline(io)
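For comparison, a healthy stream should hand back the header row on the first readline. A sketch with an in-memory stream and made-up data matching the columns discussed above:

```julia
# If the stream is decompressing correctly, the first line read is the
# CSV header; garbage or an empty string here points at the stream setup.
io = IOBuffer("Time_G_,Type,Price,Volume\n09:30,Trade,10.5,100\n")
first_line = chomp(readline(io))
```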