Here are some examples you might find useful.
First let’s create a text file to be processed by our pipeline:
write("data.txt", repr("text/plain", rand(Int8, 10, 6)))
This makes a file with lines such as:
10×6 Array{Int8,2}:
  94  -100    3  -27  -97  22
 -10   -18  -10  -43  123  28
  82   -58  -80  -85  -65  57
We will use Julia to skip the header (first line) and calculate the sum of each line of numbers, then the external programs `sort` and `head` to sort the lines and keep the first two, and finally Julia again to show the result with line numbers. Let’s define `func1` and `func2`:
# Calculate the sum of the numbers in each line of the given file (skipping the
# first line), and return the sums as a multi-line string
function func1(file)
    sums = [sum(parse.(Int, split(line))) for line in readlines(file)[2:end]]
    return join(sums, '\n')
end
# Print the output of the given stream with added line numbers
function func2(io)
    for (i, line) in enumerate(eachline(io))
        println("line $i: $line")
    end
end
# Run the pipeline. The string result from `func1` is wrapped in an IO stream to work as a pipeline source.
pipeline(IOBuffer(func1("data.txt")), `sort -g`, `head -n 2`) |> func2
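For example, if the data file contained only the three sample rows shown above, the line sums would be -105, 70 and -149, and the pipeline would print:
line 1: -149
line 2: -105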
This can be improved by letting `func1` output directly to an IO stream, which it can give back as its return value. I also prefer to use an iterator for the file lines rather than reading the whole file before processing:
function func1b(file)
    io = PipeBuffer()
    for line in Iterators.drop(eachline(file), 1)
        v = parse.(Int, split(line))
        println(io, sum(v))
    end
    return io
end
Now we don’t need to wrap the output of `func1b`:
pipeline(func1b("data.txt"), `sort -g`, `head -n 2`) |> func2
The syntax is nice, but it might be a poor solution for working with large files: `func1`/`func1b` must process the whole file before passing anything to the pipeline.
Normally this would be solved by writing directly to the process, using `do` to close it automatically when finished:
function func1c(file, io)
    for line in Iterators.drop(eachline(file), 1)
        v = parse.(Int, split(line))
        println(io, sum(v))
    end
end
# Write to standard output (no post-processing in Julia)
open(pipeline(`sort -g`, `head -n 2`), stdout, write=true) do io
    func1c("data.txt", io)
end
However, we want Julia to write to the pipeline and read from it, more or less at the same time. A solution is to run the writer function in a separate task:
# A func1 version that closes the stream when finished, to allow the process to terminate
function func1d(file, io)
    for line in Iterators.drop(eachline(file), 1)
        v = parse.(Int, split(line))
        println(io, sum(v))
        sleep(0.1) # Add artificial delay in generation of input to the pipeline
    end
    close(io)
end
# Start pipeline asynchronously
cmd = open(pipeline(`sort -g`, `head -n 2`), read=true, write=true)
# Start asynchronous writer, passing only the "write end" of the pipe to avoid closing the reader end before the pipeline is finished
writer = @async func1d("data.txt", Base.pipe_writer(cmd))
# Process the pipeline output
func2(cmd)
# Wait for the writer task to finish
wait(writer)
This way the write and read operations will be interleaved as we progress through a large file. To make this visible, though, we should change the pipeline commands, because `sort` consumes its whole input before starting to generate output. Replacing the pipeline with `cat` shows that we are indeed processing the stream as it is generated:
cmd = open(`cat`, read=true, write=true)
writer = @async func1d("data.txt", Base.pipe_writer(cmd))
func2(cmd)
wait(writer)
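To make the interleaving easier to see, one could replace `func2` with a variant that also prints the elapsed time (the `func2_timed` helper below is just an illustration, not part of the code above); with the 0.1 s delay in `func1d`, each line should then appear roughly a tenth of a second apart:
# Hypothetical variant of func2 that prefixes each line with the elapsed time,
# to make the interleaving of writes and reads visible
function func2_timed(io)
    t0 = time()
    for (i, line) in enumerate(eachline(io))
        println("t=$(round(time() - t0, digits=2))s  line $i: $line")
    end
end
# Use it in place of func2 in the snippet above: func2_timed(cmd)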
Edit: I just found out about `Base.BufferStream`. This should allow implementing proper stream processing with a nice syntax…
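For instance, something along these lines might work (an untested sketch: `func1e` is a hypothetical name, and it assumes that `pipeline` accepts a `Base.BufferStream` as a source, just as it accepts an `IOBuffer` above):
# Sketch: write the sums into a Base.BufferStream from an async task, so the
# pipeline can start consuming them before the whole file has been processed
function func1e(file)
    io = Base.BufferStream()
    @async begin
        for line in Iterators.drop(eachline(file), 1)
            v = parse.(Int, split(line))
            println(io, sum(v))
        end
        close(io)  # signal end-of-stream to the reader
    end
    return io
end

pipeline(func1e("data.txt"), `sort -g`, `head -n 2`) |> func2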