Help with real-time performance needed

I have an application which reads sensor data packets from a UDP multicast socket over a dedicated ethernet link, processes them and updates a display using the excellent Makie. This was working fine on our original system but we’ve upgraded the sensor, increasing the number of channels and the overall data rate, and we’ve run into problems with dropping data.

The original system has a packet size of 4096 bytes and an overall data rate of about 65 Mbps. The new system has a packet size of 8192 bytes and an overall data rate of about 306 Mbps.
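
As a rough cross-check of these figures, the packet rates can be worked out directly. This is only a sketch: it assumes that "Mbps" means 10^6 bits per second and that the quoted rates count packet payload only, and the function name is made up for illustration.

```julia
# Packets per second for a given data rate (Mbps) and packet size (bytes).
# Assumption: 1 Mbps = 1e6 bits/s, payload only (no UDP/IP/Ethernet overhead).
packets_per_second(rate_mbps, packet_bytes) = rate_mbps * 1e6 / (8 * packet_bytes)

old_pps = packets_per_second(65, 4096)    # about 1984 packets/s
new_pps = packets_per_second(306, 8192)   # about 4669 packets/s

# Bytes allocated per second if every packet becomes a fresh Vector{UInt8}
new_bytes_per_second = 306e6 / 8          # 38.25e6 bytes/s

println("old: ", round(Int, old_pps), " packets/s, new: ", round(Int, new_pps), " packets/s")
```

Under those assumptions the new sensor produces roughly 38 MB of short-lived packet vectors per second, so the collector has far more to reclaim than it did with the old sensor.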

The receiving system is running on a Windows 10 laptop with a core i7 processor (6 cores) and 16 GB of memory. It’s using Julia 1.8.1 with 6 threads.

What seems to happen is that the system runs happily for a while, but after a few minutes the CPU usage goes up, the memory usage goes down and the receiving thread drops packets. It’s then OK for a while but the same thing keeps repeating. I assume this is something to do with the garbage collector, as each packet is a new Vector{UInt8} and eventually gets discarded.

The overall memory usage isn’t that high and goes from about 3 GB to 5 GB before garbage collection. CPU usage is about 10-12% until the problem occurs, when it goes up to about 25%.
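
One way to test the garbage-collector hypothesis is to measure how much time a piece of work spends in GC. The sketch below uses the standard `@timed` macro on a made-up stand-in for the per-packet allocation (`churn` is hypothetical, not part of the application). `GC.enable_logging(true)`, available from Julia 1.8, is another option: it prints a line to stderr for every collection.

```julia
# Hypothetical stand-in for the receive path: allocates a fresh
# 8192-byte vector per "packet", as Sockets.recv does.
function churn(npackets)
    total = 0
    for _ in 1:npackets
        packet = zeros(UInt8, 8192)
        total += length(packet)
    end
    return total
end

# @timed returns a named tuple with the result, wall time, bytes
# allocated and the time spent in garbage collection.
stats = @timed churn(100_000)
println("elapsed: ", stats.time, " s, GC: ", stats.gctime,
        " s, allocated: ", stats.bytes, " bytes")
```

If the dropped packets line up with collections reported this way, that would support the GC explanation. If they do not, the cause is probably elsewhere.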

I can’t think of a way of providing an MWE, but here’s an outline of the code:

Receive Function
function receive_adc(chan::Channel, s::UDPSocket; group = ip"", port=2021)
    println("receive_adc, thread id: ", Threads.threadid())
    ret = Sockets.bind(s, ip"", port, reuseaddr=true)
    if !ret
        println("bind error")
    end
    join_multicast_group(s, group)

    # set receive buffer size. see
    arg = Ref{Cint}(0) # pass 0 to read the value
    ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    @show arg
    arg = Ref{Cint}(65536*8*8*16*4) # pass nonzero to set the value (doubled on linux)
    ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    @show arg
    arg = Ref{Cint}(0) # pass 0 to read the value
    ccall(:uv_recv_buffer_size, Cint, (Ptr{Cvoid}, Ptr{Cint}), s.handle, arg)
    @show arg

    try
        while s.status != Base.StatusClosed
            # hostport, packet = Sockets.recvfrom(s)
            packet = Sockets.recv(s) # allocates a new Vector{UInt8} for every packet
            put!(chan, packet)
        end
    catch my_exception
        if isa(my_exception, InterruptException)
            leave_multicast_group(s, group)
            println("receive_adc closing down")
        end
        println("exception: ", my_exception)
    end
    println("receive_adc closing down, thread id: ", Threads.threadid())
    return nothing
end

using Sockets
using GLMakie
using ThreadPools
using DataStructures

# Outline only: Nhistory, do_processing, enough_data and update_display are defined elsewhere.
function stream_sensor(s)
    # buffer to keep 30 s of past data so it can be saved if something interesting happens
    historybuf = CircularBuffer{Array{UInt8}}(Nhistory)

    # channel to communicate with receive_adc task to allow a buffer while processing and displaying, etc.
    chan = Channel{Array{UInt8,1}}(32000) # around 8 s of data

    # set the receiving task in its own thread
    receive_task = ThreadPools.@tspawnat 3 receive_adc(chan, s)

    while (s.status != Base.StatusClosed) && isopen(chan)
        packet = take!(chan)
        push!(historybuf, packet)
        do_processing(packet) # FFTs, filtering, etc.

        if enough_data
            update_display # around once per second
        end
    end
end

s = Sockets.UDPSocket()
task = ThreadPools.@tspawnat 2 stream_sensor(s)

All the packets eventually fall out of historybuf and will get caught by the garbage collector.

I’d be grateful for any suggestions as to how to fix the dropped data. The only things I can think of are

  1. Use a version of Sockets.recv which writes to an existing array rather than allocates
  2. Give receive_task a higher priority
  3. Change the behavior of the gc to even out its CPU usage

Unfortunately I haven’t a clue how to do any of those things so I could really do with some help.


You could use ZMQ.jl instead of using UDP and Sockets directly. It handles the queues and “watermark” for you, and it supports zero-copy messages.
So potentially there can be zero allocations on the Julia side and hence no GC.

And another damn stupid answer from me…
We have another thread regarding garbage collection on Windows. Does this system have hyperthreading enabled or disabled?

Hi @TsurHerman, thanks for replying. Getting rid of the allocations might solve the problem. I had a quick look at ZMQ.jl but the documentation’s a bit sparse. Could you give any more details of how I could read a packet without allocating? If I could read it into an existing Vector{UInt8} I could keep a large array of them and just reuse them.
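
The "keep a large array of them and just reuse them" idea could look roughly like the sketch below: a pool of preallocated buffers circulating between two channels. This is illustrative only. `make_pool`, `fill_packet!`, `producer` and `consumer` are made-up names, `fill_packet!` stands in for a real non-allocating receive, and the pool size is arbitrary.

```julia
const PACKET_BYTES = 8192   # packet size quoted for the new sensor
const NBUFFERS = 64         # hypothetical pool size

# Preallocate every buffer once and park them in a "free" channel.
function make_pool(n = NBUFFERS, nbytes = PACKET_BYTES)
    free = Channel{Vector{UInt8}}(n)
    for _ in 1:n
        put!(free, zeros(UInt8, nbytes))
    end
    return free
end

# Stand-in for a real receive that writes into an existing buffer.
fill_packet!(buf, seq) = fill!(buf, seq % UInt8)

function producer(free, full, npackets)
    for seq in 1:npackets
        buf = take!(free)       # blocks if the consumer has fallen behind
        fill_packet!(buf, seq)
        put!(full, buf)
    end
    close(full)
end

function consumer(free, full)
    total = 0
    for buf in full             # iterates until `full` is closed and drained
        total += Int(buf[1])    # stand-in for the real processing
        put!(free, buf)         # hand the buffer back for reuse
    end
    return total
end

free = make_pool()
full = Channel{Vector{UInt8}}(NBUFFERS)
receiver = Threads.@spawn producer(free, full, 1000)
total = consumer(free, full)
```

In the real application a buffer could only go back to the pool once it had also dropped out of the 30 s history buffer, so the pool would need to be larger than that history. A receiver that blocks on an empty pool would also lose packets, so the pool size matters.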

Hi @johnh. Thanks, I did see that thread but it didn’t seem quite the same as my case (and I didn’t fully understand it). Mine’s a standard Windows 10 Home Edition installation with 6 cores and 12 logical processors so I assume hyperthreading is enabled.

You are only going to be able to use ZMQ if you control both ends of this system and can make the sensor send ZMQ packets.

If you don’t control the sensor software, then you will need to figure out how to read a socket regularly without allocating, which probably means using ccall and doing your own buffer handling. Basically, write a function like recvinto!(sock, buf) which does the ccall to recv into a fixed buffer and then copies the data into your Julia buf.

Be aware that if you want to call recvinto! from multiple threads, you will need a lock or a thread-specific internal buffer for each thread.

Unless anyone has a better idea.

Thanks for the advice, @dlakelan. I do have control over both ends of the system, but the other end is written in C using lwip on a Xilinx SoC. I haven’t much experience in networking and went for the simplest possible approach, so I could be doing something wrong.

I’ll have a go at writing some form of recvinto! as you suggest, although I may have to come back for help as I’m a beginner at this. I’m not planning to call it from more than one thread, in fact my whole aim has been to isolate the socket access as much as possible to prevent data loss while processing or display.

It’s a bit of a frustrating problem to investigate as it takes quite a long time to appear (at least 10 minutes). Once it does appear it seems to happen again fairly quickly if I keep the same Julia session open. Maybe that does indicate some sort of memory fragmentation or perhaps I’m doing something else wrong.

Another option might be to spawn off a C thread which just calls receive and stacks up packets. When Julia pauses for a garbage collection, the C thread will continue and can stack up packets until Julia is able to ask for them. I kind of like this idea because it isolates your receive loop into a continuously running thread that will never be paused.

Thanks, that sounds like a good idea, although I’m not sure where to start. Is spawning a C thread easy?

I did have a look at Sockets.recvfrom(sock::UDPSocket) for inspiration on how to write a recv_into!() function as you suggested. I found it rather baffling as I’m not familiar with either libuv or the Julia C interface. Python seems to have a recv_into which looks like it does exactly what I want. I’m not very familiar with Python but perhaps it will provide some clues. Looks like I’ve got a lot of learning to do.

By the way, I was running the equipment again and found that calling GC.gc(false) on every display cycle (about once a second) seems to reduce the packet loss at the expense of increasing overall CPU usage and making the display less smooth, although it’s a bit hard to tell for sure.
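
The experiment described above, an incremental collection at a fixed point in the display cycle, might be wrapped up as follows. This is a minimal sketch: `maybe_collect!` and its `period` keyword are made-up names, and whether collecting this way helps overall is exactly the open question.

```julia
# Run an incremental (non-full) collection at most once per `period` seconds.
# Returns true if a collection was triggered on this call.
function maybe_collect!(last_gc::Ref{Float64}; period = 1.0)
    now = time()
    if now - last_gc[] >= period
        GC.gc(false)        # false requests an incremental collection
        last_gc[] = now
        return true
    end
    return false
end

last_gc = Ref(0.0)
first_call = maybe_collect!(last_gc)    # collects: nothing has run yet
second_call = maybe_collect!(last_gc)   # skipped: less than `period` has elapsed
```

Calling it from the processing loop rather than from the receive task keeps the pause at a point chosen by the application, although a collection still stops all Julia threads while it runs.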

Don’t use libuv; just spawn a pthread that runs a C function which does the work of stacking up the packets. It wouldn’t be particularly hard, but it will require you to learn something about pthreads.

Also, this is a fairly significant amount of data to be processing, and it seems very likely to me that you don’t want to be updating your display on every new packet. You probably want to update your display on a clock, like 60 times a second.

Never mind, I see you’re already updating the display only every so often.

For soft-real time I would try out in Julia 1.9:

New option --heap-size-hint=<size> gives a memory hint for triggering greedy garbage collection.

Try different values, e.g. as low as you can go.

For hard real-time (i.e. any dropped packet is catastrophic), you want NO allocations, and thus get rid of the GC as a potential source of trouble.

Whatever you do, try disabling threads; they can slow down the GC a lot.

[Of course we want threads to work well with the GC, so hopefully that’s a temporary issue, maybe already fixed on 1.9?]

The GC in Julia isn’t hard real-time. Such a GC is available in e.g. Java, and maybe one will be made for Julia later, but I wouldn’t hold my breath: unlike HPC, it’s not a priority, and the GC is also easy to avoid. I’m not sure it’s too good (currently) even for soft real-time, at least with threading.

There’s ongoing work on the GC, and I believe some has already landed in 1.9 (with default settings), at least the option above. Historically Julia had no options for its GC (well, except for turning it off temporarily, which isn’t a great idea), and I believe that new option is still the only option.

What you might want is to set a high enough absolute (real-time) priority. Just be warned: on Windows I once set a frozen process to realtime, which predictably blocked the rest of the system from running, and I had to restart the machine.

I don’t know how to set it programmatically there, but I do know how to set priorities in Linux.
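
For what it’s worth, one possible way to do this from within Julia is a direct ccall into the operating system. The sketch below is untested on the poster’s machine and makes several assumptions: `raise_process_priority` is a made-up name, the Windows branch uses the Win32 calls GetCurrentProcess and SetPriorityClass with the "above normal" class (0x00008000), and the other branch uses POSIX setpriority with a niceness of -5, which normally needs elevated privileges.

```julia
# Try to raise the scheduling priority of the whole Julia process.
# Returns true on success, false if the OS refused the request.
function raise_process_priority()
    @static if Sys.iswindows()
        above_normal = UInt32(0x00008000)   # ABOVE_NORMAL_PRIORITY_CLASS
        h = ccall((:GetCurrentProcess, "kernel32"), stdcall, Ptr{Cvoid}, ())
        ok = ccall((:SetPriorityClass, "kernel32"), stdcall, Cint,
                   (Ptr{Cvoid}, UInt32), h, above_normal)
        return ok != 0
    else
        # which = 0 is PRIO_PROCESS, who = 0 means "the calling process"
        rc = ccall(:setpriority, Cint, (Cint, Cuint, Cint), 0, 0, -5)
        return rc == 0
    end
end

println("priority raised: ", raise_process_priority())
```

This changes the priority of every thread in the process, not just the receive task, so it may do little against GC pauses. It is mainly a guard against other programs stealing CPU time.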

I believe the (historical) Linux/Unix priority is relative, not absolute (since a multi-user system), so this wouldn’t happen, with nice (or renice).

Nice is to be, well, nice to others, to lower priority, but you can use a negative value:

ps -eo uid,pid,rtprio,ni,pri,cmd |grep julia

Niceness generally ranges from -20 to 19, with -20 being the most favorable

Only a privileged process can change a process’ absolute priority to something other than 0. Only a privileged process or the target process’ owner can change its absolute priority at all.

POSIX requires absolute priority values used with the realtime scheduling policies to be consecutive with a range of at least 32. On Linux, they are 1 through 99. The functions sched_get_priority_max and sched_get_priority_min portably tell you what the range is on a particular system.

That’s for Linux/POSIX, but it might work on Windows too, since Windows claimed POSIX compliance even before WSL2, which you might also want to try out.

You can monitor it in the task manager, or equivalently with top (or a GUI tool) on Linux, e.g. with these columns (some not on by default), from its man page:

  21. PR  --  Priority
      The scheduling priority of the task.  If you see `rt' in this field, it means the task is running under real time scheduling priority.

      Under linux, real time priority is somewhat misleading since traditionally the operating itself was not preemptible.  And while the 2.6 kernel can be made mostly preemptible, it is not always so.
  15. OOMa  --  Out of Memory Adjustment Factor
      The value, ranging from -1000 to +1000, added to the current out of memory score (OOMs) which is then used to determine which task to kill when memory is exhausted.

  16. OOMs  --  Out of Memory Score
      The value, ranging from 0 to +1000, used to select task(s) to kill when memory is exhausted.  Zero translates to `never kill' whereas 1000 means `always kill'.

I did also find at a former colleague (but not elsewhere, so that manual may be outdated…):

Do you mean continue using Sockets.recv but implement the queue/channel in C? I have used pthreads but it was a long time ago and not in Windows.

The occasional dropped packet isn’t too much of a problem so this is definitely worth trying. I’d like to get rid of the allocations if possible but trying a different version of Julia sounds a lot easier.

I’ll look into the idea of changing the Windows task priority. Thanks for your help.

I’ve now tried Julia 1.9 with a few different values for --heap-size-hint=<size>. I haven’t tested it extensively, but it does seem to stabilise things a bit for a value of around 5M. It does still drop data occasionally and the CPU usage is high, but at least it’s fairly constant. Previously CPU would start at 10% and end up after 20 min at around 25%, dropping data massively.

Disabling threads doesn’t work in my case as it means data can’t be queued while processing and display is going on.

I’d really like to try using a non-allocating recv_into!(). I’d try writing one myself but I just don’t know how to go about it. Is there a way of submitting a feature request for Sockets?

ccall is all you need, I think.

ccall to recv with a fixed const global buffer big enough for any packet you could want (10 kB, for example, as even most networks using jumbo frames are below that). Then, after getting the data and the length, copy that many bytes from the buffer into the passed-in buffer…

const gsockbuf = zeros(UInt8, 10240)
function recv_into!(sock, buf)
    # ccall to n = recv(sock, gsockbuf, 10240, 0)
    # then copy the first n bytes of gsockbuf into buf and return n
end

Is kind of pseudocode to get you started.

Thanks very much for the help. Using recv instead of Sockets.jl may be a good option if I can work out the details.

By the way, sorry to have started two nearly-duplicate threads. I started the other one thinking it would be better for a more specific question but they’ve got a bit muddled up.

Look into it a little; you may find that you can combine them. Set up the sockets with Sockets.jl but then implement a recv_into! function. You just need to figure out how to extract the OS socket descriptor from the Julia socket struct.

That’s a good idea, thanks! There is a uv_fileno function which takes a libuv handle and gives an OS file descriptor, presumably just an int on Windows. That would make life a lot easier. I wonder if extracting that and using recv would cause any internal problems for libuv? Only one way to find out I suppose.
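
A minimal sketch of that combination is shown below, with loud caveats. It assumes a Unix-like system, where libuv’s uv_os_fd_t is a plain C int and recv lives in libc; on Windows the descriptor is a SOCKET handle and recv comes from ws2_32, so the types would differ. `os_descriptor` and this `recv_into!` are illustrative names, not part of Sockets. Whether reading behind libuv’s back is safe has not been verified; the sketch never calls Sockets.recv on the same socket, so libuv should not be reading from it.

```julia
using Sockets

# Ask libuv for the OS-level descriptor behind a bound UDPSocket.
# Assumption: Unix-like OS, so the descriptor is a C int.
function os_descriptor(sock::UDPSocket)
    fd = Ref{Cint}(-1)
    err = ccall(:uv_fileno, Cint, (Ptr{Cvoid}, Ptr{Cint}), sock.handle, fd)
    err == 0 || error("uv_fileno failed with libuv error code $err")
    return fd[]
end

# Copy one waiting datagram straight into `buf` without allocating.
# Returns the byte count, or -1 if the call failed. libuv opens its
# sockets in non-blocking mode, so -1 usually means nothing was waiting.
function recv_into!(fd::Cint, buf::Vector{UInt8})
    n = ccall(:recv, Cssize_t, (Cint, Ptr{UInt8}, Csize_t, Cint),
              fd, buf, length(buf), 0)
    return Int(n)
end
```

A real receive loop would still need a way to wait for data, for example by switching the descriptor to blocking mode or polling it. That part is left out here.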