Piping into an external program from Julia output

What I’m trying to do is process a bunch of files with a Julia function, feed its output into an external program, and then feed that program’s output into another Julia function.

So basically

func1(file) | run(`CMD`) |func2(stdin)

which made me try

run(pipeline(func1(file),`CMD`,func2(stdin)))

This does not work. Google led me to the suggestion

open(pipeline(`cat`, `sort`), "w", STDOUT) do f
           println(f, "hello")
       end

where STDOUT was the correct name at the time that answer was written (it is stdout in current Julia). This would imply that the first step I’m looking for is

open(`CMD`,"w",io) do stdin
    func1(file)
end
func2(io)

Is that correct? Also, is there a more elegant way of doing that? As a toy problem, I hoped

println("hello") |> run(`rev`)

would have worked, but it just prints “hello” and then rev seems to wait for something.

This led me to trying

function rev()
       open(`rev`,"w",stdout) do stdin
               print(stdin)
       end
end
print("hello") |> rev


I guess what I’m having trouble with is that there isn’t a nice way of feeding stdin to an external program. It seems like it would actually be easier for me to just have func1 write to a temp file then either

pipeline(`cat $file`, `CMD`)

or have the external program open it itself. This just seems inelegant to me; I should be able to accomplish this with a single stream.

I have read Running External Programs · The Julia Language as well as Networking and Streams · The Julia Language. I am at least proficient enough with Julia to write a git filter with it, so I can feed stdin to an entire Julia script from outside the script, but I can’t feed stdin to an external program from inside the script.


After reading the document, I think you are just missing the run command, something like this: run(pipeline(func1(file),`CMD`,func2(stdin)))

https://docs.julialang.org/en/v1/manual/running-external-programs/

No. I should have been clearer. I was using the run command.

run(pipeline(println("hello"),`rev`))
hello
ERROR: MethodError: no method matching pipeline(::Nothing, ::Cmd)
Closest candidates are:
  pipeline(::Any, ::Any, ::Any, ::Any...) at cmd.jl:314
  pipeline(::Base.AbstractCmd, ::Any) at cmd.jl:293
  pipeline(::Union{RawFD, Base.FileRedirect, AbstractString, IO}, ::Base.AbstractCmd) at cmd.jl:294
Stacktrace:
 [1] top-level scope at REPL[143]:1

It seems Julia doesn’t like native functions in pipes/run.

That is correct; pipeline only takes commands (like in shell commands) as arguments.

julia> run(pipeline(`julia -e 'print("hello")'`,`grep el`))
hello

So what would you suggest?

Since starting this thread I’ve also tried (as a sort of shot in the dark)

cmd_stream = open(`cmd`,"w+")
write(cmd_stream,"a string")
read(cmd_stream)

and a variety of other similar types of things. If it’s an IO stream then you should be able to read and write to it. However, if I remember correctly, nothing is actually written until you’ve closed the “file”, which in this case is a command so I’m not sure how you’d then reopen it.

So I tried flushing instead, but read returns a zero element array.
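One way to do a write-then-read round trip on a single process is to close only the write end of the process and then read from it. The following is a minimal sketch, not a definitive recipe: `roundtrip` is a hypothetical helper name, `sort` merely stands in for the real command, and `Base.pipe_writer` is an unexported function.

```julia
# Sketch: send a string to an external command and read back its output.
# `roundtrip` is a hypothetical helper; `sort` is a stand-in command.
function roundtrip(cmd::Base.AbstractCmd, input::AbstractString)
    proc = open(cmd, "r+")            # connect both stdin and stdout of the process
    write(proc, input)
    close(Base.pipe_writer(proc))     # close stdin only, so the command sees end of input
    output = read(proc, String)       # read until the command closes its stdout
    wait(proc)
    return output
end

sorted = roundtrip(`sort`, "pear\napple\nfig\n")
```

This is only suitable for small inputs: with a command that emits output while it is still reading, writing everything before reading anything can stall once the pipe buffers fill up.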

Currently my script is using mktempdir and just sticking things in a semi-random sub-directory of /tmp. So the text processing that I can do with Julia functions writes to a temp file, the external program reads that, writes to another temp file, and then the second Julia function reads that temp file. I also am currently wrapping the external program in its own function.

There are probably a few different ways to do it. The easiest might be simply to use an intermediate file.
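A minimal sketch of the intermediate-file approach, assuming a Unix-like system: `via_tempfile` is a hypothetical helper name and `sort` stands in for the real command. It relies on `pipeline` treating a string given as `stdin` as a filename.

```julia
# Sketch: pass data to an external command through a temporary file.
# `via_tempfile` is a hypothetical helper; `sort` is a stand-in command.
function via_tempfile(cmd::Base.AbstractCmd, input::AbstractString)
    mktemp() do path, io
        write(io, input)
        flush(io)                                  # make sure the data is on disk before the command reads it
        read(pipeline(cmd, stdin = path), String)  # a String passed as stdin is treated as a filename
    end                                            # mktemp removes the file when the block ends
end

sorted = via_tempfile(`sort`, "3\n1\n2\n")
```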

Another method requires formatting your data as a string. Assuming in_data is a string with your data,

io = IOBuffer()
run(pipeline(`CMD`, stdin = IOBuffer(in_data), stdout = io))  # wrap the string in an IOBuffer; a bare string would be treated as a filename
out_data = String(take!(io))  # this is CMD's output

What happens here is that CMD's stdout is redirected to the buffer io. So, all output from the command is captured. Then, you can read it back with take!(). You can see this approach being used in the wild in PkgPage.jl.

Finally, if CMD is a C program under your control, then it may be possible to pass it a pointer to your data, avoiding the need to convert to/from strings.

I hope this helps!


Here are some examples you might find useful.

First let’s create a text file to be processed by our pipeline:

write("data.txt", repr("text/plain", rand(Int8, 10, 6)))

This makes a file with lines such as:

3×6 Array{Int8,2}:
  94  -100    3  -27  -97  22
 -10   -18  -10  -43  123  28
  82   -58  -80  -85  -65  57

We will use Julia to skip the header (first line) and calculate the sum of each line of numbers, then the external programs sort and head to sort the lines and keep the first two, then Julia to show the result with line numbers.

Let’s define func1 and func2:

# Calculate sum of numbers in each line of given file (skipping first line), and return the sums as a multi-line string
function func1(file)
  sums = [sum(parse.(Int, split(line))) for line in readlines(file)[2:end]]
  return join(sums, '\n')
end

# Print output of given stream with added line numbers
function func2(io)
  for (i, line) in enumerate(eachline(io))
    println("line $i: $line")
  end
end

# Run the pipeline. The string result from `func1` is wrapped in an IO stream to work as a pipeline source.
pipeline(IOBuffer(func1("data.txt")), `sort -g`, `head -n 2`) |> func2

This can be improved by letting func1 output directly to an IO stream, which it can give back as return value. I also prefer to use an iterator for the file lines rather than read the whole file before processing:

function func1b(file)
  io = PipeBuffer()
  for line in Iterators.drop(eachline(file), 1)
    v = parse.(Int, split(line))
    println(io, sum(v))
  end
  return io
end

Now we don’t need to wrap the output of func1:

pipeline(func1b("data.txt"), `sort -g`, `head -n 2`) |> func2

The syntax is nice, but it might be a poor solution for working with large files: func1/func1b must process the whole file before passing anything to the pipeline.

Normally this would be solved by writing directly to the process, using do to close it automatically when finished:

function func1c(file, io)
    for line in Iterators.drop(eachline(file), 1)
      v = parse.(Int, split(line))
      println(io, sum(v))
    end
end

# Write to standard output (no post-processing in Julia)
open(pipeline(`sort -g`, `head -n 2`), stdout, write=true) do io
  func1c("data.txt", io)
end

However, we want Julia to write to the pipeline and read from it, more or less at the same time. A solution is to run the writer function in a separate task:

# A func1 version that closes the stream when finished, to allow the process to terminate
function func1d(file, io)
  for line in Iterators.drop(eachline(file), 1)
    v = parse.(Int, split(line))
    println(io, sum(v))
    sleep(0.1) # Add artificial delay in generation of input to the pipeline
  end
  close(io)
end

# Start pipeline asynchronously
cmd = open(pipeline(`sort -g`, `head -n 2`), read=true, write=true)

# Start asynchronous writer, passing only the "write end" of the pipe to avoid closing the reader end before the pipeline is finished
writer = @async func1d("data.txt", Base.pipe_writer(cmd))

# Process the pipeline output
func2(cmd)

# Wait for the writer task to finish
wait(writer)

This way the write and read operations will be interleaved as we progress through a large file.

To make this visible though we should change the pipeline commands, because sort consumes the whole input before starting to generate output. Replacing the pipeline with cat shows that we are indeed processing the stream as it is generated:

cmd = open(`cat`, read=true, write=true)
writer = @async func1d("data.txt", Base.pipe_writer(cmd))
func2(cmd)
wait(writer)
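To check the interleaving without watching the terminal, one can record when each line comes back. This is a rough sketch under the same assumptions as the example above (`cat` available, unexported `Base.pipe_writer`); `arrival_times` is a hypothetical helper name.

```julia
# Sketch: measure when each line arrives back from `cat`.
function arrival_times(n; delay = 0.2)
    proc = open(`cat`, read = true, write = true)
    writer = @async begin
        w = Base.pipe_writer(proc)
        for i in 1:n
            println(w, i)
            sleep(delay)          # artificial delay between writes
        end
        close(w)                  # lets `cat` see end of input and exit
    end
    t0 = time()
    times = Float64[]
    for _ in eachline(proc)
        push!(times, time() - t0) # seconds elapsed when this line was read
    end
    wait(writer)
    return times
end

times = arrival_times(3)
```

If the stream were only delivered after the writer finished, the recorded times would be nearly identical; with `cat` they should be spread out by roughly the delay between writes.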

Edit: I just found out about Base.BufferStream. This should make it possible to implement proper stream processing with a nice syntax…


Can you add these examples to the Julia documentation? I think many would benefit from them. Should these work on Windows as well? I tried, but couldn’t get even the simplest thing working, like run(`dir`).


Wow! I haven’t had a chance to really delve further into this (so I haven’t tried any of @mbaz’s suggestions), but that first example looks to be almost exactly what I’m trying for. The missing piece of my puzzle was to wrap the function’s output in an IOBuffer. But then you take it further, because you’re just that awesome. Now THAT is exactly what I was hoping for.

The syntax is nice, but it might be a poor solution for working with large files: func1 / func1b must process the whole file before passing anything to the shell.

This is a concern of mine, but one which my original bash implementation also has, since the external program I’m “stuck with” (since I don’t want to write my own) processes whole files.

I will admit I kind of lost you after that, but on a follow-up read, when I’m working with the REPL and Vim in front of me, I may get it.

Edit: I just found out about Base.BufferStream. This should make it possible to implement proper stream processing with a nice syntax…

I’m not sure what you’re talking about here. I searched Google for that, as well as the Julia documentation itself, but came up empty (save for a BufferedStreams package which seems outdated). Considering what else you’ve said, and how well you said it, I’m sure you’re right that this would make for some nice syntax (which is important to me).

Also, I agree with @Tero_Frondelius: I would have loved to have found those examples in the documentation. Personally, I found the documentation on running external programs somewhat anemic. In particular, it doesn’t talk about (or doesn’t seem to) my particular problem, which I would think would be a common use case (though perhaps not for a language which seems to be known for its use in high performance computing).

@Tero_Frondelius I’ll propose a PR to expand the documentation, but the code here is using pipe_writer and (in the following) BufferStream, which are not exported. Maybe this can be discussed as part of the PR.

These examples should also work on Windows if you replace the pipeline commands with something that makes sense on Windows. Note that Julia executes them directly as programs, not as shell commands. And dir is internal to the Windows command line, there is no dir.exe program so it won’t work. Try something like run(`ipconfig`).
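As a small illustration of that point, a shell builtin such as dir can still be reached by asking the Windows command interpreter to run it. This is a sketch that assumes `cmd` is on the PATH on Windows and `ls` is available elsewhere; `list_cmd` and `listing` are just illustrative names.

```julia
# Sketch: pick a directory-listing command that exists as a real program.
# On Windows, `dir` is a builtin of cmd.exe, so it is run through `cmd /c`.
list_cmd = Sys.iswindows() ? `cmd /c dir` : `ls`

# Read the command's output directly instead of printing it with `run`.
listing = read(list_cmd, String)
```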


@nstgc thanks for the kind words! Indeed BufferStream is unexported and undocumented, I only found it by chance (mentioned in an issue on GitHub). It’s a bit like a PipeBuffer: reading after a write will return what was written. But reading from a BufferStream blocks until data is available, so it can be used to feed data to the pipeline.
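The difference can be seen in a tiny sketch, with the caveat that `Base.BufferStream` is unexported and undocumented, so its behavior may change between Julia versions. The variable names are illustrative only.

```julia
# An empty PipeBuffer reports end-of-file right away; it never waits for a writer.
pb = PipeBuffer()
pb_is_eof = eof(pb)

# A BufferStream makes the reader wait until some data has been written.
bs = Base.BufferStream()
reader = @async String(readavailable(bs))  # blocks inside the task until data arrives
sleep(0.1)
still_waiting = !istaskdone(reader)        # the reader has nothing to return yet

write(bs, "data")
received = fetch(reader)                   # the reader task now completes
close(bs)
```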

Here’s an example using BufferStream:

# Write to io the sum of numbers in each line of the given file (skipping the first line)
function func1e(io, file)
  for line in Iterators.drop(eachline(file), 1)
    v = parse.(Int, split(line))
    println(io, sum(v))
    sleep(0.1) # Add artificial delay in generation of input to the pipeline
  end
end

# Print output of given stream with added line numbers
function func2(io)
  for (i, line) in enumerate(eachline(io))
    println("line $i: $line")
  end
end

# Call f asynchronously with a BufferStream and the given arguments and return
# the BufferStream, which will be closed automatically when f is done.
function async_writer(f, args...)
  io = Base.BufferStream()
  @async try # We don't wait on the task so we need `try` to see the error (if any)
    f(io, args...)
  catch e
    @error e
  finally
    close(io)
  end
  return io
end

pipeline(async_writer(func1e, "data.txt"), `sort -g`, `head -n 2`) |> func2

This looks nice I think, and the async_writer function can be reused to wrap other writer functions.

Again, replace sort -g, head -n 2 with cat to see that the stream is processed as it is produced.

Note the try block, which fixes an issue that was already present in the func1d version (try running the func1d example with a wrong filename: it will hang silently).

I think this new version still has an issue though: as I understand it, if func1e writes faster than the pipeline can consume, the internal BufferStream buffer will grow without bound. It would be nice to have it block when reaching a certain size. Maybe this functionality could be added to BufferStream.
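Until something like that exists, one possible workaround is to swap the BufferStream for a bounded Channel, whose put! blocks while the channel is full. This is a different technique from the one above and only a sketch: `bounded_feed` and `emit` are hypothetical names, and `Base.pipe_writer` is unexported.

```julia
# Sketch: feed a command from a producer through a bounded queue of lines.
function bounded_feed(producer, cmd::Base.AbstractCmd; capacity = 4)
    lines = Channel{String}(capacity) do ch
        producer(line -> put!(ch, line))   # put! blocks while `capacity` lines are pending
    end
    proc = open(cmd, read = true, write = true)
    feeder = @async begin
        w = Base.pipe_writer(proc)
        try
            for line in lines              # ends when the producer task finishes
                println(w, line)
            end
        finally
            close(w)                       # always let the command see end of input
        end
    end
    output = readlines(proc)
    wait(feeder)                           # rethrows if the producer or feeder failed
    return output
end

result = bounded_feed(`sort -n`) do emit
    foreach(n -> emit(string(n)), [30, 4, 12, 7, 1])
end
```

The producer can never get more than `capacity` lines ahead of the feeder task, so memory use stays bounded at the cost of one extra task.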


@sudete

I’m having trouble implementing this.

I have the functions

function hello_io()
       io = PipeBuffer()
       println(io,"Hello")
       return io
end

function myReverse(io)
       reverse(read(io,String))
end

The first half of what I want works great.

julia> run(pipeline(hello_io(),`rev`));
olleH

However, the full thing does not.

julia> run(pipeline(hello_io(),`rev`))|> myReverse
olleH
""

Performing this directly on the REPL works though.

julia> io = PipeBuffer()
       println(io,"Hello")
       reverse(read(io,String))
"\nolleH"

Also, julia> hello_io() |> myReverse works as expected.

I tried looking at the type of the output, and run(pipeline(stuff)) returns a Base.Process. This structure has the field out, so I tried run(pipeline(stuff)).out, but that returns a Base.DevNull(). Since it’s a type of IO I tried reading it, but it’s empty.

The problem is run, which should not be used here: run sends the output to stdout, or to /dev/null if wait=false (unless you play with undocumented arguments). You should give the result of pipeline directly to myReverse.

A pipeline is a Cmd. As the documentation says,

You can use [the Cmd] object to connect the command to others via pipes, run it, and read or write to it.

So the read in myReverse can work directly on the Cmd object.

The pipeline execution will start when the Cmd is opened, and the output can then be read from the Cmd as if it was a file. This is what read does: Doing @less read(`ls`, String) shows that it redirects to the following method:

function read(cmd::AbstractCmd)
    procs = open(cmd, "r", devnull)
    bytes = read(procs.out)
    success(procs) || pipeline_error(procs)
    return bytes
end

Contrary to run, the open call returns a process from which one can read the pipeline output, see @less open(`ls`). That’s why I use open rather than run in some of the examples above. But in simple cases you should just read directly from the Cmd.
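Applied to the hello_io / myReverse example from the question, that could look like the sketch below. `cat` stands in for `rev` here so that the sketch only depends on a POSIX tool; that substitution is an assumption of this sketch, not part of the original question.

```julia
function hello_io()
    io = PipeBuffer()
    println(io, "Hello")
    return io
end

# `read` accepts a command object, starts it, and collects its output.
myReverse(io) = reverse(read(io, String))

# No `run` here: the pipeline object itself is handed to the reading function.
result = pipeline(hello_io(), `cat`) |> myReverse
```

With `rev` in place of `cat`, the external program and myReverse would each reverse the text once.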


Thanks for the really good explanation. Could we add this to the manual as well?

Yes, I’ll include something similar when I get to making this PR.


Thank you. Everything is working now.


Edit: Oops, this was meant for another thread. I was referencing this thread while writing and I guess it posted it here instead.