All I can think of is that you need to make sure io hasn’t already been used before running the query. Otherwise it will have been exhausted (or, at the very least, it will be missing some rows, including the header). In other words, don’t rerun the query q without reopening io.
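To make the "exhausted stream" point concrete, here is a minimal sketch that uses an in-memory `IOBuffer` in place of the Libz stream `io` from the thread. The assumption is that the Libz stream is consumed front to back in the same way. Each read picks up where the previous one stopped, so a second reader misses the header and a third sees nothing.

```julia
# Minimal sketch: an IOBuffer stands in for the Libz stream `io`
# (assumption: the real stream is consumed front to back in the same way).
io = IOBuffer("Type,Price\nTrade,1.5\nQuote,2.0\n")

header = readline(io)     # a first reader consumes the header line
rows = readlines(io)      # a second reader only sees what is left
leftover = readlines(io)  # a third reader finds the stream exhausted

println(header)            # Type,Price
println(rows)              # ["Trade,1.5", "Quote,2.0"]
println(isempty(leftover)) # true
```

This is the situation a second `CSV.Source(io)` call runs into: it starts reading wherever the previous consumer left the stream.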
I’m not sure, but as I said, this looks like a problem with reading the CSV file, not with the query part of things. If you look at the pvt3 output you can see a line that says rows: 0 cols: 1. That info comes from the CSV.jl package and means that the CSV reading code for some reason sees 0 rows and only one column in the file. pvt2, on the other hand, doesn’t have that problem, so you must be changing something in the way you read the CSV file there, all before the query part.
Umm… not exactly sure. What’s the CSV file in question? Can it be posted or attached here so I can take a look, along with the CSV.Source or CSV.read command being used?
You are running into the same problem that @joshbode described already:
You can’t call CSV.Source twice on the same io stream. I’m not sure whether the stream from libz can somehow be reset after you’ve read from it once, or whether you’ll just have to open a new stream for the second call to CSV.Source.
```julia
julia> CSV.reset!(CSV.Source(io))

julia> df = DataFrame(take(query(), 10_000_000))
ERROR: too many parameters for type _NT_
 in getiterator(::CSV.Source) at /Users/Corvus/.julia/v0.5/IterableTables/src/integrations/datastreams.jl:46
 in query(::CSV.Source) at /Users/Corvus/.julia/v0.5/Query/src/sources/source_iterable.jl:7
 in query() at ./REPL[4]:2
```
You would need to reset the io stream, not the CSV.Source instance. I think calling seekstart(io) before you call CSV.Source(io) might do the trick, if libz streams support that.
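For a stream type that does support seeking, the reset is a single `seekstart` call between the two reads. The sketch below uses an `IOBuffer`, which is seekable. Whether a given stream (such as the Libz one) supports this depends on its type.

```julia
# Minimal sketch: rewind a seekable stream before reading it a second time.
# IOBuffer supports seeking; other stream types may not.
io = IOBuffer("Type,Price\nTrade,1.5\nQuote,2.0\n")

first_pass = readlines(io)   # consumes everything
seekstart(io)                # move the read position back to the beginning
second_pass = readlines(io)  # reads the header and all rows again

println(first_pass == second_pass)  # true
```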
It is very clear what is going on here: you are reading repeatedly from the io stream without calling seekstart on it. The example you just posted is another case of that: you can’t call CSV.Source(io) followed by CSV.read(io) without calling seekstart(io) in between.
If libz streams don’t support seekstart, you will have to reopen the stream every time you have read from it via CSV.Source, CSV.read or anything like that.
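When seeking isn't available, reopening is the fallback: a brand-new stream on the same file starts at the beginning again. This sketch uses a plain temporary text file so it needs only Base. With the gzipped file from the thread you would presumably wrap each freshly opened file in `Libz.ZlibInflateInputStream` as in the code below, but that part is not exercised here.

```julia
# Minimal sketch: a plain temp file instead of the gzipped CSV (assumption:
# with Libz you would wrap each newly opened file the same way as before).
path, tmp = mktemp()
write(tmp, "Type,Price\nTrade,1.5\nQuote,2.0\n")
close(tmp)

io = open(path, "r")
first_pass = readlines(io)
exhausted = readlines(io)   # `io` is at end of file: nothing left to read
close(io)

io = open(path, "r")        # reopen: a new stream starts at the beginning
second_pass = readlines(io)
close(io)
```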
I couldn’t find it in Libz.jl, but does this mean Libz.jl still supports it?
```julia
help?> seekstart
search: seekstart

  seekstart(s)

  Seek a stream to its beginning.
```
In any case, this is the sequence:

```julia
using CSV, DataFrames, Query, Libz

# nb_available hack for streams (Josh correct me if I'm wrong about the reason for the hack)
if !method_exists(nb_available, (BufferedStreams.BufferedInputStream{Libz.Source}, ))
    Base.nb_available{T <: Libz.Source}(s::BufferedStreams.BufferedInputStream{T}) = !eof(s)
end

# Define stream I/O
io = Libz.ZlibInflateInputStream(open("/file.csv.gz", "r"))

# Define query and subset in I/O
function query()
    q = @from i in CSV.Source(io) begin
        @where i.Type == "Trade"
        @select {i.Price, i.Volume, i.Time_G_} # pvt2
    end
end

@time query()

# Define df and build DataFrame from I/O
df = DataFrame(take(query(), 10_000_000))
```
How would I add a seekstart(io) call before the second call to CSV.Source(io)?
Oh, I got this as well. Does that mean Libz does not support it?
```julia
julia> seekstart(io)
ERROR: ArgumentError: Can't seek in input stream with source of type Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}
 in seek(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}, ::Int64) at /Users/Corvus/.julia/v0.5/BufferedStreams/src/bufferedinputstream.jl:323
 in seekstart(::BufferedStreams.BufferedInputStream{Libz.Source{:inflate,BufferedStreams.BufferedInputStream{IOStream}}}) at ./iostream.jl:54
```
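```julia
function query(filename)
    # Open a fresh file and Libz stream on every call, so nothing is reused
    file = open(filename, "r")
    io = Libz.ZlibInflateInputStream(file)
    csv = CSV.Source(io)
    q = @from i in csv begin
        @where i.Type == "Trade"
        @select {i.Price, i.Volume, i.Time_G_}
    end
    # Materialize the query result built above (not a new call to query())
    df = DataFrame(take(q, 10_000_000))
    return df
end
```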
Note a couple of things:
If you use the @time macro, make sure you run the function once before you time it; otherwise you are just timing the compiler.
This function now opens several things that should be closed after use (the file stream, probably the libz stream, and I think the CSV.Source instance). Ideally you would use try ... finally blocks to make sure these resources always get freed. Something like this:
```julia
function query(filename)
    file = open(filename, "r")
    try
        io = Libz.ZlibInflateInputStream(file)
        try
            csv = CSV.Source(io)
            try
                q = @from i in csv begin
                    @where i.Type == "Trade"
                    @select {i.Price, i.Volume, i.Time_G_}
                end
                # Materialize the query result built above (not a new call to query())
                df = DataFrame(take(q, 10_000_000))
                return df
            finally
                Data.close!(csv)   # free the CSV.Source
            end
        finally
            close(io)              # free the libz stream
        end
    finally
        close(file)                # always close the underlying file
    end
end
```
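For the file handle alone, Julia’s do-block form of `open` gives the same guarantee as the outermost try ... finally with less nesting. The sketch below is a hypothetical `count_trades` function on a plain-text CSV (no Libz or CSV.jl, so it runs with Base only). I’m not assuming Libz.jl or CSV.Source offer a do-block form, so those would still need their own try ... finally. It also includes the warm-up call before @time mentioned above.

```julia
# Hypothetical stand-in for query(filename): count "Trade" rows in a
# plain-text CSV. The do-block form of `open` closes the file even if
# the body throws, like the outermost try ... finally above.
function count_trades(path)
    open(path, "r") do file
        count(line -> startswith(line, "Trade,"), eachline(file))
    end
end

path, tmp = mktemp()
write(tmp, "Type,Price\nTrade,1.5\nQuote,2.0\nTrade,1.7\n")
close(tmp)

count_trades(path)           # warm-up call: compiles count_trades first
n = @time count_trades(path) # now @time measures the work, not the compiler
```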