Race condition reading from Pipe

Splitting this off from A `tee` option for `IOCapture`

Does the following minimal working example for asynchronously reading data from a Pipe involve a race condition?

using Random

function main()

    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)

    bufsize = 128
    buffer = Vector{UInt8}(undef, bufsize)
    buffer_redirect_task = @async begin
        while isopen(pipe)
            nbytes = readbytes!(pipe, buffer, bufsize)
            data = view(buffer, 1:nbytes)
            write(stdout, data)
        end
    end

    io = pipe.in
    for i = 1:10000
        write(pipe.in, randstring(64))
    end
    write(pipe.in, "[END OF STREAM]")
    close(pipe.in)

end

main()

Running the example, I haven’t been able to demonstrate one: I always see `[END OF STREAM]` printed to stdout. But my gut feeling is that the main task might write the final data to the pipe and close it right after `buffer_redirect_task` has finished a read. The `isopen` check would then end the loop, losing the last data that was written.

Is there an asynchronous I/O expert who could confirm whether there is indeed a race condition here? If yes: How can I rewrite the example so that it is guaranteed to write all the data from the pipe to stdout? If not: why is this safe?

As suggested by @Sukera on Slack, the correct solution should be to replace `isopen(pipe)` with `!eof(pipe)`.
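For reference, here is a minimal sketch of the example with that change applied. As I understand the documentation, `eof` blocks until more data is available or the stream has reached its end, so the loop should only stop once the pipe is fully drained. The `out` argument and the final `wait(reader)` are my own additions and not part of the suggestion: the first makes the destination swappable, and the second keeps `main` from returning before the reader task has finished.

```julia
using Random

# Sketch only: `out` is a hypothetical parameter added for illustration.
function main(out::IO = stdout)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)

    bufsize = 128
    buffer = Vector{UInt8}(undef, bufsize)
    reader = @async begin
        # `eof` waits for data if necessary and returns true only once the
        # stream is exhausted, i.e. the write end is closed and all data read.
        while !eof(pipe)
            nbytes = readbytes!(pipe, buffer, bufsize)
            write(out, view(buffer, 1:nbytes))
        end
    end

    for i in 1:10000
        write(pipe.in, randstring(64))
    end
    write(pipe.in, "[END OF STREAM]")
    close(pipe.in)

    # Assumption: we want all output flushed to `out` before returning.
    wait(reader)
    return nothing
end
```

Calling `main()` writes to stdout as before; passing an `IOBuffer` instead makes it easy to check that nothing was dropped.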
