Pipe to CSV.Source

I have a gzip-compressed CSV file that uses commas for the decimal mark. I can uncompress it and then use tr on the command line, but I would prefer to do that using a pipe from Julia. I tried

using CSV
using DataStreams
src1 = open(pipeline(`zcat /tmp/test.gz`, `tr , .`), "r") do io
    CSV.Source(io)
end

but the process appears to be idle (around 0% CPU), so I guess it is waiting for something. How can I fix this?

CSV.Source(io) just creates a Source object, but nothing will be read. Did you mean to use CSV.read?
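
As a rough sketch of consuming the piped output inside the do block, here is a version that uses only Base and no CSV.jl. It assumes Julia 1.x and a Unix-like system with echo and tr on the PATH; the echo command is a stand-in for `zcat /tmp/test.gz` so that it runs without a test file.

```julia
# `echo` is a stand-in for `zcat /tmp/test.gz` (an assumption for this sketch).
producer = `echo "1,5;2,25"`

translated = open(pipeline(producer, `tr , .`), "r") do io
    # Read everything here; the stream is only usable inside the do block.
    read(io, String)
end
```

Whatever parser is used in place of `read(io, String)` would likewise need to finish reading before the block returns.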

Here is one way in pure Julia to translate the commas to dots in-stream using BufferedStreams:

using BufferedStreams

type Translator{T <: IO}
    io::T
    from::UInt8
    to::UInt8
end
Translator(io::IO; from::Char=',', to::Char='.') = Translator(io, UInt8(from), UInt8(to))

function BufferedStreams.readbytes!(source::Translator, buffer::Vector{UInt8}, from::Int, to::Int)
    v = view(buffer, from:to)
    n = readbytes!(source.io, v)
    v[v .== source.from] = source.to
    n
end
Base.eof(source::Translator) = eof(source.io)
Base.close(source::Translator) = close(source.io)
Base.nb_available{T <: Translator}(s::BufferedStreams.BufferedInputStream{T}) = !eof(s)

Then you can use a BufferedInputStream{Translator} object in place of an IO object, i.e.

using Libz
io = ZlibInflateInputStream(open("/tmp/test.gz", "r"))
t = BufferedInputStream(Translator(io; from=',', to='.'))
@assert isa(t, IO)
d = CSV.read(t)
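
The core of the translation step can also be looked at in isolation. The following is only a sketch in Julia 1.x syntax, not part of BufferedStreams: translate! is a hypothetical helper that rewrites just the first n bytes of a 1-based buffer, which is one way to avoid touching bytes beyond what a read actually returned.

```julia
# Hypothetical helper: replace every `from` byte with `to` in the first
# `n` bytes of `buf`, in place. Assumes `buf` uses 1-based indexing.
function translate!(buf::AbstractVector{UInt8}, n::Integer, from::UInt8, to::UInt8)
    for i in 1:n
        if buf[i] == from
            buf[i] = to
        end
    end
    return buf
end

buf = Vector{UInt8}("1,5;2,25")
translate!(buf, 3, UInt8(','), UInt8('.'))  # only the first 3 bytes are translated
result = String(copy(buf))
```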

Thanks. Now I am trying to get something like this working:

function read_into_db(csv_path, db, tablename, CSV_args)
    tmp = tempname()
    println("Uncompressing $(csv_path)")
    println("    into $(tmp) (may take a few minutes) ...")
    open(io->run(pipeline(pipeline(`zcat $(csv_path)`, `tr , .`), stdout=io)), tmp, "w")
    println("    filesize is $(round(filesize(tmp)/(2^30),2)) GiB")
    println("    ... forming CSV.Source")
    src = CSV.Source(tmp; CSV_args...)
    println("    ... loading into database")
    SQLite.load(db, tablename, src)
    println("    ... removing temporary file")
    rm(tmp)
end

The fix from @joshbode should help with uncompression and replacement. However, CSV.Source seems to insist on traversing the whole file first, so it is essentially a two-pass operation. Is there anything I can do about that, to make it single-pass?

In general, what is the best strategy for dealing with data that does not fit into memory in the current Julia ecosystem? If I read into an SQLite.jl database, can I dump the data somehow columnwise using Feather.jl (columns fit in memory)?

Yep, looks like you’re right: CSV.Source (Source.jl:59) will do a full read on the stream, regardless of whether the header and types are specified. The only input type CSV.Source won’t immediately consume is IOBuffer.
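
If a single pass matters more than using CSV.Source, one option is to walk the stream line by line with Base only. This is a hedged sketch in Julia 1.x syntax rather than anything CSV.jl provides: column_sums is a hypothetical function, the `;` delimiter and the header row are assumptions about the file, and an IOBuffer stands in for the decompressed stream.

```julia
# Hypothetical single-pass reader: sums each numeric column of a
# `delim`-separated stream whose numbers use a decimal comma.
function column_sums(io::IO; delim::Char=';')
    header = split(readline(io), delim)      # first line holds the column names
    sums = zeros(Float64, length(header))
    for line in eachline(io)                 # one row in memory at a time
        isempty(line) && continue
        fields = split(replace(line, ',' => '.'), delim)
        for (j, field) in enumerate(fields)
            sums[j] += parse(Float64, field)
        end
    end
    return String.(header), sums
end

# IOBuffer stands in for the real (decompressed) stream.
cols, sums = column_sums(IOBuffer("x;y\n1,5;2,25\n3,5;4,75\n"))
```

Only the running totals are kept, so memory use does not grow with the number of rows; the same loop could hand each parsed row to a database insert instead.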