I have an application which receives data from a sensor via UDP, does some processing and displays the results. The data is read in a separate thread and buffered using a Channel. It’s important that the receiver thread isn’t interrupted or packets will be lost. An occasional hold-up elsewhere doesn’t matter as long as the processing and display keep up on average.
This works quite well so far, but there are some limitations which prevent more complicated processing being used:
- If there’s a sizeable amount of allocation, the garbage collector seems to regularly block all the threads, including the receiver thread, and packets get dropped.
- Most of Julia’s threading capabilities don’t give much control over which threads are used. I use ThreadPools.@tspawnat to run the receiver on a particular thread. As far as I know, using something like @threads won’t guarantee that the receiver thread isn’t used, so it could lead to dropped packets.
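To illustrate the second point, here is a minimal sketch (not part of the real application) that records which thread runs each iteration of a @threads loop. The function name threads_used is made up for this example, and the exact result depends on the Julia version, the number of threads Julia was started with, and the scheduler, so treat the output as indicative only.

```julia
using Base.Threads

# Hypothetical helper: run a trivial @threads loop and report
# the distinct thread ids that executed at least one iteration.
function threads_used(n)
    ids = zeros(Int, n)
    @threads for i in 1:n
        ids[i] = threadid()   # each iteration writes only its own slot
    end
    return sort(unique(ids))
end

println("threads available: ", nthreads())
println("threads used by @threads: ", threads_used(4 * nthreads()))
```

Nothing in the loop excludes a particular thread, so if the id of the thread running the receiver shows up in the list, that thread was shared with the loop.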
I’m running the code on a Windows 10 laptop with a Core i7 processor (6 cores) and Julia 1.8.3. At the moment the whole thing uses around 10% CPU, i.e. less than 100% of a single core, but we want to do extra processing which would go well beyond that.
I suppose one solution would be to write all the UDP socket access and buffering in C/C++, but that sounds a bit awkward (and I’m not sure how to do it anyway). Otherwise it might be possible to allocate tasks to threads manually using ThreadPools, but that means I can’t use all of Julia’s nice multithreading capabilities. Any advice would be gratefully received.
In case it’s of any use, here’s a toy version which very roughly illustrates the current code structure:
using GLMakie
using ThreadPools
using FFTW
function update(chan, nx, ny)
    println("update, thread id: ", Threads.threadid())
    while isopen(chan)
        # in the actual application data comes from a UDP socket using a non-allocating recv_into!
        # data rate is about 300 Mbps
        x = rand(nx,ny) .- 0.5
        put!(chan, x)
        sleep(0.3)
    end
    println("update finishing, thread id: ", Threads.threadid())
end
function process(chan, nx, ny)
    println("process, thread id: ", Threads.threadid())
    yield()
    # GLMakie.set_window_config!(framerate=5.0)
    nk = div(nx, 2) + 1
    hm_node = GLMakie.Observable(zeros(nk,ny))
    fig = GLMakie.Figure(fontsize=30)
    ax1 = fig[1,1] = GLMakie.Axis(fig)
    hm = GLMakie.image!(ax1, hm_node, colormap=:viridis, interpolate=false)
    GLMakie.display(fig)
    yield()
    task = ThreadPools.@tspawnat 3 update(chan, nx, ny)
    while isopen(chan)
        x = take!(chan)
        # Do some processing on x. Would like to make it multithreaded but not interfere with the update thread.
        # For illustration, just do an FFT (it's non-allocating in the real code).
        y = rfft(x)
        hm_node[] = abs2.(y)
    end
    println("process finishing, thread id: ", Threads.threadid())
end
GLMakie.closeall()
nbuffer = 15
ch = Channel{Matrix{Float64}}(nbuffer)
task = ThreadPools.@tspawnat 2 process(ch, 200, 100)
# to stop the task, use close(ch)
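
To get a rough idea of how much garbage collection the allocating steps in the toy trigger, they can be timed with @timed, which reports the bytes allocated and the time spent in GC. This is only a sketch: measure_allocations is a made-up helper, it uses plain Base instead of FFTW and GLMakie, and the numbers will differ from the real application.

```julia
# Hypothetical helper: repeat the allocating step from the toy update loop
# n times and report elapsed time, bytes allocated and time spent in GC.
function measure_allocations(nx, ny, n)
    stats = @timed begin
        acc = 0.0
        for _ in 1:n
            x = rand(nx, ny) .- 0.5   # allocates two nx-by-ny matrices per iteration
            acc += sum(abs2, x)       # use x so the work is not optimised away
        end
        acc
    end
    return (time = stats.time, bytes = stats.bytes, gctime = stats.gctime)
end

measure_allocations(200, 100, 10)                # warm-up run to exclude compilation
result = measure_allocations(200, 100, 1000)
println("elapsed: ", result.time, " s, allocated: ", result.bytes,
        " bytes, GC: ", result.gctime, " s")
```

A non-zero GC time here only shows that collections happened during the loop; it does not by itself show how long any single pause lasted.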