@nstgc thanks for the kind words! Indeed `BufferStream` is unexported and undocumented; I only found it by chance (it was mentioned in an issue on GitHub). It's a bit like a `PipeBuffer`: reading after a write will return what was written. But reading from a `BufferStream` blocks until data is available, so it can be used to feed data to the pipeline.
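For instance, a rough illustration of that behavior (not part of the example below):

```julia
io = Base.BufferStream()
println(io, "hello")
readline(io)                             # data was already written, returns "hello"
@async (sleep(1); println(io, "world"))
readline(io)                             # blocks until the task writes, then returns "world"
```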
Here's an example using `BufferStream`:
```julia
# Write to io the sum of numbers in each line of the given file (skipping the first line)
function func1e(io, file)
    for line in Iterators.drop(eachline(file), 1)
        v = parse.(Int, split(line))
        println(io, sum(v))
        sleep(0.1) # Add artificial delay in generation of input to the pipeline
    end
end

# Print output of given stream with added line numbers
function func2(io)
    for (i, line) in enumerate(eachline(io))
        println("line $i: $line")
    end
end

# Call f asynchronously with a BufferStream and the given arguments and return
# the BufferStream, which will be closed automatically when f is done.
function async_writer(f, args...)
    io = Base.BufferStream()
    @async try # We don't wait on the task, so we need `try` to see the error (if any)
        f(io, args...)
    catch e
        @error e
    finally
        close(io)
    end
    return io
end

pipeline(async_writer(func1e, "data.txt"), `sort -g`, `head -n 2`) |> func2
```
This looks nice, I think, and the `async_writer` function can be reused to wrap other writer functions.
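For example, it could just as easily wrap a hypothetical `write_squares` function (not part of the original example):

```julia
# Hypothetical writer: emit the squares of 1:n, one per line
write_squares(io, n) = foreach(i -> println(io, i^2), 1:n)

pipeline(async_writer(write_squares, 5), `sort -g`) |> func2
```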
Again, replace `sort -g`, `head -n 2` with `cat` to see that the stream is processed as it is produced.
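That is:

```julia
pipeline(async_writer(func1e, "data.txt"), `cat`) |> func2
```

With `cat` the numbered lines should appear one by one (roughly every 0.1 s, because of the artificial delay), whereas `sort` has to read the whole input before it can produce any output.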
Note the `try` block, which fixes an issue that was already present in the `func1d` version (try running the `func1d` example with a wrong filename: it will hang silently).
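For instance, with this version (using a hypothetical missing file name):

```julia
# eachline throws inside func1e, the task logs the error with @error,
# and `finally close(io)` lets the pipeline terminate instead of hanging
pipeline(async_writer(func1e, "no_such_file.txt"), `cat`) |> func2
```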
I think this new version still has an issue though: as I understand it, if `func1e` writes faster than the pipeline can consume, the internal `BufferStream` buffer will grow without bound. It would be nice to have it block when reaching a certain size; maybe this functionality could be added to `BufferStream`.
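In the meantime, one possible workaround is to throttle the writer manually. Here is a sketch, assuming `bytesavailable` on a `BufferStream` reports the amount of data not yet consumed downstream:

```julia
# Sketch only: block the writer while more than `limit` unread bytes are buffered.
# Assumes bytesavailable(io) reflects what the pipeline has not yet read.
function println_throttled(io, args...; limit = 2^20)
    while isopen(io) && bytesavailable(io) > limit
        sleep(0.01)  # back off until the consumer catches up
    end
    println(io, args...)
end
```

`func1e` could then call `println_throttled(io, sum(v))` instead of `println(io, sum(v))`.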