Basically, the DataStream object behaves just like any regular file. If you can read from CSV files, then you can read directly from the gzipped versions just as well. In my work I deal with hundreds or thousands of multi-gigabyte log files, so being able to process the compressed files directly is a major advantage. I am also working with historic data: results from very large-scale computer performance load tests.
Okay, so I will work towards reading from GZ and writing to CSV for TensorFlow.jl, unless TensorFlow can also read GZ, which apparently it can. I would have to create a new reader inside TensorFlow.jl. @malmaud
I don’t know if it makes a difference to pass the IO stream to TensorFlow.jl as CSV or GZ.
David, what would passing an IO stream to CSV.Source look like?
I believe you are using Query.jl version 0.3.x, but all my examples require version 0.4.0. A simple Pkg.update() should get you the latest version of Query and of all the other packages you are using.
I believe you just pass it to the CSV.Source constructor like this, assuming you assigned the stream to a variable called stream:
q = @from i in CSV.Source(stream) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume}
end
Just to clarify: the stream we are talking about here, which you might get from some unzip package, is not some streaming data model; it is just a Julia IO stream. See the documentation at https://docs.julialang.org/en/latest/manual/networking-and-streams.html for details.
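As a small illustration (the file name here is hypothetical), such an IO stream is simply what open returns, and functions like readline work on it directly:

```julia
# Open a plain file as a Julia IO stream (hypothetical file name).
io = open("data.csv", "r")
header = readline(io)  # read the first line from the stream
close(io)
```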
Depends, and you should benchmark it for your case. I am using the excellent BioJulia/Libz.jl, and my bottleneck was IO, so I got a speedup when reading data from an HDD and from a network drive (with a very fast connection), and saw no noticeable slowdown on an SSD. So for me it was a no-brainer to use compressed files.
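If you want to check this for your own setup, a rough sketch of such a benchmark might look like the following. The file names are hypothetical, and whether CSV.read accepts an IO stream directly depends on your CSV.jl version:

```julia
using CSV, Libz

# Run each line twice: the first run includes compilation time.

# Time reading the uncompressed file
@time CSV.read("file.csv")

# Time reading the gzipped file through a Libz inflate stream
@time CSV.read(Libz.ZlibInflateInputStream(open("file.csv.gz", "r")))
```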
Great thread!
Looks like it might need more work with a Libz (BufferedStreams) stream, as CSV.jl requires the nb_available method:
julia> using CSV, Query, Libz
julia> io = Libz.ZlibInflateInputStream(open("/tmp/abc.txt.gz", "r"))
BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}(<8.0 KiB buffer, 0% filled>)
julia> q = @from i in CSV.Source(io) begin
    @where i.n >= 500
    @select {i.n}
end
ERROR: MethodError: no method matching nb_available(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}})
Closest candidates are:
nb_available(::Base.Filesystem.File) at filesystem.jl:162
nb_available(::BufferStream) at stream.jl:1036
nb_available(::IOStream) at iostream.jl:146
...
in #Source#12(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}, ::CSV.Options, ::Int64, ::Int64, ::Array{DataType,1}, ::Bool, ::Bool, ::Int64, ::Int64, ::Int64, ::Bool, ::Type{T}) at /Users/josh/.julia/v0.5/CSV/src/Source.jl:59
in (::Core.#kw#Type)(::Array{Any,1}, ::Type{CSV.Source}) at ./<missing>:0
in #Source#11(::UInt8, ::UInt8, ::UInt8, ::String, ::Int64, ::Int64, ::Array{DataType,1}, ::Bool, ::Bool, ::Base.Dates.DateFormat, ::Int64, ::Int64, ::Int64, ::Bool, ::Type{T}, ::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}) at /Users/josh/.julia/v0.5/CSV/src/Source.jl:25
in CSV.Source(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}) at /Users/josh/.julia/v0.5/CSV/src/Source.jl:21
Where /tmp/abc.txt (unzipped) is:
n
1
2
...
999
1000
Reading from the uncompressed stream does work, though:
julia> q = @from i in CSV.Source(open("/tmp/abc.txt", "r")) begin
    @where i.n >= 500
    @select {i.n}
end
julia> collect(q)
501-element Array{NamedTuples._NT_n{Nullable{Int64}},1}:
(n = 500)
(n = 501)
...
(n = 999)
(n = 1000)
Quick hack to get Zlib streaming working:
julia> using CSV, Query, Libz
julia> Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = eof(s) # the hack!
julia> io = Libz.ZlibInflateInputStream(open("/tmp/abc.txt.gz", "r"))
julia> q = @from i in CSV.Source(io) begin
    @where i.n >= 500
    @select {i.n}
end
julia> collect(q)
501-element Array{NamedTuples._NT_n{Nullable{Int64}},1}:
(n = 500)
(n = 501)
...
(n = 999)
(n = 1000)
Hi David, the latest results.
I will take a short break and then return to all your comments below the first two sentences, and to Tamas’ and Josh’s comments.
Hi David, Tamas, and Josh, thank you all for the information. What I’m set to do then is
using CSV, DataFrames, Query, Libz
# nb_available hack for streams (Josh correct me if I'm wrong about the reason for the hack)
Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = eof(s)
# Define stream I/O
io = Libz.ZlibInflateInputStream(open("/file.csv.gz", "r"))
# Define query or subset in I/O
q = @from i in CSV.Source(io) begin
    @select i.Time_G_
    @where i.Type == "Trade"
    @select {i.Price, i.Volume}
end
# Define df and build DataFrame from I/O
df = DataFrame(take(q, 10_000_000))
I just need a little help from David to get @select i.Time_G_ right, so that the column Time_G_ is displayed next to the columns Price and Volume.
Also, Josh, how long do you think this hack will last? How do I build an insurance policy for it?
You want just one @select statement in your query that picks all the columns you need:
q = @from i in CSV.Source(io) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume, i.Time_G_}
end
Where should I place @time?
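One thing worth noting here: the @from block only defines the query, so timing the definition measures almost nothing; the actual work happens when the query is materialized. A sketch, assuming a query q defined as above:

```julia
# The query definition is lazy; time the materialization step instead.
@time result = collect(q)
```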
I had tried @select {i.Time_G_, i.Price, i.Volume} and that didn’t work, so I didn’t think of trying @select {i.Price, i.Volume, i.Time_G_}. So the rule is: place the columns that the @where clause uses first in the list, then any other columns?
Latest output.
No, I think something else is mixed up, this should work with any ordering of the columns in the @select
statement. Looking at your latest output, the CSV reading part seems to indicate that there are 0 rows, so you are running a query on a 0 rows data set. The problem seems to be somewhere upstream of the query, i.e. in the CSV reading part.
This might be telling:
julia> q = @from i in CSV.Source(io) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume}
end
ERROR: too many parameters for type _NT_
in getiterator(::CSV.Source) at /Users/Corvus/.julia/v0.5/IterableTables/src/integrations/datastreams.jl:46
in query(::CSV.Source) at /Users/Corvus/.julia/v0.5/Query/src/sources/source_iterable.jl:7
julia>
I think again that something is going wrong with the CSV reading here, before the query (although Query should give a more helpful error message). Can you paste here what the CSV.Source(io) expression returns?
Absolutely
julia> CSV.Source(io)
CSV.Source: __IO__
CSV.Options:
delim: ','
quotechar: '"'
escapechar: '\\'
null: ""
dateformat: Base.Dates.DateFormat(Base.Dates.Slot[Base.Dates.DelimitedSlot{Base.Dates.Year}(Base.Dates.Year,'y',4,"-"),Base.Dates.DelimitedSlot{Base.Dates.Month}(Base.Dates.Month,'m',2,"-"),Base.Dates.DelimitedSlot{Base.Dates.Day}(Base.Dates.Day,'d',2,r"(?=\s|$)")],"","english")
Data.Schema{true}:
rows: 0 cols: 1
Columns:
"" Nullable{WeakRefString{UInt8}}
julia>
Sorry, the nb_available for the Libz stream should have been !eof:
if !method_exists(nb_available, (BufferedStreams.BufferedInputStream{Libz.Source},))
    Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = !eof(s)
end
It is only required to let CSV.Source know (once, at the start) that it can start on the stream and that the stream isn’t empty or already exhausted. The method_exists check should make it a little more future-proof, in case an official method is later added.
Thanks Josh. Here’s the output.
David, where should I apply @time in the query definition?
You’ll need to either fix the header in the CSV file to exclude “non-identifier” characters (i.e. Time[G] -> Time), or use getfield, e.g.

q = @from i in CSV.Source(io) begin
    @where i.Type == "Trade"
    @select {i.Price, i.Volume, getfield(i, Symbol("Time[G]"))}
end
It’s actually Time_G_, which should be parsed as one identifier just like Price and Volume?
I tried your snippet with Time[G], which didn’t work, and with Time_G_, which worked. In any case, I don’t understand the reason for this special treatment.
DataFrames is still not building.
Hmm - Look Ma, no rows!
What do you get from:
io = Libz.ZlibInflateInputStream(open("file.csv.gz", "r"))
readline(io)