I have an application which reads sensor data packets from a UDP multicast socket over a dedicated ethernet link, processes them and updates a display using the excellent Makie. This worked fine on our original system, but we’ve upgraded the sensor, increasing the number of channels and the overall data rate, and now we’re dropping data.
The original system has a packet size of 4096 bytes and an overall data rate of about 65 Mbps. The new system has a packet size of 8192 bytes and an overall data rate of about 306 Mbps.
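Working the stated figures through shows how much the load has grown per packet and per second. This is a back-of-envelope sketch that assumes the Mbps figures count UDP payload only (no header overhead) and that 1 Mbps = 10^6 bit/s; `packets_per_second` is a hypothetical helper.

```julia
# Back-of-envelope rates. Assumption: the quoted Mbps figures are payload only.
packets_per_second(mbps, packet_bytes) = mbps * 1e6 / 8 / packet_bytes

old_pps = packets_per_second(65, 4096)    # ≈ 1984 packets/s
new_pps = packets_per_second(306, 8192)   # ≈ 4669 packets/s

# Bytes of fresh arrays per second if every packet is a new Vector{UInt8}
new_bytes_per_s = 306e6 / 8               # ≈ 38 MB/s

# How long a Channel holding 32000 packets lasts at the nominal new rate
channel_seconds = 32000 / new_pps         # ≈ 6.9 s
```

So the packet rate is roughly 2.4 times higher, while the number of bytes the garbage collector has to reclaim per second is roughly 4.7 times higher.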
The receiving system is running on a Windows 10 laptop with a core i7 processor (6 cores) and 16 GB of memory. It’s using Julia 1.8.1 with 6 threads.
What seems to happen is that the system runs happily for a while, but after a few minutes the CPU usage goes up, the memory usage goes down and the receiving thread drops packets. It’s then OK for a while, but the same thing keeps repeating. I assume this has something to do with the garbage collector, as each packet is a new `Vector{UInt8}` that eventually gets discarded.
The overall memory usage isn’t that high and goes from about 3 GB to 5 GB before garbage collection. CPU usage is about 10-12% until the problem occurs, when it goes up to about 25%.
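One way to test the garbage-collector theory without the sensor is to reproduce the allocation pattern on its own: a fresh 8192-byte `Vector{UInt8}` per packet, kept in a bounded history and then dropped. In this sketch a plain `Vector` stands in for `CircularBuffer`, `simulate_packets` is hypothetical, and `@timed` reports the bytes allocated and the seconds spent in GC for the call.

```julia
# Hypothetical sketch: mimic "new array per packet, keep the last `history`,
# drop the rest" and measure allocation and GC time.
function simulate_packets(npackets::Int, history::Int; packet_size::Int = 8192)
    hist = Vector{Vector{UInt8}}()
    for i in 1:npackets
        packet = Vector{UInt8}(undef, packet_size)  # fresh array, like Sockets.recv
        packet[1] = i % UInt8                       # touch it so it is "used"
        push!(hist, packet)
        # Once the history is full, the oldest packet becomes garbage.
        length(hist) > history && popfirst!(hist)
    end
    return length(hist)
end

simulate_packets(10, 4)                 # warm-up so compilation is not measured
stats = @timed simulate_packets(10_000, 1_000)
println("allocated $(round(stats.bytes / 2^20; digits = 1)) MiB, ",
        "GC time $(stats.gctime) s")
```

Scaling `npackets` and `history` towards the real rates and history length gives a rough idea of how much GC time the allocation pattern alone costs on a given machine.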
I can’t think of a way of providing an MWE, but here’s an outline of the code:
Receive Function
```julia
using Sockets

function receive_adc(chan::Channel, s::UDPSocket; group = ip"239.2.0.21", port = 2021)
    println("receive_adc, thread id: ", Threads.threadid())
    yield()
    ret = Sockets.bind(s, ip"0.0.0.0", port, reuseaddr = true)
    if !ret
        close(s)
        println("bind error")
        return nothing
    end
    join_multicast_group(s, group)
    # set receive buffer size. see
    # https://discourse.julialang.org/t/set-udpsocket-reciever-buffer-size/3939/4
    arg = Ref{Cint}(0)                # pass 0 to read the value
    ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    @show arg[]
    arg = Ref{Cint}(65536*8*8*16*4)   # 256 MiB; pass nonzero to set the value (doubled on Linux)
    ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    @show arg[]
    arg = Ref{Cint}(0)                # pass 0 again to read back the size actually granted
    ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    @show arg[]
    try
        while s.status != Base.StatusClosed
            # hostport, packet = Sockets.recvfrom(s)
            packet = Sockets.recv(s)  # a new Vector{UInt8} for every packet
            put!(chan, packet)
        end
    catch my_exception
        if isa(my_exception, InterruptException)
            leave_multicast_group(s, group)
            close(s)
            println("receive_adc closing down")
        end
        println("exception: ", my_exception)
    end
    println("receive_adc closing down, thread id: ", Threads.threadid())
    yield()
    return nothing
end
```
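The three `ccall`s above read, set and re-read the socket's receive buffer through libuv's `uv_recv_buffer_size`, but ignore its return code, so a rejected size would go unnoticed. A small wrapper can check it. This is a sketch: `recv_buffer_size` and `set_recv_buffer_size!` are hypothetical names, and it relies on the same `ccall` as the original, which only works once the socket has been bound.

```julia
using Sockets

# Hypothetical wrapper around libuv's uv_recv_buffer_size (same ccall as above).
# value == 0 reads the current size; value > 0 requests a new size.
# libuv returns 0 on success and a negative error code on failure.
function recv_buffer_size(s::UDPSocket, value::Integer = 0)
    arg = Ref{Cint}(value)
    err = ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    err == 0 || error("uv_recv_buffer_size failed with error code $err")
    return Int(arg[])
end

# Request a size, then read back what the OS actually granted
# (the OS may cap the request, and Linux reports double the value).
function set_recv_buffer_size!(s::UDPSocket, value::Integer)
    value > 0 || throw(ArgumentError("buffer size must be positive"))
    recv_buffer_size(s, value)
    return recv_buffer_size(s)
end
```

In `receive_adc` this would be called after `join_multicast_group`, e.g. `granted = set_recv_buffer_size!(s, 256 * 2^20)`, with a warning if `granted` is smaller than requested.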
Processing
```julia
using Sockets
using GLMakie
using ThreadPools
using DataStructures

# Nhistory, do_processing, enough_data and update_display are defined
# elsewhere in the application and omitted from this outline.
function stream_sensor(s)
    # buffer to keep 30 s of past data so it can be saved if something interesting happens
    historybuf = CircularBuffer{Vector{UInt8}}(Nhistory)
    # channel to communicate with receive_adc task to allow a buffer while processing and displaying, etc.
    chan = Channel{Vector{UInt8}}(32000) # around 8 s of data
    # set the receiving task in its own thread
    receive_task = ThreadPools.@tspawnat 3 receive_adc(chan, s)
    while (s.status != Base.StatusClosed) && isopen(chan)
        packet = take!(chan)
        push!(historybuf, packet)
        do_processing(packet) # FFTs, filtering, etc.
        if enough_data
            update_display() # around once per second
        end
    end
end

s = Sockets.UDPSocket()
task = ThreadPools.@tspawnat 2 stream_sensor(s)
```
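In this outline nothing closes `chan` when `receive_adc` returns (for example after the bind error or an exception), so `stream_sensor` would wait in `take!(chan)` forever. `bind(chan, task)` from Base closes a channel when a task finishes, and `ThreadPools.@tspawnat` returns a `Task`, so it could be applied to `receive_task`. The sketch below shows the pattern with `Threads.@spawn`; `fake_receiver` and `consume_all` are hypothetical stand-ins for `receive_adc` and `stream_sensor`.

```julia
# Hypothetical producer standing in for receive_adc.
function fake_receiver(chan::Channel{Vector{UInt8}}, npackets::Int)
    for i in 1:npackets
        put!(chan, fill(i % UInt8, 8))  # stands in for Sockets.recv(s)
    end
end

# Hypothetical consumer standing in for stream_sensor.
function consume_all(npackets::Int)
    chan = Channel{Vector{UInt8}}(32)
    producer = Threads.@spawn fake_receiver(chan, npackets)
    bind(chan, producer)   # chan is closed when producer finishes or throws
    count = 0
    for packet in chan     # drains remaining packets, then stops once chan is closed
        count += 1
    end
    wait(producer)         # rethrows if the producer failed
    return count
end

consume_all(100)
```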
All the packets eventually fall out of `historybuf` and are then reclaimed by the garbage collector.
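Instead of leaving evicted packets to the garbage collector, their buffers could be handed back to a pool and refilled. This is a sketch under a stated caveat: `Sockets.recv` as used in `receive_adc` always returns its own new array, so a pool only saves allocations if paired with a receive step that writes into an existing buffer, which this sketch does not provide. `BufferPool`, `History`, `acquire!`, `release!` and `store!` are all hypothetical, and a plain ring replaces `CircularBuffer`.

```julia
# Hypothetical free list of equally sized buffers; the lock allows a receiving
# thread to take buffers while a processing thread returns them.
struct BufferPool
    free::Vector{Vector{UInt8}}
    lock::ReentrantLock
    packet_size::Int
end
BufferPool(packet_size::Int) = BufferPool(Vector{UInt8}[], ReentrantLock(), packet_size)

# Reuse a returned buffer if there is one, otherwise allocate a new one.
acquire!(p::BufferPool) = lock(p.lock) do
    isempty(p.free) ? Vector{UInt8}(undef, p.packet_size) : pop!(p.free)
end

# Give a buffer back for later reuse.
function release!(p::BufferPool, buf::Vector{UInt8})
    lock(p.lock) do
        push!(p.free, buf)
    end
    return nothing
end

# Fixed-capacity history kept as a ring; when full, the evicted buffer
# goes back to the pool rather than becoming garbage.
mutable struct History
    bufs::Vector{Vector{UInt8}}
    capacity::Int
    next::Int  # slot to overwrite once the history is full
end
History(capacity::Int) = History(Vector{UInt8}[], capacity, 1)

function store!(h::History, p::BufferPool, buf::Vector{UInt8})
    if length(h.bufs) < h.capacity
        push!(h.bufs, buf)
    else
        release!(p, h.bufs[h.next])
        h.bufs[h.next] = buf
        h.next = h.next % h.capacity + 1
    end
    return nothing
end

# Usage: 100 simulated packets through a 4-slot history.
pool = BufferPool(8192)
hist = History(4)
distinct = Vector{UInt8}[]          # every distinct buffer object handed out
for i in 1:100
    buf = acquire!(pool)
    fill!(buf, i % UInt8)           # stands in for an in-place receive into buf
    any(b -> b === buf, distinct) || push!(distinct, buf)
    store!(hist, pool, buf)
end
```

With a 4-slot history only 5 buffers are ever allocated for the 100 packets; in the real application the history would hold `Nhistory` buffers plus the few in flight in the channel.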
I’d be grateful for any suggestions as to how to fix the dropped data. The only things I can think of are:
- Use a version of `Sockets.recv` which writes into an existing array rather than allocating a new one
- Give `receive_task` a higher priority
- Change the behavior of the GC to even out its CPU usage
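Julia offers no switch for evening out the collector's own CPU usage, but a related technique is to request a small collection at a moment of our choosing, for example right after the once-per-second display update, so that less garbage is left for the runtime to collect at an arbitrary time. Since the collector pauses every thread while it runs, including the receiving one, whether this reduces drops would have to be measured. `GC.gc(false)` is the Base call for an incremental (rather than full) collection; `maybe_collect!` is a hypothetical helper.

```julia
# Hypothetical: request an incremental collection every `every` calls,
# e.g. from the update_display() branch of stream_sensor.
function maybe_collect!(tick::Ref{Int}; every::Int = 1)
    tick[] += 1
    if tick[] % every == 0
        GC.gc(false)   # false = incremental collection, not a full sweep
        return true
    end
    return false
end

tick = Ref(0)
collected = [maybe_collect!(tick; every = 2) for _ in 1:4]  # [false, true, false, true]
```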
Unfortunately I haven’t a clue how to do any of those things, so I could really do with some help.
Tony