Running commands in sequence in a pipeline

I am writing a script which, at least in Bash, works best with subshells. The basic structure is “(do something; do something else; etc.) | the next thing|…”. A minimal working example (in Bash) would be:

(echo -n "Hello "; echo "world") > text.file

In Julia, the semicolon is a special character that must be escaped inside a Cmd, and once escaped it doesn’t act as a command separator the way it does in Bash, so

pipeline(`echo -n "Hello "; echo "world"`,"text.file");

doesn’t work. My next thought was something like what is shown under Pipelines in the manual.

run(pipeline(`echo -n "hello "` & `echo  world`, "out.txt"));

The problem there is that, on a closer reading of the text, the output is nondeterministic, which won’t work for me. I could not, however, get "worldhello " to print, so I’m not sure whether this is just an issue with the documentation.

I then just tried experimenting. Namely, I tried replacing the ampersand with a comma or semicolon. That just threw errors, which was what I expected. Next I tried making an array of commands.

cmdarray = [`echo -n "hello "`  `echo  world`]

run.(cmdarray) seems to work, but when I try run.(pipeline.(cmdarray,"out.txt")) I end up with just “world” in the text file. Without the vectorizing .'s, it fails with method errors, which is what I expected.

So I’m a bit lost as to what I can do.

edit: The example here may be a bit overly simple since you could print to the file and then print to it again, appending to it. In my case, the data stream needs to sort of ship together or it’s a bit of a mess.

edit2: Another solution I thought up before bed is to generate the sequence as it would be used in bash and then use that.

function myStream(parameters)
    sequence = Some stuff generating a Cmd that leads to
    return `bash -c '$sequence'`
end

run(pipeline(myStream(parameters),`compressor`))

However, using bash -c 'script' seems a bit janky and brute-force-ish. Another related option would be to generate the $sequence by a Julia script that runs in a bash script, but I’d rather avoid two scripts.
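For later readers, here is a minimal sketch of the bash -c idea that actually runs. As far as I understand it, single quotes inside a Julia command literal suppress interpolation (the same way they do in a shell), so '$sequence' would be handed to bash literally; interpolating a String without quotes passes it as one argument. The script text below is a made-up stand-in for the generated sequence, and the example assumes bash is on the PATH.

```julia
# Hypothetical stand-in for the generated sequence of commands.
script = "printf 'Hello '; echo world"

# Interpolating a String into a command literal yields a single argument,
# so bash receives the whole script as the operand of -c.
cmd = `bash -c $script`

# Capture the combined output of the sequence as one stream.
output = read(cmd, String)
print(output)
```

The same cmd could then be used as the first stage of a pipeline, as in the run(pipeline(myStream(parameters), ...)) line above.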

edit3: Something else that popped into my head which I’ll try later today is to open the thing I’m passing the stream to, write it sequentially, then close it.

compressor = open(`the compressor I'm streaming to`,'a+')
write(compressor,read(`one part of the sequential stream`))
write(compressor,read(`another part of the sequential stream`))
close(compressor)

I know with 'a+' I’m appending, but I’m doing it in a way that doesn’t close the stream so maybe that would work? I’m not sure. I’ll try this afternoon as well.

You were almost there, you just need to tell pipeline to append to the standard output:

run.(pipeline.([`echo 1`, `echo 2`, `echo 3`], stdout="output.txt", append=true))

Thanks, that makes sense for generating the result in the example given. I’ll try it later today, and though I’m sure it will work for the toy problem, as stated in the edit, I think my “minimal working example” was an overly minimal example (sorry). As mentioned, appending is unlikely to work. I can’t remember exactly what happened in Bash when I tried that, but it didn’t work. Also, that seems to be equivalent to

echo -n Hello > out.txt ; echo world >> out.txt

rather than (echo -n "Hello "; echo "world") > text.file. Note also that it generates multiple processes, which, as you’ll see in a bit, is a problem for my real problem.

I am sending this data to a compressor. That is the first problem and why the other side of the pipe needs to run as a single process (at least as far as I can tell). The data is generated by a sequence of external programs. It’s this sequence of programs that makes it problematic for Bash since the sequence itself needs to be generated which at least in my mind calls for functional programming.

Another solution I’ll try later today (which I thought of as I brushed my teeth before bed last night which says something of the quality of the thought) is to generate the sequence as it would be used in bash and then use that.

function myStream(parameters)
    sequence = Some stuff generating a Cmd that leads to
    return `bash -c '$sequence'`
end

run(pipeline(myStream(parameters),`compressor`))

However, using bash -c 'script' seems a bit janky and brute-force-ish. Another related option would be to generate the $sequence by a Julia script that runs in a bash script, but I’d rather avoid two scripts.

edit: Something else that popped into my head which I’ll try later today is to open the thing I’m passing the stream to, write it sequentially, then close it.

function myStream(parameters)
    return [`some`, `generated`, `array`, `of`,`Cmd's`]
end

compressor = open(`the compressor I'm streaming to`,'a+')
for Cmd in myStream(args)
    write(compressor,read(Cmd))
end
close(compressor)

I know with 'a+' I’m appending, but I’m doing it in a way that doesn’t close the stream so maybe that would work? I’m not sure. I’ll try this afternoon as well.

Your last idea is in the right direction I think. What you get from open is a stream, and you can incrementally write to it so appending is already implied. The append specification in open is something else: it says to keep what the file contains before the whole thing starts.

Maybe you want something like this:

open(pipeline(`gzip`, "output.gz"), write=true) do f
    run(pipeline(`ls`, f))     # Execute a program writing to f
    run(pipeline(`ls /`, f))   # Run another program, also writing to f
end
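To illustrate the distinction between writing incrementally to one open stream and append mode, here is a small sketch using a plain temporary file instead of a process (the file and its contents are made up for the illustration):

```julia
path = tempname()

# One open stream: successive writes simply accumulate.
open(path, "w") do io
    write(io, "Hello ")
    write(io, "world\n")
end
first_pass = read(path, String)

# Reopening with "w" discards what the file contained before.
open(path, "w") do io
    write(io, "again\n")
end
truncated = read(path, String)

# Reopening with "a" keeps the earlier contents and adds to them.
open(path, "a") do io
    write(io, "more\n")
end
appended = read(path, String)
```

So append mode only matters for what is already in the target before the stream is opened, not for the writes made while it is open.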

Yes, that looks right! I’ll have to try it, which again won’t be until this afternoon, but that looks like it should work. I recently changed my latest suggestion to include looping over an array of Cmd’s, so I think I’d put that loop inside the do block. It would certainly be good for me to make use of a do block since that is one thing which seems extremely powerful and useful, yet for some reason continues to throw me.

Sorry again for the bad example, and thanks also for confirming the way append works in this usage.

OK let us know how it goes!

The do block works because there is an open method that accepts a function as first argument. The code above is equivalent to:

function write_to_stream(f)
    run(pipeline(`ls`, f))
    run(pipeline(`ls /`, f))
end

open(write_to_stream, pipeline(`gzip`, "output.gz"), write=true)

open will start the pipeline, then call write_to_stream with the pipeline stream as parameter, and close the pipeline when write_to_stream is finished. And as a bonus, this version of open has a try/catch to close the pipeline even if write_to_stream throws an exception.

The do block is just a fancy way to define this write_to_stream and pass it to open as first argument.
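The same mechanics can be tried without any external program. The sketch below uses a made-up helper, with_log, that only mimics the shape of open(f, ...): it takes the function first, does some setup, calls the function, and cleans up afterwards. It is not how Base implements open, just an illustration of the pattern.

```julia
# Hypothetical helper with the function as first argument, like open(f, ...).
function with_log(f, log::Vector{String})
    push!(log, "open")
    try
        return f(log)
    finally
        push!(log, "close")   # runs even if f throws
    end
end

# do-block form: the block body becomes the first argument.
log1 = String[]
with_log(log1) do l
    push!(l, "work")
end

# Named-function form: identical call, written out explicitly.
record_work(l) = push!(l, "work")
log2 = String[]
with_log(record_work, log2)

# The cleanup still happens when the body fails.
log3 = String[]
try
    with_log(log3) do l
        error("boom")
    end
catch
end
```

Both log1 and log2 end up with the same three entries, and log3 records the open and close steps only.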


The first thing I tried was a modification of what I suggested since it’s what I understand best.

stream_source = [
    `explicitly`,`given`,`commands`,
    `I actually care about`
]  #i.e not echo'ing "Hello world"

zstd = open(pipeline(`zstd`,"out.zst"),"w+")
for stream in stream_source
    run(pipeline(stream,zstd))
end
close(zstd)

That worked. For those who might read this later (very possibly people with user names that start with “ns” and end in “c”), this differs from what I proposed earlier in that, rather than using write, I’m streaming to the open process as in @sijo’s suggestion. I’m not sure if my original idea would work, but I doubt it, and I don’t feel like testing it at this moment. Also, the open process is a pipeline streaming to an output text file.

This looks very nice. It isn’t as concise as what I might be able to come up with in Bash, but it feels better. I’m not at my most articulate right now, but I’m happy with this. I’m certain whatever I would need to concoct to generate the array would be janky as hell, and probably require that I hardcode a lot of stuff which shouldn’t be hardcoded.

One thing that does irk me, and gives me pause, is that I can’t pipe to wc -c (something I could do by sending the script’s output to stdout). I am personally interested in knowing how large the stream is since the next stop is over a network. This isn’t a big deal, but I am concerned that, should I try lrz -n | xz -9 --extreme > final.output, it won’t be passed on from lrz to xz properly. That isn’t a major concern since I am leaning towards zstd, which is almost as good as that but substantially faster. Moreover, I’m concerned that lrz might be abandoned sometime in the next 10 years or so, assuming the 4 years of inactivity aren’t indicative that the project will turn to dust soon. zstd is supported by Facebook and used by Arch Linux, so I’m not worried about it disappearing from my repo anytime in the near or not-so-near future.

Other than that, outside this initial period where I’m setting things up and need to confirm the stream is doing what it should be doing, I almost certainly can end it with the compressor streaming to a file.

So that’s good as is.

stream_source = [
    `explicitly`,`given`,`commands`,
    `I actually care about`
]

open(pipeline(`zstd`,"out.zst"),write=true) do zstd
    for stream in stream_source
        run(pipeline(stream,zstd))
    end
end

Unsurprisingly that works, however the “this is equivalent to” example really caught my eye.

function write_to_compressor(compressor) 
    stream_source = [
        `explicitly`,`given`,`commands`,
        `I actually care about`
    ]
    for stream in stream_source
        run(pipeline(stream,compressor))
    end
end

open(write_to_compressor, pipeline(`zstd`, "out.zst"), write=true)

Something about the way we have a single line at the end that takes predefined stuff and then does what I need it to do looks nicer to me. However I don’t especially care for putting the Cmd array in the function. I could define two write_to_compressor functions, one which takes the array as an argument and another

write_to_compressor(compressor) = write_to_compressor(cmd_array,compressor)

which wraps it so open can use it, but I’d rather not. More likely, instead of writing a cmd_array, I would write a function that also contains that for loop. That may or may not call for a wrapper. I haven’t actually written the function that produces the array yet. I wanted to start with the end of the process first so I knew what I needed to feed the final few lines. Also, I foresaw this as the largest hurdle.
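One way to avoid both the hardcoded array and a named wrapper is an anonymous function that captures the array, since open only needs something callable with one argument. The sketch below uses a temporary file and strings in place of the compressor and the Cmd array so that it runs anywhere; write_all and parts are made-up names.

```julia
# Two-argument worker: takes the pieces and the destination stream.
function write_all(parts, io)
    for part in parts
        write(io, part)
    end
end

parts = ["Hello ", "world\n"]   # stand-in for the generated Cmd array
path = tempname()               # stand-in for the compressor pipeline

# The closure fixes `parts` and leaves the one argument that open supplies.
open(io -> write_all(parts, io), path, "w")

result = read(path, String)
```

With a process the last call would presumably become open(c -> write_to_compressor(cmd_array, c), pipeline(...), write=true), keeping the single readable line at the end.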

Another down side to that last one is that it is longer compared to the other two… by a line.

edit: On closer inspection, I’m not sure this is working as desired since the md5sum’s for the script versus by hand aren’t identical. I’m not sure what’s going on, but that worries me. I’m using the do block version by the way.
I forgot that the script used different names for things to aid automation, names that the handwritten command-line input lacks.


The standalone open(write_to_compressor, ...) makes it very readable indeed :slight_smile:

I’m not sure what you mean with wc -c. Can’t you do something like this:

julia> write_to_compressor(compressor) = run(pipeline(`ls`, compressor));

julia> open(write_to_compressor, pipeline(`wc -c`, stdout), write=true);
550

or shorter (this works because stdout is a stream):

julia> open(write_to_compressor, `wc -c`, stdout, write=true);
550

Or do you want to “measure” the stream at the same time as you send it to the compressor?


I accidentally posted this in the wrong thread!

I’m not sure what I was doing wrong before, but it’s working now.

open(pipeline(`zstd`,`wc -c`, stdout),write=true) do compress
    for stream in cmd_array
        run(pipeline(stream,compress))
    end
end

That works. As for your question, something like tee would be nice, but not necessary. To me, the obvious solutions are to either specify stdout= twice in a pipeline or simply call tee. The first doesn’t work, and to my surprise

open(pipeline(`zstd`,`tee out.zst`, `wc -c`),write=true) do compress

doesn’t either. Oh, I see. I specifically need to add stdout at the end.

For future reference, is there a tee equivalent? In the case of tee you can effectively pipe to several files, including processes. The closest thing in Julia I can think of is something like

run(pipeline(`perl -le '$|=1; for(0..5){ print; sleep 1 }'`, prefixer("A",2) & prefixer("B",2)));

as taken from Running External Programs: Complex Example in the manual. The equivalent in this case seems to be

open(pipeline(`zstd`,pipeline(`wc -c`,stdout) & pipeline(`cat`,"out.zst")),write=true) do compress

with

pipeline(`cat`,"out.zst")

being required instead of just "out.zst" because the & operator in this context doesn’t work on strings. Also I couldn’t use stdout="out.zst", for reasons explained in an error message I admittedly didn’t read, but it has something to do with invalid keywords. Regardless, I have something that works reasonably well, and I can do

open(pipeline(pipeline(`wc -c`,stdout) & pipeline(`zstd`,pipeline(`wc -c`,stdout) & pipeline(`cat`,"out.zst"))),write=true) do compress

to print both the compressed and uncompressed size of the stream. Or, if I want that in an easier to read format:

cmd_array = function_generating_commands(input)
wc = PipeBuffer()
str = ["un",""].*"compressed".*": "
function suffix(size::Int)
	digits = ndigits(size) - 1  # integer floor(log10(size)); log(size)/log(10) gives 2 for size = 1000
	if digits < 3
		return ("B",0)
	elseif digits < 6
		return ("kB",3)
	elseif digits <9
		return ("MB",6)
	else
		return ("GB",9)
	end
end


open(pipeline(
    pipeline(`wc -c`,wc) &
    pipeline(`zstd`,
        pipeline(`wc -c`,wc) &
        pipeline(`cat`,"out.zst")
    )
),write=true) do compress
    for stream in cmd_array
        run(pipeline(stream,compress))
    end
end
for (index,line) in enumerate(eachline(wc))
    size = parse(Int,line)
    println(
        str[index],
        round(size/10^suffix(size)[2],sigdigits=3),
        suffix(size)[1]
    )
end

My only complaint with that is having to cat "out.zst" feels kind of janky.

I could also probably replace pipeline(`wc -c`,wc) with a variable (word_count) to improve readability.

edit: Oh dear, something is going terribly wrong. When I actually run that, I get different sizes for different runs, so each of those pieces in the open is eating it differently I guess? I assumed each of those things separated by & was getting the full stream.

Ah that’s not going to work: pipeline(cmd1 & cmd2) starts two commands in parallel. They will both consume from the same standard input, so each command only gets part of it. It would be the same in Bash by the way: data is not duplicated automatically.

To duplicate data for the two consumers (wc and the compressor) you could use the tee command. This command writes a duplicate to a file, which you can then access in Julia. But a quick hack is to use the special file /dev/stderr, which is easy to forward to another command in pipeline!

# A command that measures data running through it and writes the result to the `wc` stream
tee_wc = pipeline(`tee /dev/stderr`, stderr=pipeline(`wc -c`, wc))

# The main command
cmd = pipeline(tee_wc, `zstd`, tee_wc, "out.zst")

# Run main command and feed it the outputs of commands in `cmd_array`
open(cmd, write=true) do compress
  for stream in cmd_array
    run(pipeline(stream, compress))
  end
end

Note that there’s probably a race condition: I don’t think the order of writes from the two tee_wc commands to the wc stream is well defined. For the second measurement, I would simply use stat("out.zst").
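For the second measurement, a minimal sketch of reading the size from the file system once the pipeline has finished. A temporary file with made-up contents stands in for out.zst here:

```julia
path = tempname()          # stand-in for "out.zst"
write(path, "x"^1234)      # pretend this is the compressed output

size_from_stat = stat(path).size   # size in bytes, via the stat structure
size_direct = filesize(path)       # shorthand for the same value
```

This only reports a meaningful number after the compressor has exited and the file is complete.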

Also, rather than tee I would prefer to use a dummy IO object in Julia, wrapping the compress stream and measuring how much data goes through. But it seems surprisingly tricky to implement…


By “dummy IO object” do you mean something like what I did with wc = PipeBuffer()? Or maybe something like

function tee(stream::IO)
    stream_copy = copy(stream)
    return (stream,stream_copy)
end

? I’m pretty sure that wouldn’t work with a pipeline, but something along that general idea?

As for using stderr, the external programs I am running make use of that already. Perhaps instead a temp_pipe = Pipe()?

Alternatively, looking at pipeline in cmd.jl with @less, it seems to make use of CmdRedirect, which is a structure. So it seems like a Tee function would require its own structure, one with more than one IO? I have no idea. This is well beyond my capacity.

Anyway, this goes well beyond the original question. I should probably make a new thread for this, something I’ll likely do later. As of right now, as interesting as this would be, it isn’t necessary for me.

Thank you again.

Each process has its own stderr so I think the tee_wc hack should work (unless something goes wrong and the tee command itself wants to write a message to its standard error, but even that could be fixed with tee options I think).

For the dummy IO object, maybe making a CmdRedirect would work! Though I was hoping for a more general solution, not limited to commands. I meant a custom type such as

mutable struct WriteCount <: IO  # mutable so that counter can be updated in write
  io::IO 
  counter::Int
end

function Base.write(w::WriteCount, x)
  n = write(w.io, x)
  w.counter += n
  return n
end

The WriteCount wrapper would simply delegate the writes to the underlying w.io (the compress stream in our case). This would be simpler to use than a PipeBuffer, and wouldn’t have the overhead of the intermediate buffer. But I couldn’t figure out what methods I must implement on WriteCount to make it work :frowning:
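A less general fallback, sketched here under my own assumptions rather than as a finished solution: instead of wrapping the stream in a new IO type, copy the data through Julia in chunks and count the bytes on the way. The name copy_counting and the chunk size are made up, and in-memory buffers stand in for the source command and the compress stream.

```julia
# Copy everything from src to dst in fixed-size chunks and
# return the number of bytes that went through.
function copy_counting(src::IO, dst::IO; chunk::Int = 64 * 1024)
    total = 0
    buf = Vector{UInt8}(undef, chunk)
    while !eof(src)
        n = readbytes!(src, buf, chunk)        # read at most `chunk` bytes
        total += write(dst, view(buf, 1:n))    # forward only the bytes read
    end
    return total
end

src = IOBuffer("hello world")   # stand-in for a command's output
dst = IOBuffer()                # stand-in for the compress stream
n = copy_counting(src, dst)
copied = String(take!(dst))
```

With real commands, src would presumably come from opening each Cmd for reading and dst would be the compress stream, at the cost of routing all the data through the Julia process instead of letting the programs talk to each other directly.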


I tried your tee_wc solution and that worked well. Thanks. This is nice info to have.

As for the rest… I’ll wait for the weekend or something to look at it. Thank you for cobbling that together, but it will take a lot more time for me to really get what’s going on than I have right now. I’m not a programmer, just someone who needs programs other people aren’t writing.

I’ve continued tinkering with the script I wrote, but am now having a new, related issue.

I realized that there is surely some means for Julia to keep track of how much data is in an IO buffer, and indeed there is. For my purposes it’s nicer to work with PipeBuffers. So instead of using wc -c and tee, I can just check stream.size. Except this isn’t working quite as expected.

function compress_streams(streams::Array)
.
.
.
compressor = open(pipeline(
    adaptive_compressor(uncompressed_size);stdout=compressed
),"w+")
    write(compressor,streams...)
close(compressor)
.
.
.
end

where adaptive_compressor returns a Cmd, and streams is an Array{PipeBuffer,1}. This works… sort of. The final output file is good, but when I run println(compressed.size) I get 0. Using the REPL to dissect what’s going on and setting compressed = compress_stream(streams) and then just waiting a while before checking the size, I get a reasonable size. It isn’t exactly the same as the output file, but that’s due to passing the compressed stream onto GPG before finally writing it. Inserting a sleep command in there “fixes” this.

What doesn’t work due to errors, is a do block.

open(pipeline(
    adaptive_compressor(uncompressed_size);stdout=compressed
),write=true) do compress
	for stream in streams
		run(pipeline(stream,compress))
	end
end

That throws a MethodError, as do run(pipeline(stdin=stream,compress)), run(pipeline(compress;stdin=stream)) and similar rearrangements.

An example error message is:

ERROR: MethodError: no method matching pipeline(::Base.Process; stdin=IOBuffer(data=UInt8[…], readable=true, writable=true, seekable=false, append=true, size=0, maxsize=Inf, ptr=1, mark=-1))
Closest candidates are:
pipeline(::Union{RawFD, Base.FileRedirect, AbstractString, IO}, ::Base.AbstractCmd) at cmd.jl:294 got unsupported keyword argument “stdin”
pipeline(::Any, ::Any, ::Any, ::Any…) at cmd.jl:314 got unsupported keyword argument “stdin”
pipeline(::Base.AbstractCmd; stdin, stdout, stderr, append) at cmd.jl:277

Stacktrace:
[1] (::var"#29#32"{Array{Base.GenericIOBuffer{Array{UInt8,1}},1}})(::Base.Process) at ./REPL[19]:21
[2] open(::var"#29#32"{Array{Base.GenericIOBuffer{Array{UInt8,1}},1}}, ::Base.CmdRedirect; kwargs::Base.Iterators.Pairs{Symbol,Bool,Tuple{Symbol},NamedTuple{(:write,),Tuple{Bool}}}) at ./process.jl:393
[3] compress_stream(::Array{Base.GenericIOBuffer{Array{UInt8,1}},1}) at ./REPL[19]:19
[4] top-level scope at REPL[20]:1
[5] run_repl(::REPL.AbstractREPL, ::Any) at /build/julia/src/julia-1.5.3/usr/share/julia/stdlib/v1.5/REPL/src/REPL.jl:288

edit: Something which also works is:

run(pipeline(`zstd`;stdin=stream[1],stdout=compressed))

but

open(
    adaptive_compressor(uncompressed_size),
write=true) do compress
    for stream in streams
        run(pipeline(
            compress;stdin=stream,stdout=compressed
        ))
    end
end

does not, again failing with MethodError.

ERROR: MethodError: no method matching pipeline(::Base.Process; stdin=IOBuffer(data=UInt8[…], readable=true, writable=true, seekable=false, append=true, size=0, maxsize=Inf, ptr=1, mark=-1), stdout=IOBuffer(data=UInt8[…], readable=true, writable=true, seekable=false, append=true, size=0, maxsize=Inf, ptr=1, mark=-1))
Closest candidates are:
pipeline(::Union{RawFD, Base.FileRedirect, AbstractString, IO}, ::Base.AbstractCmd) at cmd.jl:294 got unsupported keyword arguments “stdin”, “stdout”
pipeline(::Any, ::Any, ::Any, ::Any…) at cmd.jl:314 got unsupported keyword arguments “stdin”, “stdout”
pipeline(::Base.AbstractCmd; stdin, stdout, stderr, append) at cmd.jl:277

Stacktrace:
[1] (::var"#34#37"{Array{Base.GenericIOBuffer{Array{UInt8,1}},1},Base.GenericIOBuffer{Array{UInt8,1}}})(::Base.Process) at ./REPL[29]:21
[2] open(::var"#34#37"{Array{Base.GenericIOBuffer{Array{UInt8,1}},1},Base.GenericIOBuffer{Array{UInt8,1}}}, ::Cmd; kwargs::Base.Iterators.Pairs{Symbol,Bool,Tuple{Symbol},NamedTuple{(:write,),Tuple{Bool}}}) at ./process.jl:393
[3] compress_stream(::Array{Base.GenericIOBuffer{Array{UInt8,1}},1}) at ./REPL[29]:19
[4] top-level scope at REPL[30]:1
[5] run_repl(::REPL.AbstractREPL, ::Any) at /build/julia/src/julia-1.5.3/usr/share/julia/stdlib/v1.5/REPL/src/REPL.jl:288

edit2: Another, simpler example:

open(`zstd`,write=true) do compress
    for stream in streams
        run(pipeline(
            compress;stdin=stream,stdout=compressed
        ))
    end
end

fails whereas,

open(`doesn't matter`,write=true) do compress
    for stream in streams
        run(pipeline(
            `zstd`;stdin=stream,stdout=compressed
        ))
    end
end

works.

In the first example you have

compressor = open(cmd, "w+")
write(compressor, streams...)
close(compressor)

That makes sense: open a process, write to it, close it. Although it’s not clear what happens with the close (see below).

In the other examples, you have something like

open(cmd, write=true) do compress
  run(compress, ...)
end

I don’t get why a second process is being started here with run.

Back to the first example. It’s not clear what this close call does… The documentation for open(command, ...) says nothing about close. Actually there isn’t a close method specific to the Process type, it’s calling close of the parent type AbstractPipe.

However the documentation of open(f, command, ...) says:

Similar to open(command, args...; kwargs...), but calls f(stream) on the resulting process stream, then closes the input stream and waits for the process to complete.

Sounds like we should do exactly that! But rather than reimplementing it ourselves with open(command, ...), we can use this open(f, command, ...) method:

adaptive_compressor() = pipeline(`gzip`)

function compress_streams1(streams)
    compressed = PipeBuffer()
    cmd = pipeline(adaptive_compressor(), stdout=compressed)

    # A function that takes a `p` parameter and writes `streams` to it 
    do_write = p -> write(p, streams...)

    # This version of open returns the value returned by do_write (the number of bytes written)
    n = open(do_write, cmd, write=true)

    println("Wrote $n bytes, got $(compressed.size) bytes")
end

Note: this is exactly what do blocks are for: you call a method that takes a function as first argument, here open(do_write, ...), and it is called with the do body as the function (instead of do_write). So we could rewrite it as:

function compress_streams2(streams)
    compressed = PipeBuffer()
    cmd = pipeline(adaptive_compressor(), stdout=compressed)

    # This is calling the same `open` method as before but with `do` syntax.
    n = open(cmd, write=true) do p
        write(p, streams...)
    end

    println("Wrote $n bytes, got $(compressed.size) bytes")
end

Both work the same:

julia> compress_streams1(repeat("hello", 1000))
Wrote 5000 bytes, got 49 bytes

julia> compress_streams2(repeat("hello", 1000))
Wrote 5000 bytes, got 49 bytes

Note that with this method you have to hold the whole stream in memory, just to measure it. And I think it’s a risky use of PipeBuffer: the size field is not documented, and it only stores the written size until you start consuming the data.
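A small sketch of that last point, using the documented bytesavailable function rather than the size field. It reports how many bytes are still waiting to be read, so the number shrinks as soon as something consumes from the buffer:

```julia
buf = PipeBuffer()
write(buf, "hello")

before = bytesavailable(buf)   # everything written is still unread

read(buf, 2)                   # consume two bytes
after = bytesavailable(buf)    # only the unread remainder is counted
```

Either way the measurement is only reliable if nothing reads from the buffer before you look at it.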