Piping into an external program from Julia output

@nstgc thanks for the kind words! Indeed, BufferStream is unexported and undocumented; I only found it by chance (it was mentioned in an issue on GitHub). It's a bit like a PipeBuffer: reading after a write returns what was written. But reading from a BufferStream blocks until data is available, so it can be used to feed data to the pipeline.
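
To illustrate the blocking behavior, here is a minimal sketch (it only assumes the unexported Base.BufferStream API behaves as just described):

bs = Base.BufferStream()
reader = @async String(read(bs)) # read blocks until the stream is closed
write(bs, "hello ")
write(bs, "world")
close(bs)                        # signals end-of-stream to the reader
fetch(reader)                    # returns "hello world"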

Here’s a fuller example using BufferStream to feed a pipeline:

# Write to io the sum of numbers in each line of the given file (skipping the first line)
function func1e(io, file)
  for line in Iterators.drop(eachline(file), 1)
    v = parse.(Int, split(line))
    println(io, sum(v))
    sleep(0.1) # Add artificial delay in generation of input to the pipeline
  end
end

# Print output of given stream with added line numbers
function func2(io)
  for (i, line) in enumerate(eachline(io))
    println("line $i: $line")
  end
end

# Call f asynchronously with a BufferStream and the given arguments and return
# the BufferStream, which will be closed automatically when f is done.
function async_writer(f, args...)
  io = Base.BufferStream()
  @async try # We don't wait on the task so we need `try` to see the error (if any)
    f(io, args...)
  catch e
    @error e
  finally
    close(io)
  end
  return io
end

pipeline(async_writer(func1e, "data.txt"), `sort -g`, `head -n 2`) |> func2

I think this reads nicely, and the async_writer function can be reused to wrap other writer functions.
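
For instance, wrapping a different writer works the same way; the writer below and its output are purely illustrative:

# Hypothetical writer: emits the squares of 1:n, one per line
write_squares(io, n) = foreach(i -> println(io, i^2), 1:n)

pipeline(async_writer(write_squares, 5), `sort -g -r`) |> func2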

Again, replace sort -g and head -n 2 with cat to see that the stream is processed as it is produced.
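
That is, something like:

pipeline(async_writer(func1e, "data.txt"), `cat`) |> func2

With the artificial sleep in func1e, the numbered lines should appear one by one rather than all at once.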

Note the try block, which fixes an issue that was already present in the func1d version (try running the func1d example with a wrong filename: it will hang silently).
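
For example, with a nonexistent filename (hypothetical name below), the task's error is logged by the catch block and the stream is closed in finally, so the pipeline just sees an empty input and func2 returns without printing anything instead of hanging:

pipeline(async_writer(func1e, "no_such_file.txt"), `cat`) |> func2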

I think this new version still has an issue though: as I understand it, if func1e writes faster than the pipeline can consume, the internal BufferStream buffer will grow without bound. It would be nice if writes blocked once the buffer reached a certain size. Maybe this functionality could be added to BufferStream.
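
In the meantime, a crude workaround is to throttle the writer by hand. This is only a sketch: it assumes bytesavailable reports the number of unread bytes in a BufferStream (which appears to be the case, but is as undocumented as the rest), and the helper name and threshold are made up:

# Hypothetical helper: wait until the pipeline has drained most of the
# buffer before writing more (polling is crude, but illustrates the idea)
function throttled_println(io, x; maxbuf = 2^16)
  while isopen(io) && bytesavailable(io) > maxbuf
    sleep(0.01)
  end
  println(io, x)
end

func1e could then call throttled_println(io, sum(v)) instead of println(io, sum(v)).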
