In the end I stayed with Sockets.jl and used a global variable to pass the buffer to the callback function. It’s rather ugly and I’m not even sure if I used the global variable in a type-stable way. I thought I’d post the code here just in case it’s of help to someone, although I wouldn’t recommend using it as it is because there are bits I didn’t really understand. Thanks to everyone for their help.
recv_into
using Sockets
# One-slot holder used to hand the caller's destination buffer to the libuv receive
# callback, which cannot take extra Julia arguments. The global is typed so that code
# reading it sees a concrete `Vector{Vector{UInt8}}`.
# It is initialised here (slot 1 left unassigned) so the binding always has a value;
# `recv_into!` stores the destination buffer in slot 1 before it waits for a datagram.
global recv_into_buf::Vector{Vector{UInt8}} = Vector{Vector{UInt8}}(undef, 1)
"""
    uv_recv_into_cb(handle, nread, buf, addr, flags)

libuv UDP receive callback, adapted from `Sockets.uv_recvcb`, that copies the received
datagram into the caller-supplied buffer stored in `recv_into_buf[1]` instead of
allocating a new array.

# Arguments
- `handle`: libuv handle; it is resolved to the owning `UDPSocket`.
- `nread`: number of bytes received, or a negative libuv error code.
- `buf`: pointer to the libuv buffer descriptor holding the received bytes.
- `addr`: sender address pointer; `C_NULL` together with `nread == 0` means there is
  nothing to deliver. The sender address is not reported to the waiter.
- `flags`: libuv receive flags; `Sockets.UV_UDP_PARTIAL` marks a truncated datagram.

# Behaviour
- On success, `min(length(recv_into_buf[1]), nread)` bytes are copied and one task
  waiting on `sock.recvnotify` is woken with that byte count. A datagram longer than
  the destination buffer is therefore truncated.
- On a negative `nread` or a partial datagram, the waiters on `sock.recvnotify` are
  woken with an error instead.
- The libuv-side buffer is released with `Libc.free` on every path.
- When no task is left waiting, receiving is stopped and the socket status goes back
  to `Base.StatusOpen`.

Always returns `nothing`.
"""
function uv_recv_into_cb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid},
                         addr::Ptr{Cvoid}, flags::Cuint)
    global recv_into_buf
    sock = Base.@handle_as handle UDPSocket
    lock(sock.recvnotify)
    try
        buf_addr = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf)
        if nread == 0 && addr == C_NULL
            # Nothing was read: just release the libuv buffer.
            Libc.free(buf_addr)
        elseif nread < 0
            Libc.free(buf_addr)
            # `_UVError` lives in `Base`; it must be qualified to be reachable from here.
            Base.notify_error(sock.recvnotify, Base._UVError("recv", nread))
        elseif flags & Sockets.UV_UDP_PARTIAL > 0
            Libc.free(buf_addr)
            Base.notify_error(sock.recvnotify, "Partial message received")
        else
            # Always copy into the caller's buffer, whatever the datagram size, and
            # never copy more than the destination can hold.
            dest = recv_into_buf[1]
            ncopy = min(length(dest), Int(nread))
            GC.@preserve dest unsafe_copyto!(pointer(dest), buf_addr, ncopy)
            Libc.free(buf_addr)
            # Wake a single waiter and hand it the number of bytes copied.
            notify(sock.recvnotify, ncopy, all=false)
        end
        # Stop receiving once nobody is waiting for another datagram.
        if sock.status == Base.StatusActive && isempty(sock.recvnotify)
            sock.status = Base.StatusOpen
            ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock)
        end
    finally
        unlock(sock.recvnotify)
    end
    return nothing
end
"""
    recv_into!(buf::Vector{UInt8}, sock::UDPSocket) -> Int

Wait for one datagram on `sock` and have it written into `buf`, returning the number of
bytes copied. Adapted from `Sockets.recvfrom`, but the data is passed through the
global `recv_into_buf` rather than a newly allocated array.

The receive callback copies at most `length(buf)` bytes, so a longer datagram is
truncated and the returned count is then `length(buf)`.

# Throws
- `ErrorException` if `sock` is not in the init, open or active state.
- The error raised by `Base.uv_error` if libuv fails to start receiving.
- Whatever error the receive callback delivers through `sock.recvnotify`.

# Notes
- `buf` is handed to the callback through a single global slot. NOTE(review):
  overlapping calls from several tasks or on several sockets would overwrite each
  other's destination buffer; confirm callers serialise their use.
- The callback is only installed when the libuv handle is not already active.
"""
function recv_into!(buf::Vector{UInt8}, sock::UDPSocket)
    global recv_into_buf
    # The typed global may have been declared without a value; create the holder lazily.
    if !@isdefined recv_into_buf
        recv_into_buf = Vector{Vector{UInt8}}(undef, 1)
    end
    recv_into_buf[1] = buf
    Base.iolock_begin()
    try
        # If the socket has not been bound, it will be bound implicitly to ::0 and a
        # random port.
        if sock.status != Base.StatusInit && sock.status != Base.StatusOpen &&
           sock.status != Base.StatusActive
            error("UDPSocket is not initialized and open")
        end
        if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0
            err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                        sock,
                        @cfunction(Base.uv_alloc_buf, Cvoid,
                                   (Ptr{Cvoid}, Csize_t, Ptr{Cvoid})),
                        @cfunction(uv_recv_into_cb, Cvoid,
                                   (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)))
            Base.uv_error("recv_start", err)
        end
        sock.status = Base.StatusActive
        # Take the condition's lock before the I/O lock is released, as the original
        # sequence did.
        lock(sock.recvnotify)
    finally
        # Release the I/O lock on the error paths above as well, so that a failed call
        # does not leave it held.
        Base.iolock_end()
    end
    try
        # The callback notifies with the number of bytes it copied into `buf`.
        return wait(sock.recvnotify)::Int
    finally
        unlock(sock.recvnotify)
    end
end