Async UDP socket usage

We have a problem with async usage of UDP sockets, so I made an MWE (minimal working example) to demonstrate it.

We run a simple UDP echo daemon with socat:

socat -v PIPE udp-recvfrom:9809,fork

This listens on port 9809 and echoes back to the sender’s UDP port.
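
If socat isn't at hand, the echo side can be approximated inside Julia. This is a minimal sketch, assuming an in-process stand-in is good enough to reproduce the problem. start_echo_server is a hypothetical helper, not part of Sockets.jl, and it binds to a port chosen by the OS rather than 9809:

```julia
using Sockets

# Hypothetical helper (not in Sockets.jl): an in-process UDP echo server that
# approximates `socat -v PIPE udp-recvfrom:9809,fork`. It binds to 127.0.0.1
# on a port chosen by the OS and returns the socket together with that port.
function start_echo_server()
    server = UDPSocket()
    bind(server, ip"127.0.0.1", 0)
    _, port = getsockname(server)
    @async try
        while isopen(server)
            # recvfrom() also returns the sender's address, so we can reply to it
            addr, data = recvfrom(server)
            send(server, addr.host, addr.port, data)
        end
    catch err
        # close(server) wakes the pending recvfrom() with an error; exit quietly
        isopen(server) && rethrow()
    end
    return server, port
end
```

With it, the snippets below can send to `port` (from `server, port = start_echo_server()`) instead of 9809. Because the server task alternates recvfrom() and send() sequentially, it never has a receive and a send outstanding on its own socket at the same time.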

This code works:

using Sockets

sock = UDPSocket()
bind(sock, ip"127.0.0.1", 0)

send(sock, ip"127.0.0.1", 9809, "hello")
@info String(recv(sock))

sleep(1)
close(sock)

and prints the echoed packet back.

This code does not work:

using Sockets

sock = UDPSocket()
bind(sock, ip"127.0.0.1", 0)

@async begin
  @info String(recv(sock))
end
send(sock, ip"127.0.0.1", 9809, "hello")

sleep(1)
close(sock)

in the sense that nothing is received back.

Adding small delays makes it work. For example, a sleep(0.001) inside the @async block, before the recv(), makes the code work.

Depending on the exact timing of when send() and recv() are executed, either nothing is received (the UDP packet is silently dropped) or an exception EOFError: read end of file is thrown. Neither makes sense to me.

So, the question is, should I not expect send() and recv() using the same socket to work in asynchronous code?

There is a high chance you are reading your socket buffer before anything has been written to it. You have to wait for send(sock, ip"127.0.0.1", 9809, "hello") to complete first.

Another way would be to put your async part in a while loop like this:

@async begin
    while isopen(sock)
        @info String(recv(sock))
    end
end

recv() is meant to block until data arrives. From the docs:

  recv(socket::UDPSocket)

Read a UDP packet from the specified socket, and return
the bytes received. This call blocks.
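
Because recv() blocks and takes no timeout argument, a lost echo simply looks like a hang. A minimal sketch of a bounded wait, assuming it is acceptable to leave a stale task parked in recv() when the wait times out; recv_with_timeout is a hypothetical helper, and Base.timedwait does the polling:

```julia
using Sockets

# Hypothetical helper (not in Sockets.jl): wait at most `timeout` seconds for
# one datagram on `sock`. recv() itself cannot time out, so it runs in its own
# task while the caller polls that task with Base.timedwait.
function recv_with_timeout(sock::UDPSocket, timeout::Real)
    t = @async recv(sock)
    if timedwait(() -> istaskdone(t), float(timeout)) === :ok
        return fetch(t)   # throws a TaskFailedException if recv() failed
    end
    return nothing        # timed out; t is still blocked inside recv()
end
```

The trade-off is that a timed-out task stays parked in recv(): it will consume the next datagram that arrives on that socket, or fail with EOFError once the socket is closed.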

I tested the code snippet with the while loop, and it has the same problem.

My bad, I forgot about the blocking part. I think pinning the recv function to a separate thread should work as expected.

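using Sockets, StableTasks

sock = UDPSocket()
bind(sock, ip"127.0.0.1", 0)

# Receive loop pinned to thread 4 (the original referenced an undefined `sock1`)
StableTasks.@spawnat 4 while isopen(sock)
    recv(sock) |> String |> println
end

for i in 1:5
    send(sock, ip"127.0.0.1", 9809, "hello"); sleep(0.1)
end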

In case you are not familiar with StableTasks.@spawnat 4: it starts the task and pins it to thread 4, assuming you have started Julia with at least 4 threads.

Threads work better, but still fail sometimes. In any case, is there any reason why async on a single thread should not be expected to work? Nothing in the documentation suggests that it shouldn't.

What happens in CODE 1 (your non-working example) is that the task blocks the first thread and send is never executed, because the task is waiting without yielding to the main task.

Compare the two code snippets below.

I am not going to delve into this subject since I am not an expert, but you should always wrap code in a function.

# CODE 1
using Sockets

function test()
    sock = UDPSocket()
    bind(sock, ip"127.0.0.1", 0)

    @debug "THIS IS EXECUTED"
    task = @async begin
        @info String(recv(sock))
    end
    @debug "THIS IS NEVER EXECUTED"
    send(sock, ip"127.0.0.1", 9809, "hello")


    wait(task)
    close(sock)
end

test()

# CODE 2
using Sockets

function test()
    sock = UDPSocket()
    bind(sock, ip"127.0.0.1", 0)

    send(sock, ip"127.0.0.1", 9809, "hello")
    @debug "THIS IS EXECUTED"
    task = @async begin
        @info String(recv(sock))
    end
    @debug "THIS IS EXECUTED THIS TIME"
    
    wait(task)
    close(sock)
end

test()

The whole point of async is to not block on receive, and it does not. The send works fine; I can even see the transmitted and received UDP packets in tcpdump. It is the receive that never gets the packet when it fails.

And it works sometimes depending on timing. So it isn’t a systematic problem with blocking.

One thing that's definitely suspicious about your code is that you start the send() before the recv(). (@async only schedules a task but doesn't run it immediately, so the @async task, and therefore the recv(), runs only when the main task yields, which happens during the send().) Fixing the order fixes the code, i.e. the following works:

julia> sock = UDPSocket()
       bind(sock, ip"127.0.0.1", 0)

       # This switches to the task immediately
       yield(@task @info String(recv(sock))) 
       send(sock, ip"127.0.0.1", 9809, "hello")

       sleep(1)
       close(sock)
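
The same ordering trick can be wrapped in a small function. This is a sketch, assuming a single recv() per call is enough; begin_recv is a hypothetical name, and the helper relies only on yield(t::Task), which switches to t immediately, exactly as in the snippet above:

```julia
using Sockets

# Hypothetical helper (not in Sockets.jl): start a single recv() on `sock` and
# switch to it right away, so the receive is registered with libuv before the
# caller issues its next send() on the same socket. Returns the Task; fetch()
# it to get the received bytes.
function begin_recv(sock::UDPSocket)
    t = @task recv(sock)
    yield(t)   # runs t now; t blocks inside recv() and control comes back here
    return t
end
```

Against the socat echo from the original post this would be used as `t = begin_recv(sock); send(sock, ip"127.0.0.1", 9809, "hello"); @info String(fetch(t))`. Only this first recv() is protected; a later recv() issued while a send() is still in flight could presumably hit the same problem.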

What I don’t understand is why executing the send() before the recv() consistently fails. Usually, the networking stack caches unreceived datagrams such that the following works just fine:

julia> sock = UDPSocket()
       bind(sock, ip"127.0.0.1", 0)

       send(sock, ip"127.0.0.1", 9809, "hello")
       @info String(recv(sock))

       close(sock)
[ Info: hello
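
The buffering can also be checked without socat by using a second local socket as the sender. A minimal sketch; the socket names are illustrative:

```julia
using Sockets

receiver = UDPSocket()
bind(receiver, ip"127.0.0.1", 0)
sender = UDPSocket()
bind(sender, ip"127.0.0.1", 0)
_, rport = getsockname(receiver)

send(sender, ip"127.0.0.1", rport, "queued")
sleep(0.1)                      # no recv() is pending on `receiver` yet
msg = String(recv(receiver))    # the OS kept the datagram until recv() asked for it
@info msg
close(sender)
close(receiver)
```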

My best guess is that there is some weird interaction between the send() and recv() which causes the recv() to not fire correctly if you flip-flop between the two.

Hah, I think I know what’s happening! The problem is that recv() first checks whether the UDP socket is “active” and only if it’s found to be not active then it registers a callback for when a matching UDP datagram arrives. From Sockets.jl:

if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
    err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                sock,
                @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
                @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
    # ... (error check elided)
end

The trouble is that a pending send also makes the socket count as active. So if recv() executes while a send() is in flight, the receive callback is never registered and the recv() never resolves.

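julia> using Sockets
       # Assumed helper (its definition isn't shown in the thread): the same
       # uv_is_active check that recv() performs in Sockets.jl
       is_active(sock) = ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle)

       sock = UDPSocket()
       bind(sock, ip"127.0.0.1", 0)
       println("Before send: ", is_active(sock))

       @async begin
           println("In async before yield: ", is_active(sock))
           yield()  # give the event loop a chance to finish the send()
           println("In async after yield: ", is_active(sock))
       end
       send(sock, ip"127.0.0.1", 9809, "hello")
       println("After send: ", is_active(sock))

       close(sock)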
Before send: 0
In async before yield: 1
In async after yield: 0
After send: 0

This also explains why delaying the recv() even further by putting a sleep() before it fixes the problem: the extra sleep means the task yields once more, and that allows the send() to finish before the recv() starts.

Opened an issue on GitHub: "send() / recv() on UDPSocket is not async-safe" (JuliaLang/julia issue #57001).