I am trying to do some complicated asynchronous, and preferably multithreaded, operations on a Raspberry Pi. I have some ideas, but could really use some pointers on how to tackle the problem. I will outline the approaches I have tried so far and describe the problems with them.
Relying on Threads.@spawn
The first approach was to use a pattern along the lines of
julia> main_event = Threads.Event();
julia> n_elements = 1000;
julia> raw_data_channel = Channel(n_elements);
julia> processed_data_channel = Channel(n_elements);
julia> # dummy_function to make the code run
       function collect_data()
           t0 = time_ns()
           # time_ns() counts nanoseconds, so one 25 kHz sampling period is 1e9/25_000 ns
           while time_ns() - t0 < 1e9 / 25_000 # wait one sampling period
               nothing
           end
           return rand()
       end
collect_data (generic function with 1 method)
julia> data_producer_task = Threads.@spawn while true
           wait(main_event)
           raw_data = collect_data()
           put!(raw_data_channel, raw_data)
           # This first task runs continuously at 25 kHz
       end
Task (runnable) @0x00007fce6a23b080
julia> my_processing(x) = 2x;
julia> data_processor_task = Threads.@spawn while true
           wait(main_event)
           while !isempty(raw_data_channel)
               result = my_processing(take!(raw_data_channel))
               put!(processed_data_channel, result)
           end
       end
Task (runnable) @0x00007fce6a1d2400
I can now see that I have no items waiting, notify the main event, wait a bit, and see that I have a bunch of items waiting:
julia> getproperty.((raw_data_channel, processed_data_channel), :n_avail_items)
(0, 0)
julia> notify(main_event); sleep(0.05); reset(main_event)
julia> getproperty.((raw_data_channel, processed_data_channel), :n_avail_items)
(1001, 1001)
So: successful indefinite tasks, running on multiple threads, with a graceful pause/play button. Great!
However, I happened to be using julia 1.10. When I tried julia 1.9 with the more complicated actual pipeline, which has 3 different tasks, the call to reset(main_event) kept running forever. It could be relevant that I wrap the tasks in try statements and that I call errormonitor on all tasks. But whether it is 1.9 or 1.10 makes no difference until I am able to compile julia from source on a Raspberry Pi (I am currently running into this error, and trying this alternative approach). That is because reset(::Base.Event) was introduced in julia, meaning that I cannot do this with the prebuilt binaries for 32-bit (ARMv7-a hard float). Also, I would prefer not having to run a beta version (1.10) to get the full application going, as it is intended for a production system.
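For reference, the same pause/play behaviour can be sketched without reset(::Base.Event), using only a lock-protected flag and Threads.Condition. This is a minimal sketch, not a tested replacement for the pipeline above: PauseGate, wait_open, resume! and pause! are hypothetical names that do not exist in Base, and the atomic counter is only a stand-in for collect_data() and put!.

```julia
# Hypothetical helper type (not part of Base): a gate that can be opened and
# closed repeatedly, built from a Bool flag guarded by a Threads.Condition.
mutable struct PauseGate
    cond::Threads.Condition
    is_open::Bool
end
PauseGate() = PauseGate(Threads.Condition(), false)

# Block the calling task until the gate is open.
function wait_open(g::PauseGate)
    lock(g.cond)
    try
        while !g.is_open
            wait(g.cond)   # releases the lock while waiting, reacquires on wake-up
        end
    finally
        unlock(g.cond)
    end
    return nothing
end

# Open the gate and wake every task blocked in wait_open.
function resume!(g::PauseGate)
    lock(g.cond)
    try
        g.is_open = true
        notify(g.cond)
    finally
        unlock(g.cond)
    end
    return nothing
end

# Close the gate; tasks block the next time they call wait_open.
function pause!(g::PauseGate)
    lock(g.cond)
    try
        g.is_open = false
    finally
        unlock(g.cond)
    end
    return nothing
end

gate = PauseGate()
stop = Threads.Atomic{Bool}(false)    # lets the loop terminate cleanly
n_samples = Threads.Atomic{Int}(0)    # stand-in for the data channels

worker = errormonitor(Threads.@spawn begin
    while !stop[]
        wait_open(gate)
        Threads.atomic_add!(n_samples, 1)  # stand-in for collect_data() + put!
        yield()                            # keep other tasks responsive on one thread
    end
end)

resume!(gate); sleep(0.2); pause!(gate)    # play for a while, then pause
```

To shut the task down, set stop[] = true and call resume!(gate) once so that a paused worker wakes up and sees the flag. A worker that is in the middle of an iteration when pause! is called finishes that iteration before it blocks.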
Using the Distributed module
I just started looking into this approach, and I am unsure how to get multithreaded functionality. Do tasks created by @spawnat automatically run on different threads? Also, I do not know how I would play/pause these processes. Is it even possible? Would I spawn a separate task for each infinite while true loop? Do I have to use RemoteChannels? I tried playing around a bit with this approach, but I do not think that it will be helpful if I go on about my confusion.
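To make the questions above concrete, here is a minimal sketch of what such a setup could look like. @spawnat runs its expression on the given worker process, not on another thread of the current process, so data has to cross process boundaries through a RemoteChannel. Everything else here is an assumption for illustration: a single local worker, a hypothetical produce! function, a second RemoteChannel carrying :play/:pause/:stop commands as the pause mechanism, and sleep(0.001) as a stand-in for the real sampling.

```julia
using Distributed

w = first(addprocs(1))   # assumption: one extra local worker process

# A RemoteChannel lives on one process but can be used from any of them.
raw_data_channel = RemoteChannel(() -> Channel{Float64}(1000))   # lives on process 1
control_channel = RemoteChannel(() -> Channel{Symbol}(10), w)    # lives on the worker

# Hypothetical producer loop, defined on every process so the worker can run it.
@everywhere function produce!(data, control)
    running = false
    while true
        # While paused, block until a command arrives; while running, only poll.
        if !running || isready(control)
            cmd = take!(control)
            cmd === :stop && return nothing
            running = (cmd === :play)
        end
        if running
            put!(data, rand())   # stand-in for collect_data()
            sleep(0.001)         # stand-in for the sampling period
        end
    end
end

producer = @spawnat w produce!(raw_data_channel, control_channel)

put!(control_channel, :play)
first_sample = take!(raw_data_channel)   # blocks until the worker has produced a value
put!(control_channel, :pause)

put!(control_channel, :stop)
result = fetch(producer)                 # waits for produce! to return
rmprocs(w)
```

Note that put!(data, ...) blocks once the data channel is full, and a producer blocked there cannot see control commands until the consumer takes something out. Each infinite loop would be its own @spawnat call in this design.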
Final remark
It could be that I have to give up on play/pause functionality. I would still be able to do what I need to in the final application, but it would involve many more restarts of the julia process during both development and usage, so it is something that would be nice to avoid. Any help on the two approaches, or an alternative approach, would be much appreciated.