Non-blocking network IO

I’m going to write a node that maintains network connections with other nodes. When the node is told to shut down, it has to finish sending and receiving messages and then shut down, and this must take at most a few seconds. (Julia 1.11.1)

First I create a listening socket:

julia> svr=listen(n) # I'm keeping the port number secret for now
Sockets.TCPServer(RawFD(25) active)

In another terminal, I connect to the port using netcat, then accept the connection in Julia:

julia> c2=accept(svr)
TCPSocket(RawFD(26) open, 0 bytes waiting)

First problem: how do I tell if there is a connection waiting on the listening socket? man accept says it’s possible:

In order to be notified of incoming connections on a socket, you can use select(2), poll(2), or epoll(7).
But how do I do this in Julia?

I sent the socket a few kilobytes. (First I tried eight bytes, aoeusnth, then thought that’s a lot smaller than the MTU, so I tried about 5 kB.) Then I did bytesavailable(c2) and got 0. I ran netstat -t and saw that there were no bytes waiting in the receive or transmit queue. I did read(c2,4) and got the first four bytes of the file. Then I did bytesavailable(c2) and got 4910. The connection is on localhost, so the bytes should be available immediately, shouldn’t they?

Second problem: where are these bytes hiding, and how can I tell whether some bytes are hiding there? The thread “How to read from socket in non-blocking mode” is relevant, but it’s from 2017-07, before Julia 1.0. If I knew that some bytes were hiding there, I could call read(c2,1) and bytesavailable would tell me how many more bytes there were.

In case it matters, the program will AFAIK run only on some sort of Unix.

4 Likes

Julia doesn’t currently have non-blocking I/O. As the documentation for bytesavailable says:

Return the number of bytes available for reading before a read from this stream or buffer will block.

In effect this means that even if the underlying abstraction has data buffered, there still needs to be some (minimally) blocking call that updates the Julia-exposed data structure before bytesavailable can reflect it on the Julia side.
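For illustration, here is one way to provoke that buffer fill with a bounded wait (a sketch only, not an official API; the name wait_for_data and the helper-task approach are my own choices). It relies on eof() blocking until at least one byte has been buffered on the Julia side, or the peer has closed the connection:

using Sockets

# Sketch: eof() blocks until at least one byte has been pulled into Julia's
# stream buffer (or the peer closes), so running it on a helper task and
# waiting with a timeout gives a crude poll. Afterwards bytesavailable
# reflects the buffered data. If the timeout expires, the helper task simply
# keeps waiting until data arrives or the socket is closed.
function wait_for_data(sock::TCPSocket, timeout::Real = 1.0)
    t = @async eof(sock)                       # side effect: fills the Julia-side buffer
    timedwait(() -> istaskdone(t), timeout)    # returns :ok or :timed_out
    return bytesavailable(sock)                # nonzero if data arrived in time
end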

Libuv (which Julia uses under the hood here) has, in theory, recently merged io_uring support, but this is not exposed through this API (which is quite unfortunate from my POV).

4 Likes

Is there a way to call libuv to find out if the buffer has data?

I am currently writing Julia code with network IO, and unfortunately the support for it is limited in comparison with the C API. :frowning:

If you want to find out what you can call in Julia from libuv, you should probably start by looking here: julia/src/jl_uv.c at master · JuliaLang/julia · GitHub.

Libuv (which Julia uses under the hood here) has, in theory, recently merged io_uring support, but this is not exposed through this API (which is quite unfortunate from my POV).

Be aware that Julia maintains and uses its own fork of libuv.

1 Like

I don’t know how to make use of this. If there’s a line

JL_DLLEXPORT int jl_fs_rename(const char *src_path, const char *dst_path)

does that mean that I can call fs_rename or jl_fs_rename from Julia? I also don’t know libuv.

I typed using Sockets, and by hitting tab found out that there is accept_nonblock, but it has no docstring. Where’s the source? That would probably also help me figure out how to use jl_uv.c.

1 Like

If you want to call a C function, you should read up on calling C functions from Julia.

For example, to call jl_fs_rename, you can use the @ccall macro:

@ccall jl_fs_rename("test"::Cstring, "test2"::Cstring)::Cint

Sockets.accept_nonblock is a private symbol, so it’s not advisable to use it unless you know what you are doing. As the docstring warns:

The following bindings may be internal; they may change or be removed in future versions: Sockets.accept_nonblock.

The REPL help shows the path to the source code, so you can read the source of this function to learn more.

It is very unfortunate; there was a great talk at last JuliaCon about the traps of running an HTTP service, many of which were related to IO scheduling and would be alleviated (directly or indirectly) by io_uring.

There was interest in using io_uring prior to libuv gaining the feature; I imagine the barrier is a matter of manpower (I recall reading that libuv is an especially hairy dependency to upgrade?).

Edit: Here’s the JuliaCon talk

5 Likes

That’s a really good talk. At least after watching it I probably can answer my own question I have recently posted. :slight_smile:

4 Likes

The thing is, even if Julia’s libuv fork used io_uring under the hood, we’d still have to “hide” that behind the current (blocking) API. If you want to make proper use of asynchronicity, you have to expose the asynchronicity of the underlying API, which would make for a very different API. We could immediately return from a read with a Future, for example, so that the I/O happens asynchronously while our code does something different. If we only expose a blocking API, we have to manually create tasks that handle the blocking part again, which IMO defeats the purpose of using io_uring under the hood in the first place.

3 Likes

I tried accept_nonblock and this is what happened:

julia> c1=Sockets.accept_nonblock(svr)
ERROR: IOError: accept: resource temporarily unavailable (EAGAIN)
Stacktrace:
 [1] uv_error
   @ ./libuv.jl:106 [inlined]
 [2] accept_nonblock(server::Sockets.TCPServer)
   @ Sockets /usr/local/lib/julia-1.11.1/share/julia/stdlib/v1.11/Sockets/src/Sockets.jl:679
 [3] top-level scope
   @ REPL[98]:1
 [4] top-level scope
   @ none:1

julia> c1
ERROR: UndefVarError: `c1` not defined in `Main`
Suggestion: check for spelling errors or missing imports.
Stacktrace:
 [1] top-level scope
   @ :0
 [2] top-level scope
   @ none:1

Then I connected to the port with netcat and tried again.

julia> c1=Sockets.accept_nonblock(svr)
TCPSocket(RawFD(26) open, 0 bytes waiting)

This is good enough for me; I can try to accept the socket, catch the error if no one’s connected, and pass the connection to a thread if someone connected.
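Something like this (a sketch; the UV_EAGAIN comparison is my assumption about how the error is reported, and handle_connection and shutdown_requested are placeholder names for my own code):

using Sockets

# Poll the listening socket without blocking. Sockets.accept_nonblock is
# internal (see the warning earlier in the thread).
function try_accept(svr::Sockets.TCPServer)
    try
        return Sockets.accept_nonblock(svr)    # a TCPSocket if a client is waiting
    catch e
        if e isa Base.IOError && e.code == Base.UV_EAGAIN
            return nothing                      # nobody is connected right now
        end
        rethrow()
    end
end

handle_connection(c) = (write(c, readline(c)); close(c))   # placeholder handler
shutdown_requested = Ref(false)

while !shutdown_requested[]
    c = try_accept(svr)
    c === nothing || errormonitor(Threads.@spawn handle_connection(c))
    sleep(0.1)                                  # poll a few times per second
end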

There is no read function in Sockets.jl. Where is the read function that reads from a socket? ?read doesn’t tell me where it is, because help mode just shows the docstring rather than the source location.

Why not define a non-blocking Julia API first, which uses a Julia task, and if and when the libuv fork is updated, use the non-blocking API from libuv? Or are the two solutions different in behaviour?

1 Like

What do you mean by “uses a Julia task”? No task must ever wait indefinitely for something, so that the main thread can tell it to stop and know that it will stop within a limited time.

Sorry, I was referring to the statement

It’s just the regular read function:

julia> TCPSocket <: IO
true

It doesn’t have its own docstring, but you can still get to the method with edit(read, (TCPSocket,)). You’ll see that this ends up taking the IO lock again, so you won’t get around having to wait if something else wants to use IO at the same time.
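For example (using the c2 socket from earlier in the thread), all the generic IO methods apply to a TCPSocket; they just block the calling task until enough data has arrived:

header = read(c2, 4)         # up to 4 bytes; blocks until they arrive or the stream ends
line   = readline(c2)        # up to the next newline (or end of stream)
chunk  = readavailable(c2)   # whatever is currently buffered; blocks only if the buffer is empty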

One problem is that Julia-level IO takes an IO lock whenever an operation interacts with libuv, so that concurrency on the Julia side doesn’t mess up the internal state. If you want to move existing code that uses “regular” reads/writes over to non-blocking use of libuv, you may inadvertently break programs that implicitly relied on those locks for correctness. That’s part of why I’m saying an async API would/should look different from just read/write.

Another problem is that you likely won’t be able to control rescheduling of your Task objects in the way that you want them to be scheduled, because you don’t have control over when the Julia scheduler reschedules a specific task. It’s a somewhat hidden implementation detail.

This is subtle, but it basically means that you either have to weave per-Task locks through every operation of your async API again (which you don’t want, since that means artificial overhead when those Tasks are woken up and notice that they aren’t allowed to progress yet), or forgo Julia-native Tasks and roll your own. In the latter case you need to be careful never to yield to the Julia scheduler on the main Julia Task that acts as your scheduler, or you might not be rescheduled in time when your “hidden” IO pokes you to wake up your other tasks.

I’ve had these exact scenarios happen when I experimented with this a while ago using Julia Tasks: if you only have one thread, you can’t ever yield from your scheduler! But then your Julia Tasks don’t ever get scheduled by the Julia scheduler either. And if you do yield, you can easily deadlock or introduce overhead, because there’s no guarantee that your scheduler is scheduled again when you want it to be. It’s a Catch-22.

Other than these (and probably more) difficulties though, yes, that’s absolutely in the realm of possibility, and I’d love to see it! :slight_smile:


Finally, a bit of a cleanup/correction: if you want to be technically correct, you could link to the manual and mention that Julia does have “Asynchronous I/O”:

A quick look at the text though reveals that what is meant here is that calling the regular, blocking read/write inside an @async block is totally possible. This of course ends up blocking the async Task that’s spawned by the @async! IMO, that’s a far cry from an async-native I/O API, since it sidesteps all the issues with async I/O and leaves them up to the user to figure out themselves.
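To make that concrete, here is a minimal sketch of the pattern the manual describes (using the c2 socket from earlier; the Channel capacity and the handle function are placeholders):

# "Asynchronous I/O" in the manual's sense: the reads still block, but only
# the task they run on; results are handed back over a Channel.
incoming = Channel{Vector{UInt8}}(32)
@async begin
    while !eof(c2)                        # blocks this task until data arrives or the peer closes
        put!(incoming, readavailable(c2))
    end
    close(incoming)
end
# elsewhere: isready(incoming) && handle(take!(incoming))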

2 Likes

That brings up stream.jl (in nano; I then opened it in kwrite). The function contains bytes = take!(stream.buffer), so I’m searching the file for .buffer to figure out how data gets into that buffer.

As to the IO lock, if no one holds the lock for more than a second, it’s not a problem. Each socket is read and written by only one task.

Thanks for the clarifications. My relatively naive approach up to now has been to spawn a task to do the IO and let it put the result in some channel. Maybe the tedium that comes with this is what you are referring to when you say that it’s all left for the user to figure out themselves. I guess one could write some abstractions to make things like this more convenient, and they might behave almost the same as true async IO, but I think I’m out of my depth when it comes to the subtleties beyond this.

There seems to be a misunderstanding. What am I trying to do?

@simsurace is not replying to you - note the small arrow & profile picture in the upper right hand corner of their message, indicating that it’s a reply to my earlier wall of text :slight_smile: You can also click that arrow to expand the originally replied-to message.

I have not looked further into the exact details of that lock, but my understanding is that it covers the entirety of libuv across all tasks. So if something were to write tens of gigabytes of data into a socket with a single write call, my impression is that the lock would be held for the entirety of that call, which may take quite a while. The lock is shared across tasks if I read the source correctly, so it doesn’t matter whether your tasks each access only a single socket or not.

The most written in one write call is a few kilobytes. I haven’t decided if one message could ever be more than a gigabyte, but I’m leaning against it. If not, the longest message will be about 80 MB.

Is it possible for a write call to block indefinitely? I’m thinking it might happen if the network connection went down in the middle of writing a big message, and the outgoing buffer filled up, in which case it’d wait for the OS to figure out that the TCP connection was cut off.
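If that can happen, one mitigation I’m considering (a sketch; the timeout value and the function name are mine) is to run the write on its own task and close the socket if it doesn’t finish in time; closing the socket should make the stuck write throw:

# Sketch: guard a potentially blocking write with a timeout. Closing the
# socket from the outside should make a stuck write fail, so the writer task
# finishes one way or another.
function write_with_timeout(sock, data; timeout = 5.0)
    t = @async write(sock, data)
    if timedwait(() -> istaskdone(t), timeout) === :timed_out
        close(sock)      # abort: the blocked write should now error out
    end
    return fetch(t)      # bytes written, or throws a TaskFailedException if the write failed
end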

To my understanding this lock is shared across all tasks, not just yours. So if another library is performing such a long-running call, it would theoretically block your code too.

To be clear, I haven’t tested this extensively; this is just my understanding of what the code seems to be doing. I can try to run that experiment later today; it’s fairly straightforward: a netcat server listening and piping what it receives to /dev/null, a Julia task writing an array of a few gigabytes to a TCPSocket in one call, and another background task attempting to send a second networked message at the same time. If there’s a delay for the background message, there’s interference between tasks due to one write taking an abnormally long time.
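Roughly along these lines (a sketch; the ports, payload size, and use of a second connection are placeholder choices, and you’d start something like nc -l 9000 > /dev/null and nc -l 9001 > /dev/null beforehand; Threads.@spawn is used here, @async would test the single-threaded case instead):

using Sockets

# One task pushes a huge write through a socket while another task times a
# tiny write on a second connection; a large delay on the small write would
# indicate interference between tasks.
big   = connect("127.0.0.1", 9000)
small = connect("127.0.0.1", 9001)
payload = rand(UInt8, 2 * 10^9)       # ~2 GB in a single write call; adjust to your RAM

t_big   = Threads.@spawn write(big, payload)
sleep(0.1)                            # let the big write get going first
t_small = Threads.@spawn @elapsed write(small, UInt8[0x01])

println("small write took ", fetch(t_small), " s while the big write was in flight")
wait(t_big)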