Making a simple wrapper for an IO stream

Here is an attempt to satisfy both of these, but it is super hacky. I am especially disappointed by the need to implement Base.setup_stdio to get the pipeline to work. I think the problem is that there is no place in the IO type hierarchy to attach a new type that is not backed by a file handle. I wish there were an AbstractIOBuffer between IO and GenericIOBuffer.

"""
    MeasuredStream(io::IO)

Wrap `io` in a stream that keeps running totals of the bytes moved through it.

# Fields
- `io`: the wrapped stream that actually performs the I/O.
- `nread`: byte counter for reads, starts at 0.
- `nwritten`: byte counter for writes, starts at 0.

The type is parametrized on the concrete type of the wrapped stream so that
field access is type-stable; methods written for `s::MeasuredStream` still
match every parametrization.
"""
mutable struct MeasuredStream{T<:IO} <: IO
    io::T
    nread::Int
    nwritten::Int

    # Both counters start at zero; the type parameter is taken from the argument.
    MeasuredStream(io::T) where {T<:IO} = new{T}(io, 0, 0)
end

"""
    Base.eof(s::MeasuredStream)

Report end-of-stream by delegating to the wrapped stream `s.io`.
"""
function Base.eof(s::MeasuredStream)
    wrapped = s.io
    return eof(wrapped)
end

"""
    Base.readavailable(s::MeasuredStream)

Read the data currently available from the wrapped stream, add its length to
`s.nread`, and return the data.
"""
function Base.readavailable(s::MeasuredStream)
    chunk = readavailable(s.io)
    s.nread += length(chunk)
    # Hand the bytes back to the caller; returning the `+=` result would
    # yield the counter instead of the data.
    return chunk
end

"""
    Base.unsafe_read(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)

Read `nb` bytes from the wrapped stream into the memory at `p` and add `nb`
to `s.nread`. Returns whatever the wrapped stream's `unsafe_read` returns.
"""
function Base.unsafe_read(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)
    # Forward the caller's destination pointer `p` to the wrapped stream.
    result = Base.unsafe_read(s.io, p, nb)
    # Count only after the underlying read has returned, so a read that
    # throws does not inflate the counter.
    s.nread += nb
    return result
end

"""
    Base.unsafe_write(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)

Write up to `nb` bytes from the memory at `p` to the wrapped stream, add the
byte count reported by the wrapped stream to `s.nwritten`, and return that
count.
"""
function Base.unsafe_write(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)
    written = Base.unsafe_write(s.io, p, nb)
    # Use the count reported by the wrapped stream rather than the requested
    # `nb`, and update only after the write returned without throwing.
    # NOTE(review): assumes the wrapped stream's `unsafe_write` returns an
    # integer byte count, as Base's `write` methods do - confirm for custom IO.
    s.nwritten += written
    return written
end

"""
    Base.setup_stdio(stdio::MeasuredStream, child_readable::Bool)

Bridge a `MeasuredStream` to a child process through an OS pipe.

A pipe is created and one end is attached to a `PipeEndpoint` kept on this
side; the other end (`child`) is returned for the child process. A background
task copies data between `stdio` and the local endpoint:

- `child_readable == true`: data flows from `stdio` into the pipe.
- `child_readable == false`: data flows from the pipe into `stdio`.

Returns `(child, true)`.
NOTE(review): the `true` flag presumably tells Base that the returned handle
should be closed by the caller once the process is spawned - confirm against
Base's process setup code. This method relies on unexported Base internals.
"""
function Base.setup_stdio(stdio::MeasuredStream, child_readable::Bool)
    parent = Base.PipeEndpoint()
    rd, wr = Base.link_pipe(!child_readable, child_readable)
    try
        # Keep the end the child does NOT use on our side.
        Base.open_pipe!(parent, child_readable ? wr : rd)
    catch
        # Opening failed: release both raw pipe ends before propagating.
        Base.close_pipe_sync(rd)
        Base.close_pipe_sync(wr)
        rethrow()
    end
    child = child_readable ? rd : wr
    try
        # `write(dest, src)` copies from `src` into `dest`, so the names
        # follow the direction of the data flow.
        let dest = (child_readable ? parent : stdio),
            src = (child_readable ? stdio : parent)
            @async try
                write(dest, src)
            catch ex
                @warn "Process error" exception=(ex, catch_backtrace())
            finally
                # Closing our endpoint ends the transfer for the other side.
                close(parent)
            end
        end
    catch
        # Must be qualified: `close_pipe_sync` is not exported from Base, and
        # an unqualified call here would mask the original exception.
        Base.close_pipe_sync(child)
        rethrow()
    end
    return (child, true)
end

"""
    compress_stream(input_stream, output_file)

Run the external `gzip` command with `input_stream` as its input and
`output_file` as its output, and return the result of `run`.
"""
function compress_stream(input_stream, output_file)
    gzip_chain = pipeline(input_stream, `gzip`, output_file)
    return run(gzip_chain)
end

"""
    readtest()

Demo of read counting: wrap an in-memory buffer holding "hello" repeated 1000
times in a `MeasuredStream`, compress it to `hellos.gz` via `compress_stream`,
then print the stream's `nread` counter.
"""
function readtest()
    payload = repeat("hello", 1000)
    stream = MeasuredStream(IOBuffer(payload))
    compress_stream(stream, "hellos.gz")
    print("Input size: $(stream.nread)")
    return nothing
end

"""
    writetest()

Demo of write counting: wrap an empty read/write in-memory buffer in a
`MeasuredStream`, write the four bytes `asdf` to it, then print the stream's
`nwritten` counter.
"""
function writetest()
    sink = IOBuffer(UInt8[]; read=true, write=true)
    stream = MeasuredStream(sink)
    write(stream, b"asdf")
    print("Input size: $(stream.nwritten)")
    return nothing
end

2 Likes