Sensor processing in a robotics application

For a real-time control application, from the main thread, I use multiple dispatch to spin up some remote functions (methods, each appropriate to a physical sensor type) containing endless while loops. These function/methods ingest data from individual ports so that each port listens to 0MQ on one thread, which always processes the same data type. Each method gets the data from 0MQ, then calls a suitable unmarshal function (because I need to load up structs for use elsewhere), and writes the result into its local channel, which was originally created (per sensor type) as a RemoteChannel from main. I.e. a set of RemoteChannels – one per worker-sensortype-port.

It works but soon encounters a race condition, detectable in the returning futures.

I use remotecall to launch the functions I described above with and without fetch(). But fetch() is meaningless as these functions all return nothing – their output goes into a channel. So the functions once started of are of no interest to main, its only concern is what’s on each of the RemoteChannels on which it does a take!().

I presume channels can be used like this with asynchronous put!s and take!s going on. But I am beginning to think that 0MQ might be better than channels because channels are just references with some backing store (?), whereas 0MQ is a “proper” queue.

Is there any other way to do this? I have read this excellent blog on different types of concurrency and investigated both ThreadsX and ThreadPools. The Transducers.jl package looks amazing but a lot seems to be aimed at map-reduce or other optimizations. Perhaps, I should use Rocket.jl or Actors.jl but I am up against a deadline and although amazing they would require a brain refactor let alone a total code rewrite.

Can anyone advise?

Can you elaborate on this? What sort of race conditions do you encounter?

(Remote)Channels use locks to keep consistent, so they should be fine in that regard.

When I do a remotecall() I get:

julia> sensor_data_futures
3-element Vector{Any}:
 Future(3, 1, 312, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)
 Future(4, 1, 313, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)
 Future(4, 1, 314, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

It boils down to a misconception on my part, namely that I can start a process that exists “freely” and I can talk to it through a “mailbox” (like in Erlang), when in fact Julia seems to only support Tasks that eventually return with an “answer” of some sort.

Julia definitely also supports long running background tasks that don’t necessarily have to return something. It’s just that the API of remotecall is used for calling a single function that does some work on some worker process and then returns a value.

In general, thinking of (OS-)threads in julia is often a misnomer… The julia runtime spawns OS-threads at startup and distributes spawned tasks among them to execute. So e.g. the Threads.@threads macro does nothing more than split a loop into different tasks that are executed on all available worker threads locally. These are still in the same process however and still share ressources (regular multithreading via light coroutines/green threads).

The Distributed module is used for actually spawning remote worker processes on different (or sometimes the same…) machine. These don’t share ressources, have different runtimes and thus different tasks. That’s where RemoteChannel comes in, to facilitate interprocess communication. If all your sensors are accessible from the same physical machine, you really only need Threads and Channel.


Note that there’s also @async, which just spawns an async task on the (process-)local scheduler to execute (and the accompanying @sync to wait on all enclosed @async spawned tasks to finish execution). This doesn’t necessarily return a value and just returns a Task object immediately. I think this is probably what you’re looking for. As of (I think) 1.8 (possibly 1.7), these tasks can also migrate between OS-threads, depending on which worker thread is free to execute stuff.

1 Like

Already 1.7, yes: https://github.com/JuliaLang/julia/blob/8611a64e7efeb48d892d9d171580acfe3ef97caf/NEWS.md#multi-threading-changes

1 Like

Assuming the phrase “these tasks” refers to Tasks created by @async, this it not the case. They do not migrate across worker threads to support legacy Julia programs using @async.

1 Like

It’d be helpful to have a minimal example. For example, would something like the following work?

using Distributed

messages = remotecall_fetch(default_worker_pool()) do
    RemoteChannel() do
        # Process-local buffer if the remote program needs to make some forward
        # progress even when there are some network hiccups:
        buffersize = 10
        Channel{Tuple{Int,Int}}(buffersize; spawn = true) do messages
            # Run some remote program to produce values here:
            for x in 1:100
                put!(messages, (myid(), x))
            end
        end
    end
end

while true
    x = try
        take!(messages)
    catch
        isopen(messages) && rethrow()
        break
    end
    @show x
end
1 Like

Thank you so much for your replies! I have been busy restructuring my code as suggested.