How to subset a stream I/O and pass it to TensorFlow.jl? (question has been updated)

Lol. I get

julia> io = Libz.ZlibInflateInputStream(open("file.csv.gz", "r"))
BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}(<8.0 KiB buffer, 0% filled>)

julia> readline(io)
"#RIC,Date[G],Time[G],GMT Offset,Type,Price,Volume,Bid Price,Bid Size,Ask Price,Ask Size,Qualifiers\n"

julia> 

OK - looks right.

All I can think of is that you need to make sure io hasn’t already been used before running the query, otherwise it will have been exhausted (or missing some rows, at least, including the header) - i.e. don’t rerun the query q without reopening io.

pvt3 was run once from a new Julia terminal. David, I still need your help.

I’m not sure, but as I said, this looks like a problem with reading the CSV file, not with the query part of things. In the pvt3 output there is a line that says rows: 0 cols: 1. That info comes from the CSV.jl package and means that the CSV reading code, for some reason, sees 0 rows and only one column in the file. pvt2, on the other hand, doesn’t have that problem, so something changes in the way you read the CSV file there, all before the query part.

Normally one would not benchmark in global scope, but in your case the runtime should dominate compilation on the full dataset, so it should be OK.
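To keep compilation out of a measurement, the usual approach is to put the work in a function and call it once before timing it. A minimal sketch for current Julia (1.x), with a hypothetical sum_to standing in for query():

```julia
# Hypothetical workload standing in for query(). Wrapping it in a function
# lets the compiler specialize it instead of working on untyped globals.
function sum_to(n)
    s = 0
    for i in 1:n
        s += i
    end
    return s
end

sum_to(10)            # first call triggers compilation
@time sum_to(10^7)    # later calls measure run time, not compile time
```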

Okay, CSV.jl “sees” the file correctly in pvt2, but then what’s happening at the next step, df = DataFrame(take(q, 10_000_000))?

Well, the benchmark runs okay:

julia> @time query()
196.525924 seconds (5.28 M allocations: 8.289 GB, 1.23% gc time)

but now df = DataFrame(take(query(), 10_000_000)) won’t start because it can’t take in query().

Hi @quinnj, is the problem inside CSV.jl? If yes, where do I start to learn about it (/Source.jl or /io.jl)? Thanks.

Umm…not exactly sure. What’s the CSV file in question? Can it be posted or attached here, along with the CSV.Source or CSV.read command being used, so I can take a look?

You are running into the same problem that @joshbode described already:

You can’t call CSV.Source twice on the same io stream. I’m not sure whether the stream from libz can somehow be reset after you’ve read from it once, or whether you’ll just have to open a new stream for the second call to CSV.Source.

Can resetting still be the problem?

julia> CSV.reset!(CSV.Source(io))

julia> df = DataFrame(take(query(), 10_000_000))
ERROR: too many parameters for type _NT_
 in getiterator(::CSV.Source) at /Users/Corvus/.julia/v0.5/IterableTables/src/integrations/datastreams.jl:46
 in query(::CSV.Source) at /Users/Corvus/.julia/v0.5/Query/src/sources/source_iterable.jl:7
 in query() at ./REPL[4]:2

julia> 

You would need to reset the io stream, not the CSV.Source instance. I think seekstart(io) before you call CSV.Source(io) might do the trick, if libz streams support that.

Hi Jacob, I wasn’t able to attach a CSV here, but here’s a picture of a sample of it

The Source and read commands used are:

julia> CSV.Source(io)
CSV.Source: __IO__
    CSV.Options:
        delim: ','
        quotechar: '"'
        escapechar: '\\'
        null: ""
        dateformat: Base.Dates.DateFormat(Base.Dates.Slot[Base.Dates.DelimitedSlot{Base.Dates.Year}(Base.Dates.Year,'y',4,"-"),Base.Dates.DelimitedSlot{Base.Dates.Month}(Base.Dates.Month,'m',2,"-"),Base.Dates.DelimitedSlot{Base.Dates.Day}(Base.Dates.Day,'d',2,r"(?=\s|$)")],"","english")
Data.Schema{true}:
rows: 0	cols: 1
Columns:
 ""  Nullable{WeakRefString{UInt8}}

julia> CSV.read(io)
0×1 DataFrames.DataFrame

julia> 

Perhaps we’re missing, or have mis-specified, some CSV.Options or Data.Schema settings? Or maybe reading through the I/O stream changes things? It read fine as a plain CSV file (not as a stream).

I couldn’t find seekstart() in Libz.jl.

It is very clear what is going on here: you are reading repeatedly from the io stream without calling seekstart on the stream. The example you just posted is another case of that: you can’t call CSV.Source(io) followed by CSV.read(io) without calling seekstart(io) in between.

If seekstart is not supported by libz you will have to reopen the stream once you have read from it via CSV.Source or CSV.read or anything like that.
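A minimal sketch of the exhaustion problem, written for current Julia (1.x) and using an in-memory IOBuffer as a stand-in for the Libz stream (an assumption for illustration: unlike the Libz inflate stream, an IOBuffer is seekable):

```julia
# IOBuffer stands in for the compressed stream in this thread.
io = IOBuffer("Type,Price\nTrade,1.5\nQuote,1.6\n")

first_pass = read(io, String)   # consumes everything, header included
second_pass = read(io, String)  # already at EOF, so this returns ""

# IOBuffer can be rewound. The Libz inflate stream throws an ArgumentError
# on seekstart, so with that stream you would reopen the file instead.
seekstart(io)
third_pass = read(io, String)   # full contents again
```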

I couldn’t find it in Libz.jl, but the REPL help does list it. Does this mean Libz.jl supports it?

help?> seekstart
search: seekstart

  seekstart(s)

  Seek a stream to its beginning.

julia> 

In any case, this is the sequence:

using CSV, DataFrames, Query, Libz

# nb_available hack for streams (Josh correct me if I'm wrong about the reason for the hack)
if !method_exists(nb_available, (BufferedStreams.BufferedInputStream{Libz.Source}, ))
    Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = !eof(s)
end

# Define stream I/O
io = Libz.ZlibInflateInputStream(open("/file.csv.gz", "r"))

# Define query and subset in I/O
function query()
    q = @from i in CSV.Source(io) begin
        @where i.Type == "Trade"
        @select {i.Price, i.Volume, i.Time_G_} # pvt2
    end
end

@time query()

# Define df and build DataFrame from I/O
df = DataFrame(take(query(), 10_000_000))

How would I define a seekstart(io) call before the second call to CSV.Source(io)?

Oh, I got this as well. Does that mean Libz does not support it?

julia> seekstart(io)
ERROR: ArgumentError: Can't seek in input stream with source of type Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}
 in seek(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}, ::Int64) at /Users/Corvus/.julia/v0.5/BufferedStreams/src/bufferedinputstream.jl:323
 in seekstart(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}) at ./iostream.jl:54

julia> 

Okay, let’s assume it does not support it. How do I reopen the I/O stream with CSV.Source() or CSV.read() before the second call to CSV.Source(io)?

Here’s the latest application of the sequence with seekstart(io), before I called CSV.Source(io) for the second time.

Hello @bicycle1885 and @dcjones, would you know where I could get instructions to support Libz.jl with seekstart(io)?

Just put it all into a function like this:

function query(filename)
    file = open(filename, "r")
    io = Libz.ZlibInflateInputStream(file)
    csv = CSV.Source(io)
    q = @from i in csv begin
        @where i.Type == "Trade"
        @select {i.Price, i.Volume, i.Time_G_}
    end
    df = DataFrame(take(q, 10_000_000))  # take from q above; calling query() here would re-enter the query
    return df
end

Note a couple of things:

  • If you use the @time macro, make sure you run the function once before you time it; otherwise you are just timing the compiler.
  • This function is now opening lots of things that should be closed after use (file stream, probably libz stream, and I think the CSV.Source instance). Ideally you would use try ... finally blocks to make sure these resources always get freed. Something like this:
function query(filename)
    file = open(filename, "r")
    try
        io = Libz.ZlibInflateInputStream(file)
        try
            csv = CSV.Source(io)
            try
                q = @from i in csv begin
                    @where i.Type == "Trade"
                    @select {i.Price, i.Volume, i.Time_G_}
                end
                df = DataFrame(take(q, 10_000_000))  # take from q above, not from a new query() call
                return df
            finally
                Data.close!(csv)
            end
        finally
            close(io)
        end
    finally
        close(file)
    end
end
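The same cleanup can also be written with do-blocks, which close the handle automatically when the block exits, even on error. A minimal Base-only sketch for current Julia (1.x); the trades helper is hypothetical, and it assumes an uncompressed CSV with a header row and no quoted fields, so it splits naively on commas instead of using CSV.jl. For the .gz file one would open a decompressing stream inside the same do-block (for example CodecZlib.jl’s GzipDecompressorStream), which is not tried here:

```julia
# Hypothetical helper: returns (price, volume) strings for rows whose Type
# column is "Trade". The file is reopened on every call, so repeated calls
# never see a half-consumed stream.
function trades(filename; limit = 10_000_000)
    open(filename, "r") do file            # file is closed when the block exits
        header = split(readline(file), ',')
        type_col = findfirst(==("Type"), header)
        price_col = findfirst(==("Price"), header)
        volume_col = findfirst(==("Volume"), header)
        rows = Tuple{String,String}[]
        for line in eachline(file)
            length(rows) >= limit && break
            fields = split(line, ',')
            if fields[type_col] == "Trade"
                push!(rows, (String(fields[price_col]), String(fields[volume_col])))
            end
        end
        return rows                         # open returns the do-block's value
    end
end

# Usage with a small hypothetical file:
path = tempname()
write(path, "#RIC,Type,Price,Volume\nX,Trade,1.5,100\nX,Quote,1.6,200\nX,Trade,1.7,300\n")
trades(path) == trades(path)   # each call reopens the file
```

Because each call opens its own handle, calling trades twice returns the same rows.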