A pass-through option for IOCapture

I’ve been using the amazing IOCapture quite extensively, and I was considering abusing it for a situation where I want to siphon off (tee, in Unix jargon) the stdout data while also allowing it to be printed in the terminal or a notebook as normal.

See Add a `pass_through` option · Issue #19 · JuliaDocs/IOCapture.jl · GitHub

Are there some IO experts with tips for how to go about this? I haven’t started playing around with this, but I’m guessing it would involve something around line 108:

buffer_redirect_task = @async write(output, pipe)

I don’t think

buffer_redirect_task = @async (write(output, pipe) && write(default_stdout, pipe))

will work, because pipe can probably only be consumed once, right?
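
A quick way to convince myself of that single-consumption behavior, with a plain IOBuffer instead of a Pipe:

io = IOBuffer("some data")
read(io, String)  # returns "some data"
read(io, String)  # returns "" – the bytes were already consumed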

Are there other low-level considerations I should be aware of while trying to implement this?

A requirement would be that the normal stdout should be as undisturbed as possible. For example, running in a Jupyter notebook, it should be printed continuously as a cell runs.

What I ultimately want to do is to have some long-running optimization (on the order of days) that produces significant convergence information output (possibly a couple of hundred MB worth of text).

This calculation is wrapped here:

I’d like to wrap the highlighted line in something that captures the first and the last N bytes of whatever the optimize function prints to stdout/stderr. That data would then be stored in the dumped .jld file alongside the "result". When I run the optimize_or_load function again (where the .jld file with the result already exists), it would not just return the cached result, but also print an abbreviated

<beginning of stdout>
…
<end of stdout>

to have some record of what convergence data was printed during the optimization. The beginning and end of the output are the most interesting parts; I don’t want to capture the hundreds of MB of full output in the .jld.

It seemed like wrapping the highlighted line in IOCapture would work great for this, except that I’d actually like the normal stdout to be undisturbed, hence this question.

The other part is that I only want to capture the beginning and end of the data, but that seems more straightforward: I’d probably just have to replace the output = IOBuffer() in line 107 with a custom IOBuffer-like object that discards the data I don’t want to keep.
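
For illustration, something along these lines might do (just a rough sketch; HeadTailBuffer and abbreviated are made-up names, not part of IOCapture, and only the write(io, ::UInt8) method is strictly needed, since the generic fallbacks in Base route strings and byte arrays through it):

# Hypothetical sketch: keep only the first and last N bytes written.
mutable struct HeadTailBuffer <: IO
    N::Int
    head::Vector{UInt8}
    tail::Vector{UInt8}  # ring buffer for the most recent bytes
    pos::Int             # next write index into `tail`
    total::Int           # total number of bytes ever written
end

HeadTailBuffer(N::Integer) = HeadTailBuffer(N, UInt8[], zeros(UInt8, N), 1, 0)

function Base.write(io::HeadTailBuffer, byte::UInt8)
    if io.total < io.N
        push!(io.head, byte)    # still filling the head
    else
        io.tail[io.pos] = byte  # overwrite the oldest tail byte
        io.pos = mod1(io.pos + 1, io.N)
    end
    io.total += 1
    return 1
end

# Reassemble "<beginning>…<end>" once writing is finished.
function abbreviated(io::HeadTailBuffer)
    io.total <= io.N && return String(copy(io.head))
    nvalid = min(io.total - io.N, io.N)  # bytes currently held in the ring
    tail = nvalid < io.N ? io.tail[1:nvalid] :
        vcat(io.tail[io.pos:end], io.tail[1:io.pos-1])
    return String(copy(io.head)) * "\n…\n" * String(tail)
end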

Are there any other solutions or packages that might be of use? The basic functionality I would need is to split (tee) the standard stdout/stderr into a custom IOBuffer-like object while leaving it unaffected otherwise.

Oh, and I have seen Write to file and stdout, which might also be useful. It just seemed like IOCapture is a lot more polished already and might be easier to adapt.


I came up with the start of a possible implementation at Allow pass-through of output by goerz · Pull Request #20 · JuliaDocs/IOCapture.jl · GitHub

The core of it is this snippet:

output = IOBuffer()
temp = IOBuffer()
buffer_redirect_task = @async begin
    write(temp, pipe)
    temp_data = take!(temp)
    write(output, temp_data)
    write(default_stdout, temp_data)
end

See also my comment at Add a `tee` option · Issue #19 · JuliaDocs/IOCapture.jl · GitHub

If there’s any low-level IO expert who could weigh in on whether this is a sane approach, please leave a comment on the issue or PR.


I also played around a little with the Tee struct from Write to file and stdout - #3 by Tamas_Papp, but it doesn’t seem like redirect_stdout can be called with a Tee object. If someone knows how to define the appropriate method of redirect_stdout to make that work, that would still be a possible alternative.
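
For reference, a minimal Tee in the spirit of that post looks something like this (my own sketch, not the exact code from the thread); the catch seems to be that redirect_stdout operates on OS-level file descriptors, so it only accepts OS-backed streams like an IOStream or a Pipe, not an arbitrary IO subtype:

# Minimal Tee sketch: every byte written is forwarded to all wrapped streams.
struct Tee{T<:Tuple} <: IO
    streams::T
end
Tee(streams::IO...) = Tee(streams)

function Base.write(t::Tee, byte::UInt8)
    for io in t.streams
        write(io, byte)
    end
    return 1
end
Base.flush(t::Tee) = foreach(flush, t.streams)

tee = Tee(stdout, IOBuffer())
println(tee, "hello")   # works as a plain IO
# redirect_stdout(tee)  # fails: no underlying file descriptor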


I should have tested that with a long-running function: the above doesn’t write any output until the code block finishes. It took me a while to understand, but write(temp, pipe) blocks until there’s no more data left to read. This one kinda works:

output = IOBuffer()
bufsize = 128  # Arbitrary. What's a good value?
buffer_redirect_task = @async begin
    while true
        buffer = read(pipe, bufsize)  # reads at most bufsize bytes, allocating a fresh Vector each call
        write(output, buffer)
        write(default_stdout, buffer)
        isopen(pipe) || break
    end
end

It might be nice if there were a way to get rid of the allocation for buffer. Does anyone have an idea for how to use a preallocated buffer? I’ve tried playing around with bytesavailable and read!, but that proved quite tricky. It would be nice if there were an atomic bytes_read = read!(pipe, buffer, bufsize), but I don’t think there is.


There are copyuntil and copyline, but they will only land in 1.11:

  copyuntil(out::IO, stream::IO, delim; keep::Bool = false)
  copyuntil(out::IO, filename::AbstractString, delim; keep::Bool = false)

  Copy a string from an I/O stream or a file, up to the given delimiter, to the out stream, returning out. The delimiter can be a UInt8, AbstractChar, string, or vector.
  Keyword argument keep controls whether the delimiter is included in the result. The text is assumed to be encoded in UTF-8.

  Similar to readuntil, which returns a String; in contrast, copyuntil writes directly to out, without allocating a string. (This can be used, for example, to read data
  into a pre-allocated IOBuffer.)

  copyline(out::IO, io::IO=stdin; keep::Bool=false)
  copyline(out::IO, filename::AbstractString; keep::Bool=false)

  Copy a single line of text from an I/O stream or a file to the out stream, returning out.

  When reading from a file, the text is assumed to be encoded in UTF-8. Lines in the input end with '\n' or "\r\n" or the end of an input stream. When keep is false (as it
  is by default), these trailing newline characters are removed from the line before it is returned. When keep is true, they are returned as part of the line.

  Similar to readline, which returns a String; in contrast, copyline writes directly to out, without allocating a string. (This can be used, for example, to read data into
  a pre-allocated IOBuffer.)
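
For example (a sketch against the 1.11 API, using an IOBuffer as the source):

src = IOBuffer("line 1\nline 2\n")
out = IOBuffer()  # preallocated destination
while !eof(src)
    copyline(out, src; keep=true)  # no intermediate String is allocated
end
String(take!(out))  # "line 1\nline 2\n"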

The other option would be using unsafe_write.
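
That would mean reading into a preallocated buffer and then writing the valid portion straight from its pointer, roughly like this (a sketch; buffer and nbytes are assumed to come from your read loop, and GC.@preserve keeps buffer rooted while the raw pointer is in use):

GC.@preserve buffer begin
    unsafe_write(output, pointer(buffer), UInt(nbytes))
    unsafe_write(default_stdout, pointer(buffer), UInt(nbytes))
end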


Interesting! I’m assuming copyline/copyuntil can handle the stream ending before it reaches delim (although in my case, delim would be "\n", and the stream should, in fact, end with a newline).

Can you elaborate on unsafe_write? Based on its docstring, it seems I would have to know how many bytes (nbytes) I can read from the ref stream. How does it handle the stream being closed with fewer than nbytes left?

I was playing around with bytesavailable(pipe) before (which intuitively would be how one would get a safe value for nbytes), but it seemed to return some nonsensical numbers. I presume that was because of the @async, where the available data in the pipe changes between a call to bytesavailable and a subsequent read/write.

P.S.: It looks like copyuntil is a reasonably small function, so it might be an option to just copy that in for pre-1.11 support.


unsafe_write has no safety against edge cases: it just loads a pointer and puts data into it; everything else has to be taken care of by the user. One thing I forgot (my bad): it does still allocate a little. You can make it not allocate by this, but there are some caveats.

copyline/copyuntil should handle the edge cases, though. I don’t think you can just copy the functions, since they use low-level C functions. @stevengj might be able to help you with better options.

Yeah, unsafe_write is probably too unsafe, especially as long as I don’t understand every detail of asynchronous I/O. Also, copyuntil doesn’t seem to work with Pipe/PipeEndpoint objects.

It would be nice if there were an atomic bytes_read = read!(pipe, buffer, bufsize), but I don’t think there is.

Actually, looking at the internals, with @edit read(pipe, bufsize), I think there is, and it’s readbytes!.

So this is my latest iteration, which seems to work and shouldn’t allocate anything inside the loop:

bufsize = 128
buffer = Vector{UInt8}(undef, bufsize)  # preallocated once, reused for every read
buffer_redirect_task = @async begin
    while true
        nbytes = readbytes!(pipe, buffer, bufsize)  # number of bytes actually read
        data = view(buffer, 1:nbytes)  # no copy, just a window into buffer
        write(output, data)
        write(default_stdout, data)
        isopen(pipe) || break
    end
end

Now, one more question about the exit condition of the loop (since I’m still pretty unsure about some of the details of asynchronous I/O): Is there any possibility that I might miss data because the main task puts data into the pipe between the time I call readbytes!(pipe, buffer, bufsize) and the time I call isopen(pipe)? What if I move the isopen check to the beginning of the loop (while isopen(pipe))?

The other possibility I was considering was (nbytes == 0) && break, which also seems to work in my tests. But again: is there any possibility that the main task doesn’t write data to the pipe fast enough, so that there (temporarily) might not be anything for readbytes! to read, making me believe erroneously that the main task is done? Update: Let’s split this off to Race condition reading from Pipe. The MWE there demonstrated that (nbytes == 0) does not work in general.
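
For comparison, one loop shape that would sidestep both questions (a sketch, not necessarily the solution from the linked thread) keys the exit off eof, which blocks until either data arrives or the pipe is closed:

while !eof(pipe)  # blocks until data is available or the pipe is closed
    nbytes = readbytes!(pipe, buffer, bufsize)
    data = view(buffer, 1:nbytes)
    write(output, data)
    write(default_stdout, data)
end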

I saw iolock_begin() and iolock_end() in the internals of Pipe. Is that something I might need here to avoid race conditions?

OutputCollectors.jl (GitHub - JuliaPackaging/OutputCollectors.jl) has the ability to run a process and write its output both to the screen (colouring stdout and stderr differently) and to a file at the same time.


There’s a PR ready for review now, if anyone has any further input: Allow pass-through of output by goerz · Pull Request #20 · JuliaDocs/IOCapture.jl · GitHub

Thanks everyone, and especially @Sukera for solving the issue with the race condition in Race condition reading from Pipe!
