Avoiding `readavailable()` when communicating with a long-lived external program

I would like to ask for comments and suggestions on my solution to the following problem. I maintain a frontend to gnuplot, which implements the following plotting process:

  1. Run gnuplot and connect to it via stdin, stdout, and stderr
  2. Send plotting commands to gnuplot by writing to its stdin (gnuplot may take an arbitrary amount of time to run these commands)
  3. Ask gnuplot to print sigil strings to its stdout and stderr
  4. Read gnuplot’s stdout and stderr
  5. Go to step 2.

Julia’s documentation and suggested solutions to inter-process communications focus on a scenario where a process starts, returns something, and finishes, which is different than the case described above.

Essentially, the problem I need to solve is to read() the data available in gnuplot’s stdout and stderr while the pipes are still open and gnuplot is still running. Until now, I’ve been relying on readavailable(), but that approach is no longer recommended. The difficulties I’m facing are the following:

  • read(p::Pipe) blocks until the pipe is closed.
  • read(p, nb, all=false) is not supported when p is a Pipe
  • I could readuntil the sigil string is read, but readuntil(p::Pipe, s) is likewise not supported.

I have come up with a solution based on bytesavailable(), included below. I have one function to start gnuplot, another to end it, and a main function communicate() to perform steps 2–4 above. This code is not super polished yet, but it works in my experiments so far. I’d like to ask for comments and suggestions. Thanks in advance!

"Step 1: Start gnuplot process."
function startgp()
    inp = Pipe()
    out = Pipe()
    err = Pipe()

    process = run(pipeline(`gnuplot`, stdin=inp, stdout=out, stderr=err), wait=false)
    close(out.in)
    close(err.in)

    # Communication fails without these
    Base.start_reading(out.out)
    Base.start_reading(err.out)

    return (process, inp, out, err)
end

"End gnuplot process `p`"
function endgp(p)
    write(p[2], "exit gnuplot\n")
    close(p[2])
    wait(p[1])
    return p[1].exitcode
end

"Send `input` to process `p` and return its response."
function communicate(p, input)
    inp = p[2]
    out = p[3]
    err = p[4]

    gpout = @async begin
        while true
            b = bytesavailable(out)
            if b == 0
                sleep(0.01)
                continue
            else
                return String(read(out, b))
            end
        end
    end

    gperr = @async begin
        while true
            b = bytesavailable(err)
            if b == 0
                sleep(0.01)
                continue
            else
                return String(read(err, b))
            end
        end
    end

    # Step 2: send user input to gnuplot
    write(inp, input)

    # Step 3: ask gnuplot to return sigils when it is done
    write(inp, """\nset print '-'
                   print 'Done'
                   set print
                   print 'Done'\n""")

    # Step 4: read gnuplot's stdout and stderr
    return ( fetch(gpout), fetch(gperr) )

end

I think you can use IOBuffer instead of Pipe for stdin, stdout and stderr - that has read(io, nb; all=false) defined. You can restrict the allowed operations with the read and write keywords on creation.

Since you don’t have bidirectional communication on the same IO object, you don’t need Pipe. That said, Pipe does support regular read(io, nb):

help> Pipe
[...]

 read(s::IO, nb=typemax(Int))                                              
                                                                           
 Read at most nb bytes from s, returning a Vector{UInt8} of the bytes read.

[...]

julia> Pipe <: IO
true

julia> hasmethod(read, (Pipe, Int))
true                              
                                  
julia> applicable(read, Pipe(), 1)
true                              

Thank you. I will experiment with an IOBuffer. A quick note on read(io::Pipe, nb): it is supported, but not useful in my case, because read blocks if there are fewer than nb bytes to read; in other words, the keyword argument all=false is not supported when io is a Pipe.

Replacing Pipe with IOBuffer does not work. The process starts but communicate() blocks.

function startgp()
    inp = IOBuffer(write = true, read = false)
    out = IOBuffer(write = false, read = true)
    err = IOBuffer(write = false, read = true)

    process = run(pipeline(`gnuplot`, stdin=inp, stdout=out, stderr=err), wait=false)

    return (process, inp, out, err)
end

Yeah, that’s because bytesavailable(::IOBuffer) returns 0 (for some reason; there was some reasoning in an issue on GitHub, though I don’t recall which, sorry). In that case, though, you can switch to read(io, nb; all=false) instead of querying the available bytes, no?

Unfortunately, the external process dies when I use IOBuffers instead of Pipes:

julia> p = startgp()
(Process(`gnuplot`, ProcessRunning),
IOBuffer(data=UInt8[...],readable=false, writable=true,
seekable=true, append=false, size=0, maxsize=Inf, ptr=1,
mark=-1), IOBuffer(data=UInt8[...], readable=true,
writable=false, seekable=true, append=false, size=0, maxsize=Inf, ptr=1, mark=-1), IOBuffer(data=UInt8[...], readable=true, writable=false, seekable=true, append=false, size=0,
maxsize=Inf, ptr=1, mark=-1))

julia> p
(Process(`gnuplot`, ProcessExited(0)),
IOBuffer(data=UInt8[...], readable=false, writable=true,
seekable=true, append=false, size=0, maxsize=Inf, ptr=1,
mark=-1), IOBuffer(data=UInt8[...], readable=true,
writable=false, seekable=true, append=false, size=0,
maxsize=Inf, ptr=1, mark=-1), IOBuffer(data=UInt8[...],
readable=true, writable=false, seekable=true,
append=false, size=0, maxsize=Inf, ptr=1, mark=-1))

Seems it dies because it couldn’t write to the buffer :thinking: Yeah, that’s not good, you’ll have to keep the writing and reading parts of the buffer open (else either gnuplot or you can’t read/write from/to the buffers…). IOBuffer should work though if you prevent writing/reading from your side algorithmically (which should of course be what Pipe already does…)

Would you mind opening an issue for read(::Pipe, nb; all=false)? I don’t think there’s another way to enforce the sort of communication you’re looking for.

For the sake of the discussion, I believe I am working on a somewhat similar problem, see e.g. here for some context.

Basically I implemented a small IPC between a C# program and julia (mostly because embedding the julia library in C# was quite painful). This IPC is performed over named pipes.

My main current problems are

  1. while I think these pipes should be able to support two-way communications, it does not seem to be possible on julia’s side. My current solution is to use one pipe for the input and one for the output.
  2. I should be able to keep the pipe open as long as my C# and julia programs run, but actually I create a pipe using listen once then keep doing connect -- read/write -- close in a while-loop because I can’t figure how to not block the pipe.
    Fortunately, I do not need to be very fast for this application, but it doesn’t feel right to keep opening and closing sockets.
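For what it's worth, a connection obtained from listen/accept can be both read and written, so one long-lived connection may be enough. Below is a minimal sketch, assuming a Unix-like system; the temporary socket path and the "echo: " reply format are made up for illustration, and a Julia client stands in for the C# side.

```julia
using Sockets

# Hypothetical socket path; on Windows a \\.\pipe\ style name would be needed instead.
path = tempname()
server = listen(path)

# Server side: accept once, then answer every line on the same connection.
srv_task = @async begin
    conn = accept(server)
    for line in eachline(conn)      # ends when the client closes its end
        println(conn, "echo: ", line)
    end
    close(conn)
end

# Client side: several request/reply rounds without reconnecting.
client = connect(path)
println(client, "hello")
reply1 = readline(client)
println(client, "again")
reply2 = readline(client)

close(client)
wait(srv_task)
close(server)
```

The connection stays open across both rounds, so the connect/close cycle only happens once per session in this sketch.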

This does not help solve your issue, but at least you know you are not alone on this!


I don’t mind opening an issue, but I’ll wait a few days to see if anybody else chimes in. One question for you, though: you say there’s no other way to solve my problem. However, as far as I can see, the code I posted solves it. Do you see anything wrong with it?

That was in regards to avoiding readavailable and bytesavailable :slight_smile:

In general though, I’m not sure why you’re spawning new tasks on each communication attempt at all. I’d continuously read the outputs in a long-lived background task and parse them into some custom struct, which gets pushed to a Channel for communicating with the main worker thread.
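A rough sketch of that idea, assuming the replies are line-oriented and terminated by a sigil line. The names Reply and start_reader are hypothetical, and an IOBuffer stands in for gnuplot's output pipe so the example runs on its own:

```julia
# Hypothetical reply type; a real frontend might also record which stream it came from.
struct Reply
    lines::Vector{String}
end

# Start one long-lived task that groups lines into replies and pushes them to a Channel.
function start_reader(io::IO, sigil::AbstractString)
    replies = Channel{Reply}(32)
    task = @async begin
        lines = String[]
        for line in eachline(io)        # blocks on a pipe until a line or EOF arrives
            if line == sigil
                put!(replies, Reply(lines))
                lines = String[]
            else
                push!(lines, line)
            end
        end
    end
    bind(replies, task)                 # close the channel when the reader task ends
    return replies
end

# Stand-in for the external program's output: one reply with text, one empty reply.
demo = IOBuffer("warning: x\nGastonDone\nGastonDone\n")
replies = start_reader(demo, "GastonDone")
first_reply = take!(replies)
second_reply = take!(replies)
```

The main task only ever calls take! on the channel, so it never touches the pipe directly.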

You’re right about spawning new tasks – I will fix that.

Is there a particular reason to avoid bytesavailable? There’s a warning in the manual for readavailable, but not for bytesavailable.

It’s not necessarily a problem since you’re fetching them at the end anyway, but I can imagine a situation where gnuplot just doesn’t send data, at which point you’d be stuck at the blocking fetch with no means to diagnose. Parsing & synchronizing via a Channel would allow you to test this code somewhat interactively at the REPL, since you can just send new commands manually that way.
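One way to avoid hanging forever on an empty channel is to wait with a timeout. This is only a sketch: take_with_timeout is a hypothetical helper, and timedwait polls at a fixed interval, so it is a diagnostic aid rather than an efficient solution.

```julia
# Return the next item from `ch`, or `nothing` if none arrives within `seconds`.
function take_with_timeout(ch::Channel, seconds::Real)
    status = timedwait(() -> isready(ch), Float64(seconds); pollint=0.01)
    return status === :ok ? take!(ch) : nothing
end

ch = Channel{String}(1)
missing_reply = take_with_timeout(ch, 0.05)   # nothing was sent, so this gives up
put!(ch, "GastonDone")
present_reply = take_with_timeout(ch, 0.05)   # an item is ready, so it is returned
```

A `nothing` result tells you the other side stayed silent, which is something you can report or retry instead of blocking.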

I think this issue is the most comprehensive summary of gotchas for I/O. On the other hand, I’m not sure whether bytesavailable is tricky or not; I just wouldn’t use it at all because I prefer parsing approaches where one Char/byte is read at a time (and thus blocking is fine). It also leads to busy waiting, since you have to put sleep in there, potentially wasting resources. This does however depend on how well structured the data you’re receiving is.

This is somewhat academic though, if you don’t plan on using a high performance connection to gnuplot for massive plotting jobs, it will be fine.

I decided to simplify and read one character at a time. I dislike having to append characters to a string one at a time, but I expect to read just a few tens of characters. I wrote a simple version of readuntil for pipes:

"""Read pipe until the string `delim` is read. Return all data read,
with `delim` removed if `keep==false`."""
function readpipeuntil(pipe::Pipe, delim::String; keep::Bool=true)
    isempty(delim) && return ""
    !isreadable(pipe) && return ""
    s = ""
    while true
        s *= read(pipe, Char)
        # endswith avoids mixing character counts with byte indices,
        # which matters if the data contains non-ASCII characters
        if endswith(s, delim)
            return keep ? s : chop(s, tail=length(delim))
        end
    end
end
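If appending to a string one character at a time is a concern, the same loop can collect raw bytes and build the string once at the end. A sketch under that assumption; readuntil_bytes is a hypothetical name, it accepts any IO, and an IOBuffer stands in for the pipe in the usage lines:

```julia
# Read bytes from `io` until `delim` is seen; return the data read so far if EOF comes first.
function readuntil_bytes(io::IO, delim::AbstractString; keep::Bool=true)
    isempty(delim) && return ""
    d = codeunits(delim)
    n = length(d)
    bytes = UInt8[]
    while !eof(io)                      # on a pipe, eof blocks until data or close
        push!(bytes, read(io, UInt8))
        m = length(bytes)
        if m >= n && view(bytes, m-n+1:m) == d
            keep || resize!(bytes, m - n)
            break
        end
    end
    return String(bytes)
end

kept = readuntil_bytes(IOBuffer("abcGastonDone\nrest"), "GastonDone\n")
dropped = readuntil_bytes(IOBuffer("abcGastonDone\nrest"), "GastonDone\n"; keep=false)
```

Comparing bytes rather than characters also sidesteps any indexing trouble with multi-byte characters.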

I’m also using two long-lived async tasks instead of creating new ones every time there is something to communicate. These tasks return the data read from the pipes using channels. This code seems to work in my tests; please let me know if you see things that are wrong, or ways to improve it.

"Start gnuplot process and start reading tasks."
function startgp()
    inp = Pipe()
    out = Pipe()
    err = Pipe()

    process = run(pipeline(`gnuplot`, stdin=inp, stdout=out, stderr=err), wait=false)
    close(out.in)
    close(err.in)

    # Communication fails without these
    Base.start_reading(out.out)
    Base.start_reading(err.out)

    chan_gpout = Channel{String}(1)
    task_gpout = @async begin
        while process_running(process)
            put!(chan_gpout, readpipeuntil(out, "GastonDone\n"))
        end
        return ""
    end
    bind(chan_gpout, task_gpout)

    chan_gperr = Channel{String}(1)
    task_gperr = @async begin
        while process_running(process)
            put!(chan_gperr, readpipeuntil(err, "GastonDone\n"))
        end
        return ""
    end
    bind(chan_gperr, task_gperr)

    return (process, inp, out, err, chan_gpout, chan_gperr)
end

"End gnuplot process `p`"
function endgp(p)
    write(p[2], "exit gnuplot\n")
    close(p[2])
    close(p[5])
    close(p[6])
    wait(p[1])
    return p[1].exitcode
end

"Send `input` to process `p` and return its response."
function communicate(p, input)
    inp = p[2]
    out = p[3]
    err = p[4]
    chan_gpout = p[5]
    chan_gperr = p[6]

    # send user input to gnuplot
    write(inp, input)

    # ask gnuplot to return sigils when it is done
    write(inp, """\nset print '-'
                   print 'GastonDone'
                   set print
                   print 'GastonDone'\n""")
    yield()

    return ( take!(chan_gpout), take!(chan_gperr) )
end

Hmmm… Gets a bit more complicated when you need to read in the response from your external program (esp. when it uses stderr to send data). Writing GracePlot.jl was easier because I just sent commands (I don’t even think Grace has read commands to give external users access to internal state).
GitHub - ma-laforge/GracePlot.jl: Build Grace/xmgrace plots with Julia!

Establishing IPC with std pipes

@mbaz: I have been experimenting with Julia v1.6.1 & I’ve been getting a bunch of unexpected behaviours using your methodology. I think you might want to migrate away from this run(pipeline()) solution. I found the following to be more robust:

inp = Pipe()
out = Pipe()
err = Pipe()
proc = run(`gnuplot`, inp, out, err, wait=false)

If you launch gnuplot this way, your proc object now links its proc.in, proc.out, and proc.err pipes correctly. It also means you can send out commands with the following:

#I think you need \n-s to get all "pages" of help:
println(proc, "help\n\n\n\n\n")
readavailable(proc.err)

Here, though, I’ve noticed that readavailable() sometimes only reads a single \n character, and reads the whole output on the next call.

Issues using read() instead of readavailable()

If you call read(proc.err), it appears the thread blocks until eof() is reached. This isn’t particularly practical when you are trying to control gnuplot:

val = read(proc.err) #Oops. Will hang until you close your pipe.

#This is better; at least it won't hang if there is data in the pipe:
val = readavailable(proc.err)
#(Unfortunately, it still hangs if the pipe is empty)

But, to be honest, I think you really want to use readline(proc.err). In my experience with this type of communication, programs usually expect the following type of pattern:

println(proc, "Execute_this_command")
answer = readline(proc.err) #Ok. Most apps usually use STDOUT for replies, though.

In other words, most programs send out the reply followed with a newline (unless they are writing back binary data, or something).

What about Base.PipeEndpoint()?

If you run @edit open(`gnuplot`), you’ll notice that open() actually uses Base.PipeEndpoint() objects. Not sure why they exist, or why they aren’t exported, but they have something that Pipe()-s don’t have: IOBuffers!

So, here’s another avenue to search in:

inp = Base.PipeEndpoint()
out = Base.PipeEndpoint()
err = Base.PipeEndpoint()
proc = run(`gnuplot`, inp, out, err, wait=false)

The interesting thing here is that if you read from the IOBuffers instead of the Pipe/PipeEndpoint objects, val = read(a_buffer) does not block until your pipe closes. It simply reads what is currently in your buffer before returning:

val = read(proc.err) #Oops. Will hang until you close your pipe.
val = read(proc.err.buffer) #This is fine. Execution resumes once all buffer data is read

readuntil(p::Pipe, s) is likewise not supported.

readuntil is currently supported for all IO types, so I’m not sure what you meant here:

julia> @which readuntil(Pipe(), "Done")
readuntil(io::Base.AbstractPipe, arg::AbstractString; kw...) in Base at io.jl:371

using bytesavailable

bytesavailable is mostly just a worse version of readavailable (the similarity in name is intentional)

use IOBuffer

We have an open issue about this: IOBuffer is probably not what you want. You could use Base.BufferStream, but really that will just create a Pipe, and you can just use Pipe directly anyway.

use Pipe

Using Base.PipeEndpoint in these examples can be slightly more efficient (and saves you from the requirement to call close(out.in); close(in.out)), but it is otherwise functionally equivalent.

using Base.start_reading

Don’t do this: you’ll confuse the internal state and can end up with even more problems.

RE: readavailable()

I have recently encountered a problem with readavailable() myself. When I used the following, I noticed only a part of my file was read in:

f = open("myimg.png")
data = readavailable(f) #Only reads part of the file?!?
close(f)

On the other hand, if I simply used read(), the whole image was read in from disk:

f = open("myimg.png")
data = read(f) #This time, I get the whole file!
close(f)

So, maybe this is why there is a warning in the julia documentation:

You only get whatever your OS chooses to read (cache) into your buffer.

Possible limitation wrt Pipes?

I’m not 100% sure this statement applies to Pipe-s, though. They might react differently to the readavailable() command than IOBuffer-s do. In fact, I have observed they react differently in certain ways wrt their blocking behaviour - as mentioned in my previous post.

@async solution

@mbaz: Though I haven’t gone through your entire solution, I’m a bit skeptical about the use of @async in this situation.

As far as I can tell, the only reason you should have another thread listening to the output of gnuplot is if you want your Julia program to respond to unexpected state changes or signals coming from gnuplot (ex: user inputs).

For example, if gnuplot was able to send “click events” when the user clicked on a data point, then you might want a separate thread listening for these signals and triggering actions in your Gaston.jl package.

Typical handshaking protocol

Otherwise, your code would probably be easier to manage if you followed a typical handshaking protocol (Send message/command; Receive ACK/Synchronize):

println(proc, "Execute_this_command")

#readline() should block until gnuplot responds:
answer = readline(proc.err) #Ok: Most apps usually use STDOUT for replies, though.
#Process answer or verify "Ack" here

(Adapted from my previous post)

The subtlety that bytesavailable for files can return 0 because of OS-level buffers is unfortunate. Maybe these should be renamed to bytesbuffered and readbuffered in the future to make the intent clearer? I think there was a similar proposal in the issue I linked above.

@MA_Laforge Thank you for your comments; you’ve given me much material to chew on. In particular, using readline sounds very promising.

To elaborate on the “protocol” between Gaston and gnuplot: gnuplot does not have a “request-response” type of protocol built in. Gaston needs to know two things: (1) when gnuplot is done plotting, and (2) when it has read the whole of stderr (gnuplot may output any number of characters (including 0) when it detects an error). To do this, Gaston asks gnuplot to return a “sigil” string “GastonDone” both on stdout and stderr.
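With readuntil, the reading side of this protocol could look roughly like the sketch below. IOBuffers stand in for gnuplot's stdout and stderr, the error text is made up, and read_replies is a hypothetical helper; reading the two streams one after the other is a simplification.

```julia
# The sigil from the post, plus the newline that gnuplot's print appends.
const SIGIL = "GastonDone\n"

# Read one reply from each stream; readuntil drops the delimiter by default (keep=false).
function read_replies(out::IO, err::IO)
    return (readuntil(out, SIGIL), readuntil(err, SIGIL))
end

fake_out = IOBuffer("GastonDone\n")                    # nothing printed before the sigil
fake_err = IOBuffer("some error text\nGastonDone\n")   # made-up error output
gpout, gperr = read_replies(fake_out, fake_err)
```

An empty string on either stream then means that gnuplot printed nothing there before the sigil.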


If bytesavailable is even worse, may I suggest also adding a warning to the documentation?

I must have made some mistake: I tried it and got an ArgumentError. I’ll try again! I will also try with Base.PipeEndpoint.

Thanks for all the pointers!