Making a simple wrapper for an IO stream

I want to make a type that wraps an IO stream and measures how much data is read through (or written to) it. Something like this:

mutable struct MeasuredStream <: IO
  io::IO
  nread::Int
  nwritten::Int

  MeasuredStream(io) = new(io, 0, 0)
end

function Base.read(s::MeasuredStream, nb=typemax(Int))
  x = read(s.io, nb)
  s.nread += length(x)
  return x
end

function Base.write(s::MeasuredStream, x)
  n = write(s.io, x)
  s.nwritten += n
  return n
end

to be used like this:

function compress_stream(input_stream, output_file)
  pipeline(input_stream, `gzip`, output_file) |> run
end

data = IOBuffer(repeat("hello", 1000));
stream = MeasuredStream(data)
compress_stream(stream, "hellos.gz")
print("Input size: $(stream.nread)")

However this fails for a (to me) obscure reason:

ERROR: MethodError: no method matching rawhandle(::MeasuredStream)
Closest candidates are:
  rawhandle(::RawFD) at cmd.jl:161
  rawhandle(::Base.DevNull) at cmd.jl:160
  rawhandle(::Base.Filesystem.File) at filesystem.jl:75
  ...
Stacktrace:
 [1] _spawn_primitive(::String, ::Cmd, ::Array{Any,1}) at ./process.jl:77
 [2] #585 at ./process.jl:112 [inlined]
 [3] setup_stdios(::Base.var"#585#586"{Cmd}, ::Array{Any,1}) at ./process.jl:196
 [4] _spawn at ./process.jl:111 [inlined]
 [5] _spawn(::Base.CmdRedirect, ::Array{Any,1}) at ./process.jl:139 (repeats 2 times)
 [6] run(::Base.CmdRedirect; wait::Bool) at ./process.jl:439
 [7] run at ./process.jl:438 [inlined]
 [8] |> at ./operators.jl:834 [inlined]
 [9] compress_stream(::MeasuredStream, ::String) at ./REPL[4]:2

I tried forwarding rawhandle without success:

julia> Base.rawhandle(s::MeasuredStream) = Base.rawhandle(s.io)

julia> compress_stream(stream, "hellos.gz")
ERROR: MethodError: no method matching rawhandle(::Base.GenericIOBuffer{Array{UInt8,1}})

I guess I need to implement a bunch of IO methods, but my trial-and-error attempts failed and I couldn't find any guidance.

How can I properly wrap an IO stream to add custom functionality?

I would also like to know what a solution could look like. The following unfortunately leads to ambiguity errors:

mutable struct MeasuredStream{T} <: IO
  io::T
  nread::Int
  nwritten::Int
  MeasuredStream(io) = new{typeof(io)}(io, 0, 0)
end


function Base.read(s::MeasuredStream, nb=typemax(Int))
  x = read(s.io, nb)
  s.nread += length(x)
  return x
end

function Base.write(s::MeasuredStream, x)
  n = write(s.io, x)
  s.nwritten += n
  return n
end

data = IOBuffer(UInt8[], read=true, write=true);
stream = MeasuredStream(data)
write(stream,b"asdf")
ERROR: MethodError: write(::MeasuredStream{Base.GenericIOBuffer{Array{UInt8,1}}}, ::Base.CodeUnits{UInt8,String}) is ambiguous. Candidates:
  write(io::IO, s::Base.CodeUnits) in Base at strings/basic.jl:729
  write(s::MeasuredStream, x) in Main at REPL[3]:1
  write(s::IO, A::AbstractArray) in Base at io.jl:632
Possible fix, define
  write(::MeasuredStream, ::Base.CodeUnits)

I suspect the answer is that you can't do that with pipeline(). It appears to use OS features (file handles) to pipe commands together rather than having Julia in the middle of the pipeline.
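If that is the case, one workaround is to put Julia in the middle yourself: open gzip as a process you write to, and copy the input into it chunk by chunk, counting along the way. A minimal sketch, assuming a gzip executable on the PATH; compress_counting and chunk_size are hypothetical names, and every byte is copied through Julia instead of being handed to the OS as a file handle.

```julia
# Hypothetical helper: reads `input` in chunks, writes each chunk to gzip's
# stdin, and returns the number of bytes read. Assumes `gzip` is on the PATH.
function compress_counting(input::IO, output_file::AbstractString;
                           chunk_size::Integer = 64 * 1024)
    nread = 0
    # Mode "w": we write to the process's stdin; its stdout goes to the file.
    open(pipeline(`gzip`, stdout = output_file), "w") do proc
        while !eof(input)
            chunk = read(input, chunk_size)   # up to chunk_size bytes
            nread += length(chunk)
            write(proc, chunk)
        end
    end   # the do-block form closes stdin, waits for gzip, errors on failure
    return nread
end
```

For example, compress_counting(IOBuffer(repeat("hello", 1000)), "hellos.gz") should return 5000. The input can be any stream that supports eof and read(io, nb), so a counting wrapper would also see every byte.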


Here is an attempt that handles both cases, but it is super hacky. I am especially disappointed by the need to implement Base.setup_stdio to get the pipeline to work. I think the problem is that there is no place in the IO type hierarchy to attach a new type that is not backed by a file handle. I wish there were an AbstractIOBuffer between IO and GenericIOBuffer.

mutable struct MeasuredStream <: IO
    io::IO
    nread::Int
    nwritten::Int

    MeasuredStream(io) = new(io, 0, 0)
end

Base.eof(s::MeasuredStream) = Base.eof(s.io)

function Base.readavailable(s::MeasuredStream)
    x = readavailable(s.io)
    s.nread += length(x)
    return x  # return the bytes, not the updated counter
end

function Base.unsafe_read(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)
    Base.unsafe_read(s.io, p, nb)  # fill buffer `p` from the wrapped stream
    s.nread += nb                  # count only after the read succeeded
    return nothing
end

function Base.unsafe_write(s::MeasuredStream, p::Ptr{UInt8}, nb::UInt)
    s.nwritten += nb
    return Base.unsafe_write(s.io, p, nb)
end

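# Mirrors Base's setup_stdio(::Union{IOBuffer, BufferStream}, ::Bool): the child
# gets one end of an OS pipe, and a task pumps bytes between it and this stream.
function Base.setup_stdio(stdio::MeasuredStream, child_readable::Bool)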
    parent = Base.PipeEndpoint()
    rd, wr = Base.link_pipe(!child_readable, child_readable)
    try
        Base.open_pipe!(parent, child_readable ? wr : rd)
    catch ex
        Base.close_pipe_sync(rd)
        Base.close_pipe_sync(wr)
        rethrow(ex)
    end
    child = child_readable ? rd : wr
    try
        let in = (child_readable ? parent : stdio),
            out = (child_readable ? stdio : parent)
            @async try
                write(in, out)
            catch ex
                @warn "Process error" exception=(ex, catch_backtrace())
            finally
                close(parent)
            end
        end
    catch ex
        Base.close_pipe_sync(child)
        rethrow(ex)
    end
    return (child, true)
end

function compress_stream(input_stream, output_file)
    pipeline(input_stream, `gzip`, output_file) |> run
end

function readtest()
    data = IOBuffer(repeat("hello", 1000))
    stream = MeasuredStream(data)
    compress_stream(stream, "hellos.gz")
    print("Input size: $(stream.nread)")
end

function writetest()
    data = IOBuffer(UInt8[], read=true, write=true)
    stream = MeasuredStream(data)
    write(stream,b"asdf")
    print("Bytes written: $(stream.nwritten)")
end


@contradict Thanks for confirming my suspicion that there is no straightforward way.

I am curious: is adding an intermediary abstract type generally a non-breaking change? If so, could the language allow a third party (e.g. an external library) to insert an intermediary abstract type? EDIT: I figured this would be functionally equivalent to inheriting from a concrete type, so the answer to the second question is probably no.

Thanks @contradict! So indeed the ugly part is the need to implement Base.setup_stdio. I'm also curious whether adding an intermediate abstract type above GenericIOBuffer would be a breaking change…

No, adding intermediate types is not usually considered breaking IIRC; I think we’ve done that in other cases already in the 1.x cycle?
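A toy illustration of why inserting a layer is usually harmless (hypothetical module and type names, not a statement of Julia's compatibility policy): code that dispatches on or checks against the old abstract type keeps working, and the only visible difference is the direct supertype.

```julia
# Two hypothetical versions of the same library.
module Before
abstract type AbstractThing end
struct Thing <: AbstractThing end
describe(::AbstractThing) = "generic method"
end

module After
abstract type AbstractThing end
abstract type AbstractMiddle <: AbstractThing end   # newly inserted layer
struct Thing <: AbstractMiddle end
describe(::AbstractThing) = "generic method"
end

# Code written against the old hierarchy keeps working:
@show Before.Thing() isa Before.AbstractThing   # true
@show After.Thing() isa After.AbstractThing     # true
@show After.describe(After.Thing())             # "generic method"

# What does change: the direct supertype, which code rarely inspects.
@show supertype(Before.Thing)   # Before's AbstractThing
@show supertype(After.Thing)    # now After's AbstractMiddle
```

The new layer is also exactly the hook third-party code would need: methods defined on AbstractMiddle apply to Thing without touching AbstractThing.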


Is there any reason why a method like the one you wrote couldn't simply be the fallback method for any ::IO type? That would be an even easier change than adding a new type…

There already is a fallback method for ::IO that assumes an underlying file handle. All the methods are defined here. The problem in this case is that we want to do exactly the same thing as setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool). The existence of that union makes me think this is already a problem in Base.
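You can check which setup_stdio method a given stream type would hit with which. A small sketch; MyIO is a hypothetical placeholder type, and setup_stdio is an internal, unexported function whose methods may differ between Julia versions.

```julia
# Hypothetical custom IO type with no special pipeline support.
struct MyIO <: IO end

# Ask dispatch which method a (stream, child_readable) call would use.
m_buffer = which(Base.setup_stdio, Tuple{IOBuffer, Bool})
m_custom = which(Base.setup_stdio, Tuple{MyIO, Bool})

println(m_buffer)   # exact text varies by version
println(m_custom)
```

For IOBuffer this should report the Union{IOBuffer, BufferStream} method, while MyIO lands on the generic fallback, which is where the rawhandle requirement comes from.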

Maybe this is worth a deeper look. Is there somebody I can check with who could comment on how likely a patch fixing this would be to get accepted, assuming it turns out not to break too many things?


Yeah, good observation that this method would probably make a better fallback. Anyone who knew to implement rawhandle probably needed to (or trivially could) implement setup_stdio as well. Care to submit a PR?
