Advice needed on multithreaded processing for live data from a UDP socket

I have an application which receives data from a sensor via UDP, does some processing and displays the results. The data is read in a separate thread and buffered using a Channel. It’s important that the receiver thread isn’t interrupted, otherwise packets will be lost. An occasional hold-up elsewhere doesn’t matter as long as the processing and display keep up on average.

This works quite well so far, but there are some limitations which prevent more complicated processing being used:

  1. If the code allocates a significant amount of memory, the garbage collector seems to regularly block all the threads, including the receiver thread, and packets get dropped.

  2. Most of Julia’s threading capabilities don’t give much control over which threads are used. I use ThreadPools.@tspawnat to run the receiver on a particular thread. As far as I know, using something like @threads won’t guarantee that the receiver thread is left alone, so it could lead to dropped packets.

I’m running the code on a Windows 10 laptop with a Core i7 processor (6 cores) and Julia 1.8.3. At the moment the whole thing uses around 10% CPU, i.e. less than 100% of a single core, but we want to do extra processing which would go well beyond that.

I suppose one solution would be to write all the UDP socket access and buffering in C/C++, but that sounds a bit awkward (and I’m not sure how to do it anyway). Otherwise it might be possible to allocate tasks to threads manually using ThreadPools, but that means I can’t use all of Julia’s nice multithreading capabilities. Any advice would be gratefully received.

In case it’s of any use, here’s a toy version which very roughly illustrates the current code structure:

using GLMakie
using ThreadPools
using FFTW

function update(chan, nx, ny)
    println("update, thread id: ", Threads.threadid())

    while isopen(chan)
        # in the actual application data comes from a UDP socket using a non-allocating recv_into!
        # data rate is about 300 Mbps
        x = rand(nx,ny) .- 0.5
        put!(chan, x)
        sleep(0.3)
    end

    println("update finishing, thread id: ", Threads.threadid())
end

function process(chan, nx, ny)
    println("process, thread id: ", Threads.threadid())
    yield()

    # GLMakie.set_window_config!(framerate=5.0)

    nk = div(nx, 2) + 1
    hm_node = GLMakie.Observable(zeros(nk,ny))

    fig = GLMakie.Figure(fontsize=30)
    ax1 = fig[1,1] = GLMakie.Axis(fig)
    hm = GLMakie.image!(ax1, hm_node, colormap=:viridis, interpolate=false)

    GLMakie.display(fig)
    yield()

    task = ThreadPools.@tspawnat 3 update(chan, nx, ny)

    # Iterating the channel ends cleanly once it is closed and drained,
    # whereas take! on a closed, empty channel throws.
    for x in chan
        # Do some processing on x. Would like to make it multithreaded but not interfere with the update thread.
        # For illustration, just do an FFT (it's non-allocating in the real code).
        y = rfft(x) 
        hm_node[] = abs2.(y)
    end

    println("process finishing, thread id: ", Threads.threadid())
end

GLMakie.closeall()
nbuffer = 15
ch = Channel{Matrix{Float64}}(nbuffer)

task = ThreadPools.@tspawnat 2 process(ch, 200, 100)

# to stop the task, use close(ch)

Since there haven’t been any replies, I guess there’s no easy answer to this. It looks as though the only way I can isolate my data-receiving thread and keep the garbage collector from blocking it is to interface to C/C++. I could try to write a C++ function/class which starts a thread that reads the UDP data and puts it into a blocking queue, plus another function to take data from the queue. It’ll take a lot of effort to find out how to do that so, before I start, my question now is: will the C++ thread be safe from interference from the GC?
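To make the idea concrete, here is a rough sketch of that design: one call starts a background thread that pushes packets into a queue, and another call blocks until a packet is available. It is only an illustration under several assumptions. The names Receiver, Packet, start, take and stop are hypothetical, the socket read is replaced by a caller-supplied function, and the choice to drop the oldest packet when the queue is full is just one possible policy.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical packet type: the payload of one datagram.
using Packet = std::vector<std::uint8_t>;

class Receiver {
public:
    explicit Receiver(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    ~Receiver() { stop(); }

    // Start the background thread. `read_packet` stands in for a blocking
    // socket read; a real version would need a receive timeout so that
    // stop() cannot hang waiting for a datagram.
    void start(std::function<Packet()> read_packet) {
        assert(!worker_.joinable());
        running_ = true;
        worker_ = std::thread([this, read_packet] {
            while (running_) {
                Packet p = read_packet();
                std::lock_guard<std::mutex> lock(m_);
                // Assumed policy: drop the oldest packet rather than stall the reader.
                if (q_.size() == capacity_) q_.pop_front();
                q_.push_back(std::move(p));
                not_empty_.notify_one();
            }
        });
    }

    // Block until a packet is available, then return it.
    Packet take() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !q_.empty(); });
        Packet p = std::move(q_.front());
        q_.pop_front();
        return p;
    }

    void stop() {
        running_ = false;
        if (worker_.joinable()) worker_.join();
    }

private:
    std::size_t capacity_;
    std::deque<Packet> q_;
    std::mutex m_;
    std::condition_variable not_empty_;
    std::atomic<bool> running_{false};
    std::thread worker_;
};

// Usage sketch with a fake data source producing 8-byte packets.
std::size_t demo_bytes_received(int n_packets) {
    std::uint8_t seq = 0;  // touched only by the worker thread
    Receiver rx(16);
    rx.start([&seq] {
        // Pretend to wait for a datagram.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        return Packet(8, seq++);
    });
    std::size_t total = 0;
    for (int i = 0; i < n_packets; ++i) total += rx.take().size();
    rx.stop();
    return total;
}
```

The consumer side only ever calls take(), so whatever wraps this for Julia would need to expose just the start, take and stop calls.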

It’s a bit disappointing to have to replace a few lines of Julia code with a lot of C++ but I suppose this isn’t something many people are using Julia for at the moment.

Well, that was a bit of a struggle, probably due to my poor C++ skills. Just in case anyone else is interested, I think I’ve now isolated the UDP access and some buffering in a C++ thread which appears to be immune to the garbage collector (and also seems to use a bit less CPU). It’s difficult to be absolutely sure how well it’s working, but it should become clearer when we develop the processing a bit further.

I used the following:

  • C++ std::thread for threading
  • Winsock 2 for UDP sockets
  • C++ std::counting_semaphore to write a simple circular buffer
  • CxxWrap.jl to create a suitable DLL and access it from Julia

It would have been much easier if it had been possible to do the same thing in Julia but maybe that will happen eventually.