HTTP.jl IOError: stream is closed or unusable

Hi all-

I am trying to communicate with a Julia websocket server via Java and I encounter an error stating:

IOError: stream is closed or unusable

The server receives the message but Java never receives the response, presumably because the connection is lost. Here is the code

#Julia

using JSON, HTTP

       @async HTTP.WebSockets.listen("127.0.0.1", UInt16(8081)) do ws
                  while !eof(ws)
                      recieved_message = readavailable(ws)
                      isempty(recieved_message) ? break : nothing
                      data = JSON.parse(String(recieved_message))
                      println(data)
                      send_message = "hello"
                      jmessage = JSON.json(send_message)
                      write(ws, jmessage)
                  end
              end

#Java

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

import com.google.gson.Gson;

@ClientEndpoint 
public class Client {
	
	private URI uri;
	private Session session = null;
	
	
	public Client(String address) throws URISyntaxException, DeploymentException, IOException {
		uri = new URI(address);
		ContainerProvider.getWebSocketContainer().connectToServer(this, uri);
	}
	
	@OnOpen
	public void processOpen(Session session) {
		this.session = session;
	}
	
	@OnMessage
	public void processMessage(String message) {
		System.out.println(message);
	}
	
	public void sendMessage(String message) throws IOException {
		Gson gson = new Gson();
		String text = gson.toJson(message);
		session.getBasicRemote().sendText(text);
	}
	
}

#Java

import java.io.IOException;
import java.net.URISyntaxException;

import javax.websocket.DeploymentException;

public class ClientTest {

	public static void main(String[] args) throws URISyntaxException, DeploymentException, IOException {
	    Client client = new Client("ws://127.0.0.1:8081");
	    client.sendMessage("dfkdlfjdljfd");
	}

}

Here is the full error message:

dfkdlfjdljfd
┌ Error: error handling request
│   exception =
│    IOError: stream is closed or unusable
│    Stacktrace:
│     [1] check_open at ./stream.jl:323 [inlined]
│     [2] uv_write_async(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64, ::Task) at ./stream.jl:901
│     [3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:877
│     [4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:931
│     [5] unsafe_write at /home/dfish/.julia/packages/HTTP/lZVI1/src/ConnectionPool.jl:134 [inlined]
│     [6] unsafe_write at ./io.jl:522 [inlined]
│     [7] macro expansion at ./gcutils.jl:87 [inlined]
│     [8] write at ./io.jl:545 [inlined]
│     [9] #closewrite#9(::Nothing, ::typeof(closewrite), ::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:174
│     [10] #closewrite at ./none:0 [inlined]
│     [11] #close#10(::Nothing, ::typeof(close), ::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:221
│     [12] close at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:220 [inlined]
│     [13] #upgrade#8(::Bool, ::typeof(HTTP.WebSockets.upgrade), ::getfield(Main, Symbol("##12#14")), ::HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:148
│     [14] #upgrade at ./tuple.jl:0 [inlined]
│     [15] #6 at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:123 [inlined]
│     [16] macro expansion at /home/dfish/.julia/packages/HTTP/lZVI1/src/Servers.jl:360 [inlined]
│     [17] (::getfield(HTTP.Servers, Symbol("##13#14")){getfield(HTTP.WebSockets, Symbol("##6#7")){Bool,getfield(Main, Symbol("##12#14"))},HTTP.ConnectionPool.Transaction{Sockets.TCPSocket},HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}})() at ./task.jl:268
└ @ HTTP.Servers ~/.julia/packages/HTTP/lZVI1/src/Servers.jl:364

Version Info:

Julia 1.2.0

(v1.2) pkg> st HTTP
    Status `~/.julia/environments/v1.2/Project.toml`
  [cd3eb016] HTTP v0.8.8

I suspect it’s because the Java application terminates after sending the message. Maybe if you add a sleep for 1 or 2 seconds before main() exits will give Julia time to send a response?

Thank you for your idea. I gave it a try, but unfortunately it did not work. I also adjusted setAsyncSendTimeout for the websocketcontainer object with no success.

It does work with JavaScript. So I’m not sure whether HTTP does not interact well with Java. As far as I can tell, the Java code is correct.

I did notice that adding sleep allowed Java to receive “hello” message. The error remained for the Julia server unfortunately. So it did correct part of the problem.

I suspect the isempty() is not working as you expect. Julia would be “paused” there until the connection terminates, and your stack says the issue is in the write. So maybe instead of just isempty() use isempty(received_message) || eof(ws) that will break out if the message is empty or the connection has closed.

Good idea. I added the line isempty(recieved_message) ? break : nothing to prevent JSON.json from crashing. I removed that line and the dependency on JSON entirely. Unfortunately, the problem remains.

In fact, the issue remains if I remove the line write(ws, send_message) from the websocket server. So it does not seem to be entirely related to write. If I remove the read statement like so:

using JSON, HTTP

       @async HTTP.WebSockets.listen("127.0.0.1", UInt16(8081)) do ws
                  while !eof(ws)
                      send_message = "hello"
                      write(ws, send_message)
                  end
              end

I receive a different error message:

┌ Error: error handling request
│   exception =
│    IOError: write: broken pipe (EPIPE)
│    Stacktrace:
│     [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./task.jl:517
│     [2] wait() at ./task.jl:592
│     [3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:883
│     [4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:931
│     [5] unsafe_write at /home/dfish/.julia/packages/HTTP/lZVI1/src/ConnectionPool.jl:134 [inlined]
│     [6] unsafe_write at ./io.jl:522 [inlined]
│     [7] macro expansion at ./gcutils.jl:87 [inlined]
│     [8] write at ./io.jl:545 [inlined]
│     [9] #closewrite#9(::Nothing, ::typeof(closewrite), ::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:174
│     [10] #closewrite at ./none:0 [inlined]
│     [11] #close#10(::Nothing, ::typeof(close), ::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:221
│     [12] close at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:220 [inlined]
│     [13] #upgrade#8(::Bool, ::typeof(HTTP.WebSockets.upgrade), ::getfield(Main, Symbol("##12#14")), ::HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:148
│     [14] #upgrade at ./tuple.jl:0 [inlined]
│     [15] #6 at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:123 [inlined]
│     [16] macro expansion at /home/dfish/.julia/packages/HTTP/lZVI1/src/Servers.jl:360 [inlined]
│     [17] (::getfield(HTTP.Servers, Symbol("##13#14")){getfield(HTTP.WebSockets, Symbol("##6#7")){Bool,getfield(Main, Symbol("##12#14"))},HTTP.ConnectionPool.Transaction{Sockets.TCPSocket},HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}})() at ./task.jl:268
└ @ HTTP.Servers ~/.julia/packages/HTTP/lZVI1/src/Servers.jl:364

The pattern of errors is rather strange to me, especially the fact that it works with JavaScript.

Well now you are getting IOError: write: broken pipe (EPIPE) instead of IOError: stream is closed or unusable. In your java application try closing the stream before exiting so that the connection is torn down cleanly.

Sorry. I forgot to note that I added a close method:

    @OnClose
    public void onClose() throws IOException {
        System.out.println("closing websocket");
        this.session.close();
    }

I call that function after sending the message and still get the error.

I would add the read back…your current code is basically writing to the stream as fast as it can and the JAVA code would be closing the connection during a write.

Ok. I tried that and I got the following error:

Error: error handling request
│   exception =
│    InexactError: check_top_bit(UInt16, 52514)
│    Stacktrace:
│     [1] throw_inexacterror(::Symbol, ::Type{UInt16}, ::UInt16) at ./boot.jl:560
│     [2] check_top_bit at ./boot.jl:574 [inlined]
│     [3] toInt16 at ./boot.jl:611 [inlined]
│     [4] Type at ./boot.jl:708 [inlined]
│     [5] convert at ./number.jl:7 [inlined]
│     [6] Type at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:24 [inlined]
│     [7] readframe(::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:272
│     [8] readavailable(::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:239
│     [9] (::getfield(Main, Symbol("##10#12")))(::HTTP.WebSockets.WebSocket{HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at ./untitled-cea905cbd4280040b3ff1dcb3b8cbbd4:5
│     [10] #upgrade#8(::Bool, ::typeof(HTTP.WebSockets.upgrade), ::getfield(Main, Symbol("##10#12")), ::HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:146
│     [11] #upgrade at ./tuple.jl:0 [inlined]
│     [12] #6 at /home/dfish/.julia/packages/HTTP/lZVI1/src/WebSockets.jl:123 [inlined]
│     [13] macro expansion at /home/dfish/.julia/packages/HTTP/lZVI1/src/Servers.jl:360 [inlined]
│     [14] (::getfield(HTTP.Servers, Symbol("##13#14")){getfield(HTTP.WebSockets, Symbol("##6#7")){Bool,getfield(Main, Symbol("##10#12"))},HTTP.ConnectionPool.Transaction{Sockets.TCPSocket},HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}})() at ./task.jl:268
└ @ HTTP.Servers ~/.julia/packages/HTTP/lZVI1/src/Servers.jl:364

I also called sleep in the last line of the server code, which caused the original IOError: stream is closed or unusable. Very strange!

Oh that is a weird error. At this point we’ve hit the limit of what I can help with. At it’s most basic a WebSocket is just a TCP/IP connection which is very well known/defined…why Java is causing an issue closing the connection I don’t know.

I appreciate your help! It seems that this is a problem with HTTP.jl that manifests in various forms. What I am experiencing seems to be slightly different from what is described in that issue, but it still might be related. I’m not sure. @pfitzseb, I hope you don’t mind me pinging you to see if you have any guidance?

Not really, no.
You could give WebSockets.jl a go (which itself uses HTTP.jl, so it’s not super likely to help).

Can you show what code you’re currently working with? The IOErrors seem to indicate that the Java process dies before Julia has completly written a response. I don’t know anything about Java though, so ¯\_(ツ)_/¯.

Yeah. I also considered WebSockets.jl until I saw that it uses HTTP.jl.

Thanks for your willingness to look over the code even if it’s a longshot. The last code I worked with is below. Sometimes it produces a broken pipe error. Other times it produces an error about stream closed or unusable.

If you don’t have any ideas, I might make a reference to the problem in the issue you opened last March.

#Java

import java.io.IOException;
import java.net.URISyntaxException;

import javax.websocket.DeploymentException;

public class ClientTest {

	public static void main(String[] args) throws URISyntaxException, DeploymentException, IOException, InterruptedException {
	    Client client = new Client("ws://127.0.0.1:8081");
	    client.sendMessage("dfkdlfjdljfd");
	    //Thread.sleep(100);
	    client.onClose();
	}

}

#Java


import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;


@ClientEndpoint 
public class Client {
	
	private URI uri;
	private Session session = null;
	
	
	public Client(String address) throws URISyntaxException, DeploymentException, IOException {
		uri = new URI(address);
		WebSocketContainer container = ContainerProvider.getWebSocketContainer();
		container.setAsyncSendTimeout(-1); //sets connection timeout to never
		container.connectToServer(this, uri);
	}
	
   @OnOpen
    public void onOpen(Session session) {
        System.out.println("opening websocket " + session.getId());
        this.session = session;
    }

	@OnMessage
	public void processMessage(String message) {
		System.out.println(message);
	}
	
	public void sendMessage(String message) throws IOException {
		session.getBasicRemote().sendText(message);
	}
	
    @OnClose
    public void onClose() throws IOException {
        System.out.println("closing websocket");
        this.session.close();
    }
	
}

#Julia

using HTTP

@async HTTP.WebSockets.listen("127.0.0.1", UInt16(8081)) do ws
          while !eof(ws)
              recieved_message = readavailable(ws)
              data = String(recieved_message)
              println(data)
              send_message = "hello"
              write(ws, send_message)
              sleep(.3)
          end
      end

I don’t know if this is still a problem for you (it’s been a while) but it seems that readline closes the stream it reads. Maybe that is why you got this issue.