How to pass a buffer to a C callback function?

Following my question in Help with real-time performance needed - #16 by tt1234567, I’ve been thinking about writing a non-allocating recv_into! function to receive a UDP packet into an existing buffer without creating a new one. I looked at the Sockets.jl implementation of recvfrom and was initially totally baffled, but after reading some the manual on the Julia C interface it’s making a bit more sense. It seems to me that I could copy most of recvfrom, replacing the callback in

        err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                    sock,
                    @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
                    @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))

with my own version of the callback function uv_recvcb. Now, that function contains the allocation

                buf = Vector{UInt8}(undef, nread)

where buf::Ptr{Cvoid} is in the function signature. If I could pass an existing Vector{UInt8} to my version of uv_recvcb I could copy the packet to that rather than allocate a new one. I know in advance what the packet size will be. That’s where I’m stuck, so the question is how can I give the function access to an existing buffer? I suppose I could try a closure, but wouldn’t that be inefficient as the packet rate is high and it would have to create a new function each time?

Just declare a Ptr{UInt8} argument in the ccall signature (instead of Ptr{Cvoid}) and pass the buf. For example:

julia> buf = Vector{UInt8}(undef, 100);

julia> ccall(:memset, Ptr{Cvoid}, (Ptr{UInt8}, Cint, Csize_t), buf, 0x77, sizeof(buf));

julia> buf
100-element Vector{UInt8}:
 0x77
 0x77
 0x77
 0x77
 0x77
    ⋮

Hi @stevengj. Thanks for replying. I may have misunderstood but the callback function uv_recvcb is passed to uv_udp_recv_start using @cfunction and called from somewhere in the C library. It’s got a fixed signature and I can’t see how to get the buffer into it.

See Passing closures via pass-through pointers.

Unfortunately, it looks like the uv_udp_recv_start function doesn’t give you a “thunk/pass-through” pointer to pass arbitrary data through to the callback? In that case, the options are to either use a global variable or to use $ with @cfunction to create a runtime closure.

Here’s a small example of how to do C-calls with buffers. It’s a small snippet which set up a receiving udp-port with the basic POSIX calls socket, bind, and recv.

const AF_INET=2
const SOCK_DGRAM=2

struct sockaddr
  family::UInt16
  port::UInt16
  addr::UInt32
  pad::UInt64
end

function chkerr(stat, msg)
  if stat < 0
    @ccall perror(msg::Cstring)::Cvoid
  end
end

function udpsock(port::Integer) 
  sock = @ccall socket(AF_INET::Cint, SOCK_DGRAM::Cint, 0::Cint)::Cint
  chkerr(sock, "socket")
  
  nport = hton(UInt16(port))
  addr = sockaddr(AF_INET, nport, 0, 0)
  stat = @ccall bind(sock::Cint, Ref(addr)::Ptr{Cvoid}, sizeof(addr)::Csize_t)::Cint
  chkerr(stat, "bind")
  sock
end

function recv!(buf, sock)
  @ccall recv(sock::Cint, buf::Ptr{Cvoid}, sizeof(buf)::Csize_t, 0::Cint)::Csize_t
end

sock = udpsock(4321)

buf = fill(0x0, 64)

n = recv!(buf, sock)

# then do a
# echo '012345' | nc -uN localhost 4321
# in the shell
println("Got $n bytes: ", String(buf[1:n]))

So, I had your UDP-problem you reference some time ago. I solved it by the above low-level interface to recv, created a ring-buffer of size 256, of fixed size buffers (my udp-packets had a limited size), using Atomic{UInt8} for beginning and end. (Or used a DataStructures.CircularBuffer, I don't remember). Then wrapped my recv!` in a thread reading forever and storing in the ring-buffer, and let other threads read from the ring-buffer, using the atomic counters (which wrap at the end).

No, it doesn’t seem to have any way to pass other data to the callback.

That’s extremely helpful, thanks! At the moment it looks like I could either hack the Sockets.jl code using a global variable or abandon Sockets.jl and libuv altogether by using recv directly. I’ll have to think about that as I’m not at all familiar with sockets and have relied on Sockets.jl to make it easy to open, close, join multicast groups, etc. Queues and buffering I can probably manage.

I haven’t done multicast-things, but there’s a short C-thing here: https://www.tenouk.com/Module41c.html. (Search for IP_ADD_MEMBERSHIP). The main problem is typically to figure out values of constants, like AF_INET etc. And the layout of structs in C, like the struct ip_mreq group used there (they’re in the file /usr/include/netinet/in.h:

struct ip_mreq
  {
    /* IP multicast address of group.  */
    struct in_addr imr_multiaddr;
    /* Local IP address of interface.  */
    struct in_addr imr_interface;
  };

(and in_addr is there too, as UInt32). But frankly, I think the stdlib Sockets should provide a recv!.

Thanks. It looks as though you can join a multicast group using setsockopt. Unfortunately, I’m using Windows and there appear to be differences with the Linux versions of the constants. For example, the function socket() returns an unsigned integer and you have to check for a value of INVALID_SOCKET instead of -1. I’ve no idea how to access INVALID_SOCKET other than copy it from Winsock2.h.

That would be great, but I suppose they can’t do everything and maybe there’s not much demand for one. Python has one …

1 Like

OK, it’s defined as (~0) so I guess that interpreting the return value as a signed integer means that -1 would work.

In the end I stayed with Sockets.jl and used a global variable to pass the buffer to the callback function. It’s rather ugly and I’m not even sure if I used the global variable in a type-stable way. I thought I’d post the code here just in case it’s of help to someone, although I wouldn’t recommend using it as it is because there are bits I didn’t really understand. Thanks to everyone for their help.

recv_into
using Sockets

# global variable to pass the buffer to the callback function. Hopefully using a typed global will
# make the functions type stable.
global recv_into_buf::Vector{Vector{UInt8}}

# this is copied from Sockets.uv_recvcb but with the data read into a global buffer
function uv_recv_into_cb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint)
    global recv_into_buf

    sock = Base.@handle_as handle UDPSocket
    lock(sock.recvnotify)
    try
        buf_addr = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf)
        if nread == 0 && addr == C_NULL
            Libc.free(buf_addr)
        elseif nread < 0
            Libc.free(buf_addr)
            Base.notify_error(sock.recvnotify, _UVError("recv", nread))
        elseif flags & Sockets.UV_UDP_PARTIAL > 0
            Libc.free(buf_addr)
            Base.notify_error(sock.recvnotify, "Partial message received")
        else
            buf_size = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf))
            ncopy = nread

            if buf_size - nread < 16384 # waste at most 16k (note: buf_size is currently always 64k)
                buf = unsafe_wrap(Array, buf_addr, nread, own=true)
                # I don't understand what this bit's for, so if if it ever gets called raise an
                # error and I'll have to deal with it then
                Base.notify_error(sock.recvnotify, "buf_size, " * string(buf_size) * ", nread " * string(nread))
            else
                # replace the original buf by the global one
                # buf = Vector{UInt8}(undef, nread)
                buf = recv_into_buf[1]
                # if length(buf)<nread
                #     Base.notify_error(sock.recvnotify,"buffer too short, length = " * string(length(buf)) * ", nread = " * string(nread))
                # end
                ncopy = min(length(buf), nread)

                # probably don't need to do the GC stuff any more??
                GC.@preserve buf unsafe_copyto!(pointer(buf), buf_addr, ncopy)
                Libc.free(buf_addr)
            end
            # need to check the address type in order to convert to a Julia IPAddr
            host = Sockets.IPv4(0)
            port = UInt16(0)
            if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1
                host = Sockets.IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr)))
                port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr))
            elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1
                tmp = Ref{UInt128}(0)
                scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, tmp)
                host = Sockets.IPv6(ntoh(tmp[]))
                port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr))
            end
            # from = Sockets.InetAddr(host, port)
            notify(sock.recvnotify, ncopy, all=false)
        end
        if sock.status == Base.StatusActive && isempty(sock.recvnotify)
            sock.status = Base.StatusOpen
            ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock)
        end
    finally
        unlock(sock.recvnotify)
    end
    nothing
end

# this is copied from Sockets.recvfrom but with the data passed through a global buffer
function recv_into!(buf::Vector{UInt8}, sock::UDPSocket)
    # check if the global buffer's been initialised
    global recv_into_buf
    if !@isdefined recv_into_buf
        recv_into_buf = Vector{Vector{UInt8}}(undef,1)
    end

    recv_into_buf[1] = buf

    Base.iolock_begin()
    # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
    if sock.status != Base.StatusInit && sock.status != Base.StatusOpen && sock.status != Base.StatusActive
        error("UDPSocket is not initialized and open")
    end
    if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
        err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                    sock,
                    @cfunction(Base.uv_alloc_buf, Cvoid, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
                    @cfunction(uv_recv_into_cb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
        Base.uv_error("recv_start", err)
    end
    sock.status = Base.StatusActive
    lock(sock.recvnotify)
    Base.iolock_end()
    try
        # From = Union{Sockets.InetAddr{IPv4}, Sockets.InetAddr{IPv6}}
        # Data = Vector{UInt8}
        # from, data = wait(sock.recvnotify)::Tuple{From, Data}
        # return (from, data)
        nread = wait(sock.recvnotify)::Int
        return nread
    finally
        unlock(sock.recvnotify)
    end
end

A simple test:

receive test
using Sockets
group = ip"239.2.0.21"
s = Sockets.UDPSocket()
Sockets.bind(s, ip"0.0.0.0", 2021)
join_multicast_group(s, group)

# hostport, packet = Sockets.recvfrom(s)
buf = Vector{UInt8}(undef,4)
nread = recv_into!(buf, s)

I ran the sender for testing in a separate Julia REPL

send test
using Sockets
group = ip"239.2.0.21"
s = Sockets.UDPSocket() # don't need to bind
x=Vector{UInt8}([1; 2; 3; 4])
send(s, group, 2021, x)