Here is an attempt to satisfy both of these, but it is super hacky. I am especially disappointed by the need to implement `Base.setup_stdio`
to get the pipeline to work. I think the problem is that there is no place in the `IO` type hierarchy to attach a new type that is not backed by a file handle. I wish there were an `AbstractIOBuffer`
between `IO`
and `GenericIOBuffer`.
"""
    MeasuredStream(io)

`IO` subtype that wraps another stream `io` and carries two running byte
counters, `nread` and `nwritten`. Both counters start at zero; they are
updated by the `Base` I/O methods defined for this type.
"""
mutable struct MeasuredStream <: IO
    io::IO          # wrapped stream that actually performs the I/O
    nread::Int      # bytes counted as read through this wrapper
    nwritten::Int   # bytes counted as written through this wrapper

    function MeasuredStream(io)
        return new(io, 0, 0)
    end
end
"""
    eof(s::MeasuredStream)

Report end-of-stream by delegating to the wrapped stream `s.io`.
"""
function Base.eof(s::MeasuredStream)
    return Base.eof(s.io)
end
"""
    readavailable(s::MeasuredStream)

Read whatever `readavailable` yields from the wrapped stream `s.io`, add the
length of that data to `s.nread`, and return the data.
"""
function Base.readavailable(s::MeasuredStream)
    data = readavailable(s.io)
    s.nread += length(data)
    # Return the bytes themselves; without an explicit return the function
    # would yield the updated counter (the value of the `+=` expression).
    return data
end
"""
    unsafe_read(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)

Read `nb` bytes from the wrapped stream `s.io` into the memory at `p`, then
add `nb` to `s.nread`. Returns whatever the wrapped stream's `unsafe_read`
returns.
"""
function Base.unsafe_read(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)
    # Forward the caller's destination pointer `p` to the wrapped stream.
    result = Base.unsafe_read(s.io, p, nb)
    # Count only after the delegated read returned, so a read that throws
    # is not recorded as `nb` bytes consumed.
    s.nread += nb
    return result
end
"""
    unsafe_write(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)

Add `nb` to `s.nwritten`, then forward the write of `nb` bytes starting at
`p` to the wrapped stream `s.io` and return its result.
"""
function Base.unsafe_write(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)
    # The counter is bumped by the requested size before delegating.
    s.nwritten = s.nwritten + nb
    target = s.io
    return Base.unsafe_write(target, p, nb)
end
"""
    setup_stdio(stdio::MeasuredStream, child_readable::Bool)

Connect a `MeasuredStream` to a child process through a pipe.

A pipe pair is created with `Base.link_pipe`; one end is opened on a
`Base.PipeEndpoint` kept by this process (`parent`), the other end (`child`)
is returned. An asynchronous task copies data between `stdio` and `parent`:
from `stdio` into the pipe when `child_readable` is `true`, and from the pipe
into `stdio` otherwise. `parent` is closed when that task finishes.

Returns the tuple `(child, true)`.
NOTE(review): the `true` presumably tells the caller to close `child` once the
process has been spawned — confirm against `Base`'s process-spawning code.
"""
function Base.setup_stdio(stdio::MeasuredStream, child_readable::Bool)
    parent = Base.PipeEndpoint()
    rd, wr = Base.link_pipe(!child_readable, child_readable)
    try
        Base.open_pipe!(parent, child_readable ? wr : rd)
    catch
        # Opening the parent end failed: release both raw pipe ends.
        Base.close_pipe_sync(rd)
        Base.close_pipe_sync(wr)
        rethrow()
    end
    child = child_readable ? rd : wr
    try
        # `write(dest, src)` copies `src` into `dest`; the direction depends
        # on whether the child reads from or writes to this stream.
        let dest = (child_readable ? parent : stdio),
            src = (child_readable ? stdio : parent)

            @async try
                write(dest, src)
            catch ex
                @warn "Process error" exception=(ex, catch_backtrace())
            finally
                close(parent)
            end
        end
    catch
        # `close_pipe_sync` is not exported from Base, so it must be
        # qualified; unqualified it would raise `UndefVarError` here and
        # mask the original failure.
        Base.close_pipe_sync(child)
        rethrow()
    end
    return (child, true)
end
"""
    compress_stream(input_stream, output_file)

Run the external `gzip` command with `input_stream` as its input and
`output_file` as its output, and return the result of `run`.
"""
function compress_stream(input_stream, output_file)
    gzip_chain = pipeline(input_stream, `gzip`, output_file)
    return run(gzip_chain)
end
"""
    readtest()

Wrap an in-memory buffer holding "hello" repeated 1000 times in a
`MeasuredStream`, pass it through `compress_stream` into "hellos.gz", and
print the stream's `nread` counter.
"""
function readtest()
    payload = "hello"^1000
    stream = MeasuredStream(IOBuffer(payload))
    compress_stream(stream, "hellos.gz")
    print("Input size: $(stream.nread)")
    return nothing
end
"""
    writetest()

Write the four bytes `b"asdf"` through a `MeasuredStream` wrapping an empty
read/write `IOBuffer`, then print the stream's `nwritten` counter.
"""
function writetest()
    sink = IOBuffer(UInt8[]; read=true, write=true)
    stream = MeasuredStream(sink)
    write(stream, b"asdf")
    print("Input size: $(stream.nwritten)")
    return nothing
end