Suggestions/Improvements: STDOUT consumer task

question

#1

TL;DR: Any suggestions for improvements to the script sample below?

Hi all. I’m somewhat new to Julia and have been working a lot with scripts that need to call external programs. The easiest way to interface with these programs is by writing to their input pipes and then reading and parsing the text from the output pipe.

I have looked at the “Avoiding Deadlock in Pipelines” section in the docs. While that example makes sense, it isn’t a working example you can copy and paste. The code below is my attempt at a working example of avoiding deadlock in pipelines.

Note that the original example allows for waiting on the external process before reading from the pipeline. In fact, in my limited experience, readlines() won’t return while the Pipe is still open.

My example code below assumes that the process will not exit before we want to read its output. Think of something like the Linux less program, which keeps running until you enter q or press Ctrl+C. In this example I use ping, which won’t exit for a long while and pauses noticeably between lines of output.

So, as the TL;DR statement said, I’m asking for feedback on this code. I’m doing what I think is right, but I’m new to Julia so there may be a much better way to do this. I’d love to know!

Thanks!

# test_async_reader.jl

using Base.Process

#####################################################################
# Types
#####################################################################

mutable struct BufferedStdout_t
    process::Base.Process
    stdin::Pipe
    stdout_buff::Array{String}
end

#####################################################################
# Functions
#####################################################################

"""
Get string from line FIFO.

If the FIFO array is empty, returns an empty string.
"""
function pull!(x::Array{String})
    try
        return(shift!(x))
    catch
        return("")
    end
end

function pull!(x::BufferedStdout_t)
    return(pull!(x.stdout_buff))
end

#####################################################################
# Start script
#####################################################################

if is_linux()
    stdout, stdin, proc = readandwrite(`ping example.com`)
elseif is_windows()
    stdout, stdin, proc = readandwrite(`ping -n 10000 example.com`)
else
    error("Don't know what kind of Ping command to use")
end

ext_program = BufferedStdout_t(proc, stdin, [])

# Start a new task asynchronously to buffer new strings as they are
# printed to the process's STDOUT pipe. Use `pull!()` to get data
# out of this buffer in the order it was written.
@async while process_running(ext_program.process)
    push!(ext_program.stdout_buff, readline(stdout))
end

# Get 10 lines from ping print out.
i = 0
while i < 10
    str = pull!(ext_program)

    if (str != "")
        println(str)
        i += 1
    end

    yield() # This yield is required. It gives the other task (the
            # read-and-buffer task started earlier) a chance to run.

    #sleep(0.01) # Alternative to yield(): a short sleep also gives
                 # the other task time to run.
end

# Send a nice SIGINT (Ctrl+C) to the process, to close it
kill(ext_program.process, 2)

println("Done")


#2

My example code below assumes that the process will not exit before we wish to read its output

This is certainly doable, although just waiting for the output to reach EOF is much simpler and more reliable.

Here’s how I would write this, making use of some more built-in functionality to shorten the code:

julia> ext_program = Channel{String}(32);

julia> proc = open(`ping example.com`);

julia> @async begin
           try
               while !eof(proc)
                   put!(ext_program, readline(proc))
               end
           finally
               close(ext_program)
           end
       end

julia> for i = 1:10
           str = take!(ext_program)
           println("got: ", str)
       end;

julia> proc
Process(`ping example.com`, ProcessRunning)

julia> close(proc)

julia> proc
Process(`ping example.com`, ProcessSignaled(13)) # SIGPIPE (aka, reader closed stdout)
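
For readers who want to run the same pattern as a script rather than at the REPL, here is a minimal sketch, assuming Julia 1.x. The child command is a stand-in for ping (it starts a second Julia process that prints a numbered line every 50 ms), and the names `child`, `lines`, and `received` are illustrative, not from the post above.

```julia
# Stand-in for a long-running program such as `ping`: a second Julia
# process that prints 20 lines with a short pause between them.
child = `$(Base.julia_cmd()) --startup-file=no -e 'for i in 1:20; println("line ", i); sleep(0.05); end'`

proc = open(child, "r")          # child's stdout is readable through `proc`
lines = Channel{String}(32)      # bounded buffer between reader and consumer

# Reader task: forward each line into the channel and close the
# channel once the child's stdout reaches EOF.
@async begin
    try
        while !eof(proc)
            put!(lines, readline(proc))
        end
    finally
        close(lines)
    end
end

# Consumer: take! blocks until a line is available, so no polling
# loop or explicit yield() is needed.
received = String[]
for _ in 1:5
    push!(received, take!(lines))
end

kill(proc)   # sends SIGTERM by default
wait(proc)   # returns once the child has exited
```

Because `take!` suspends the current task until the reader task has put something into the channel, the consumer does not need the empty-string check or the `yield()` call from the original script.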

Although in this case, the extra Channel is also unnecessary, since the equivalent functionality is already present in readline. So we could also just drop that and rewrite the for-loop as:

for i = 1:10
    str = readline(proc)
    println("got: ", str)
end;

Since you’re only pulling from the process in one direction, there isn’t actually any deadlock to avoid (other than not calling wait(proc), or otherwise waiting for the process to exit, before you start consuming the data).
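
The ordering described here can be sketched as follows, assuming Julia 1.x. The child command is a hypothetical stand-in that prints three lines and exits; `child` and `collected` are illustrative names. The point is only the order of operations: read until EOF first, then wait.

```julia
# Stand-in child process: prints three lines and exits on its own.
child = `$(Base.julia_cmd()) --startup-file=no -e 'for i in 1:3; println("line ", i); end'`

proc = open(child, "r")

# Consume the output first. readline blocks until a full line (or EOF)
# is available, so no separate buffering task is needed.
collected = String[]
while !eof(proc)
    push!(collected, readline(proc))
end

# Only now wait for the process to exit. Calling wait(proc) before
# reading could block forever if the child produced enough output to
# fill the pipe buffer, since nobody would be draining it.
wait(proc)
```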

For your pull!(x::Array{String}) function, I would code it as:
pull!(x::Array{String}) = isempty(x) ? "" : shift!(x)
This avoids needing to deal with exceptions and potentially having them hide other bugs.
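
As a quick check of that one-liner on current Julia versions: `shift!` was renamed to `popfirst!` in Julia 0.7/1.0, so a sketch for Julia 1.x looks like the following. The name `pull_line!` is hypothetical, chosen only to keep it distinct from the `pull!` in the original script.

```julia
# Return the oldest buffered line, or "" when the buffer is empty.
# No try/catch, so unrelated errors are not silently swallowed.
pull_line!(buf::Vector{String}) = isempty(buf) ? "" : popfirst!(buf)

buf = ["first", "second"]
a = pull_line!(buf)   # removes and returns the oldest entry
b = pull_line!(buf)
c = pull_line!(buf)   # buffer is now empty
```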


#3

@jameson Wow, thanks! Excellent suggestions.

Apologies for the late reply. I’ll certainly try this out. I appreciate it!