I have a gzip-compressed CSV file which uses commas for the decimal mark. I can uncompress it and then use tr on the command line, but I would prefer doing that through a pipe from Julia. I tried
using CSV
using DataStreams
src1 = open(pipeline(`zcat /tmp/test.gz`, `tr , .`), "r") do io
    CSV.Source(io)
end
but the process appears to be idle (around 0% CPU), so I guess it is waiting for something. How can I fix this?
CSV.Source(io) just creates a Source object, but nothing will be read. Did you mean to use CSV.read?
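For example, a minimal sketch of your pipeline with CSV.read instead (assuming CSV.read accepts the pipe's IO stream here):

```julia
using CSV

# Hedged sketch: parse the translated stream into a table with CSV.read,
# rather than only constructing a Source object
df = open(pipeline(`zcat /tmp/test.gz`, `tr , .`), "r") do io
    CSV.read(io)
end
```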
Here is one way in pure Julia to translate the commas to dots in-stream using BufferedStreams:
using BufferedStreams

# Wraps an IO and swaps one byte for another as data is read through it
type Translator{T <: IO}
    io::T
    from::UInt8
    to::UInt8
end

Translator(io::IO; from::Char=',', to::Char='.') = Translator(io, UInt8(from), UInt8(to))

function BufferedStreams.readbytes!(source::Translator, buffer::Vector{UInt8}, from::Int, to::Int)
    v = view(buffer, from:to)
    n = readbytes!(source.io, v)
    v[v .== source.from] = source.to  # replace every `from` byte with `to` in place
    n
end

Base.eof(source::Translator) = eof(source.io)
Base.close(source::Translator) = close(source.io)

# report data as available until the underlying stream reaches EOF
Base.nb_available{T <: Translator}(s::BufferedStreams.BufferedInputStream{T}) = !eof(s)
Then you can use a BufferedInputStream{Translator} object in place of an IO object, i.e.
using Libz
io = ZlibInflateInputStream(open("/tmp/test.gz", "r"))
t = BufferedInputStream(Translator(io; from=',', to='.'))
@assert isa(t, IO)
d = CSV.read(t)
Thanks. Now I am trying to get something like this working:
function read_into_db(csv_path, db, tablename, CSV_args)
    tmp = tempname()
    println("Uncompressing $(csv_path)")
    println("  into $(tmp) (may take a few minutes) ...")
    open(io -> run(pipeline(pipeline(`zcat $(csv_path)`, `tr , .`), stdout=io)), tmp, "w")
    println("  filesize is $(round(filesize(tmp)/(2^30), 2)) GiB")
    println("  ... forming CSV.Source")
    src = CSV.Source(tmp; CSV_args...)
    println("  ... loading into database")
    SQLite.load(db, tablename, src)
    println("  ... removing temporary file")
    rm(tmp)
end
The fix from @joshbode should help with uncompression and replacement. However, CSV.Source seems to insist on traversing the whole file first, so it is essentially a two-pass operation. Is there anything I can do about that, to make it single-pass?
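Roughly, the single-pass version I have in mind would look something like the sketch below (untested; it assumes CSV.Source and SQLite.load can work off the BufferedInputStream{Translator} from above without first materializing a file):

```julia
using Libz, BufferedStreams, CSV, SQLite

# Hypothetical single-pass variant: decompress and translate in-stream,
# then hand the stream straight to CSV.Source -- no temporary file
function read_into_db_stream(csv_path, db, tablename, CSV_args)
    io = ZlibInflateInputStream(open(csv_path, "r"))
    t = BufferedInputStream(Translator(io; from=',', to='.'))
    src = CSV.Source(t; CSV_args...)
    SQLite.load(db, tablename, src)
    close(t)
end
```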
In general, what is the best strategy for dealing with data that does not fit into memory in the current Julia ecosystem? If I read into an SQLite.jl database, can I dump the data somehow columnwise using Feather.jl (columns fit in memory)?
Yep - looks like you're right - CSV.Source (Source.jl:59) will do a full read on the stream, regardless of whether the header and types are specified. The only input type CSV.Source won't instantly consume is IOBuffer.
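So if memory is not the binding constraint, one possible workaround (a sketch only, assuming the translated stream t and the db/tablename/CSV_args from the earlier posts) is to do the consuming yourself and hand CSV.Source an IOBuffer; this avoids the temporary file but still holds the whole uncompressed data in memory:

```julia
# Sketch: consume the translated stream ourselves and wrap the bytes in an
# IOBuffer, the one input type CSV.Source uses as-is
bytes = read(t)                      # t = BufferedInputStream(Translator(...)) from above
src = CSV.Source(IOBuffer(bytes); CSV_args...)
SQLite.load(db, tablename, src)
```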