Help find best approach to streaming data on raspberry pi

I am trying to do some complicated asynchronous and preferably multithreaded operations on a Raspberry Pi. I have some ideas, but could really use some pointers on how to tackle the problem. I will outline the approaches I have tried so far, and describe the problems with them.

Relying on Threads.@spawn

The first approach was to use a pattern along the lines of

julia> main_event = Threads.Event();

julia> n_elements = 1000;

julia> raw_data_channel = Channel(n_elements);

julia> processed_data_channel = Channel(n_elements);

julia> # dummy_function to make the code run
       function collect_data()
           t0 = time_ns()
           while time_ns() - t0 < 40_000  # wait one sampling period (40 µs at 25 kHz; time_ns is in ns)
               nothing
           end
           return rand()
       end
collect_data (generic function with 1 method)

julia> data_producer_task = Threads.@spawn while true
           wait(main_event)
           raw_data = collect_data()
           put!(raw_data_channel, raw_data)
           # This first task runs continuously at 25 kHz
       end
Task (runnable) @0x00007fce6a23b080

julia> my_processing(x) = 2x;

julia> data_processor_task = Threads.@spawn while true
           wait(main_event)
           while !isempty(raw_data_channel)
               result = my_processing(take!(raw_data_channel))
               put!(processed_data_channel, result)
           end
       end
Task (runnable) @0x00007fce6a1d2400

I can now see that I have no items waiting, notify the main event, wait a bit, and see that I have a bunch of items waiting:

julia> getproperty.((raw_data_channel, processed_data_channel), :n_avail_items)
(0, 0)

julia> notify(main_event); sleep(0.05); reset(main_event)

julia> getproperty.((raw_data_channel, processed_data_channel), :n_avail_items)
(1001, 1001)

So: successful indefinite tasks, running on multiple threads, with a graceful pause/play button. Great!

However, I happened to be using Julia 1.10. When I tried on Julia 1.9, with the actual, more complicated pipeline of 3 different tasks, the call to reset(main_event) hung forever. It could be relevant that I wrap the tasks in try statements, and that I call errormonitor on all tasks. But 1.9 versus 1.10 makes no difference until I am able to compile Julia from source on a Raspberry Pi (I am currently running into this error, and trying this alternative approach): reset(::Base.Event) was introduced in Julia 1.8, so I cannot use it with the prebuilt binaries for 32-bit (ARMv7-a hard float). Also, I would prefer not to run a beta version (1.10) for the full application, as it is intended for a production system.
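(For reference, here is one reset-free way I could get the same pause/play behaviour on Julia 1.6: a small "gate" built from a Bool flag and a Threads.Condition. This is only a sketch, and the names Gate, wait_open and set_gate! are made up for it:)

```julia
# A reset-free "gate" for pause/play: tasks block in wait_open while the
# gate is closed, and resume when set_gate! reopens it. Works on Julia 1.6.
mutable struct Gate
    open::Bool
    cond::Threads.Condition
end
Gate(open::Bool=false) = Gate(open, Threads.Condition())

function wait_open(g::Gate)
    lock(g.cond) do
        while !g.open
            wait(g.cond)   # releases the lock while waiting
        end
    end
end

function set_gate!(g::Gate, open::Bool)
    lock(g.cond) do
        g.open = open
        open && notify(g.cond)
    end
end

# Usage: a producer that pauses whenever the gate is closed
gate = Gate(true)
results = Channel{Float64}(1000)
producer = Threads.@spawn for _ in 1:10
    wait_open(gate)        # the "pause" point
    put!(results, rand())
end
wait(producer)
close(results)
n = length(collect(results))   # 10 items produced
```

Calling set_gate!(gate, false) from any thread pauses the producer at its next wait_open, and set_gate!(gate, true) resumes it, without errors or restarting the process.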

Using the Distributed module

I just started looking into this approach, and I am unsure how to get multithreaded functionality. Do tasks created by @spawnat automatically run on different threads? I also do not know how I would play/pause these processes. Is it even possible? Would I spawn a separate task for each infinite while true loop? Do I have to use RemoteChannels? I have played around a bit with this approach, but I do not think it would be helpful to go on about my confusion here.

Final remark

It could be that I have to give up on play/pause functionality. I would still be able to do what I need in the final application, but it would involve many more restarts of the julia process during both development and usage, so it is something I would like to avoid. Any help with the two approaches, or an alternative approach, would be much appreciated.

A third approach - global mutable variables

The following snippet might also prove interesting. EDIT: Below is a more sophisticated version of the originally proposed code, but the concept is the same.

using Dates

toggle!(x::Ref{Bool}) = (x[] = !x[]; x[])  # Tiny convenience function
should_run = Ref(true)  # The "lever" we can pull to exit the loop

my_task = Threads.@spawn while true
    sleep(0.5)
    println(now())
    should_run[] || (println("Breaking loop"); break)
end

# Will run until should_run[] returns false
toggle!(should_run)
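(For comparison, a sketch of the same lever using Threads.Atomic{Bool} instead of a plain Ref, so that reads and writes of the flag are atomic; this also works on Julia 1.6:)

```julia
# Thread-safe variant of the lever above, using an atomic flag instead of
# a plain Ref (no lock needed for a single Bool).
should_run_atomic = Threads.Atomic{Bool}(true)

my_task = Threads.@spawn begin
    n = 0
    while should_run_atomic[]   # atomic read
        n += 1
        sleep(0.01)
    end
    n
end

sleep(0.05)
should_run_atomic[] = false     # atomic write; the loop exits at its next check
iterations = fetch(my_task)     # the number of completed iterations
```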

Is there any reason why this would be a bad approach?

Have you looked into Dagger.jl at all? In terms of automatic Distributed + multithreading, that’s baked into Dagger.@spawn - it’ll use any thread on any worker to run your task.

I’m not sure what you mean by “play/pause”, but if you can elaborate, maybe it’s something I can provide a solution for?

Additionally, in terms of streaming support, I have a branch that implements streaming support, which should automatically work across Distributed processes and multiple threads. It needs more work to be merge-ready, but the idea is solid and will probably get merged in the next 1-2 months. If this seems appealing to you, let me know and I can prioritize finishing it :smile:

For one thing, this isn’t thread-safe. For another, it doesn’t work across multiple processes, since bla only exists on one process. Dagger’s streaming branch supports early termination of streaming tasks from within the task or from the outside, so maybe it’s worth taking a look to see if this would work for you.

By play/pause, I mean that I can start the loop and have it run until I explicitly tell it to stop (by reset(main_event) or bla["stop"] = true). This should happen gracefully, i.e. without throwing errors, and also without having to restart the julia process.

The intended effect is for one thread to stop a process on another. If that means not thread-safe, then thread-safe is not what I want. I think. I am still a little fuzzy on the details of thread safety. But the effect should be very similar to a second case where one thread runs a while true loop and calls wait(main_event), and another thread calls reset(main_event), right? In both cases, the execution stops? (In the first case, with bla["stop"] && break, the process finishes; in the latter it is paused and can be resumed, but that detail is not of great importance.)

I have not looked into Dagger yet. But it seems like it would act as a replacement for the scheduling functions provided by Distributed.jl and the Threads module, with very similar effects - is that correct? For my use-case, do you feel it is an advantage to go distributed? Or is it overkill, with unnecessary overhead? I will only be running on a single CPU on a single computer.

The branch you mention sounds like exactly what I need. I have not seen mention of functionality aimed at acquiring and working with streaming data before, so that sounds really great. I will definitely take a look and see if it is something I can understand and use :slight_smile:

EDIT: I looked at the PRs you made to that branch, but I am having a hard time seeing what basic usage looks like. If you could add a docstring to what you consider the main entry point for streaming, ideally with some examples, that could perhaps be sufficient “documentation” to get me started. Thanks for your work!

It’s fine for threads to communicate, but they need to do so in a manner that doesn’t introduce races - Dict is not thread-safe on its own, so what you’re currently doing could end up producing a segfault or worse. You can create a Threads.ReentrantLock() to protect it, and do:

blah_lock = Threads.ReentrantLock()

stop = lock(blah_lock) do
    blah["stop"]
end

# OR, for set:

lock(blah_lock) do
    blah["stop"] = true
end

This is thread-safe because the not-thread-safe object (the Dict) is protected by a lock, which “serializes” access (forces only one lock call to execute its inner function at a time).
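Put together as a runnable sketch (reusing the blah names from the snippet above): four tasks hammering the same Dict stay correct because every access goes through the lock.

```julia
# Four tasks each increment a shared Dict entry 1_000 times; the lock
# serializes the accesses, so no increments are lost.
blah = Dict("count" => 0)
blah_lock = ReentrantLock()

tasks = map(1:4) do _
    Threads.@spawn for _ in 1:1_000
        lock(blah_lock) do
            blah["count"] += 1
        end
    end
end
foreach(wait, tasks)
blah["count"]   # 4_000
```

Without the lock, the read-modify-write of `blah["count"] += 1` can interleave between threads and silently drop updates (or corrupt the Dict).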

This generally sounds fine - a Threads.Event can at least be correctly set-and-reset concurrently on Julia 1.8+, I believe.

Yes, it basically is a superset of Distributed.@spawn and Threads.@spawn, with many, many more features built-in to make parallel programming easy.

I think it’s better to ask whether the small amount of overhead that Dagger introduces is less important than the gain in productivity that you get from using it - if it keeps you from having to roll your own distributed-and-threaded streaming runtime, data conversion and transfer infrastructure, etc. then it may greatly outweigh a little bit of overhead (and if you find the overhead is significant, just file an issue and I can work on bringing it down).

Sorry, yeah, I haven’t yet gotten around to documenting it.

The general idea with streaming is to make it more efficient to run the same graph of Dagger tasks continuously, avoiding the overhead of calling Dagger.@spawn over-and-over (which can dominate the runtime cost of small tasks). For example, if I wanted to print random Float64 values continuously, I could do something like:

Dagger.spawn_streaming() do # enters a streaming region
  vals = Dagger.@spawn rand()
  Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running

This will continuously re-evaluate rand(), and send each result into println(), in order. (This will run endlessly, but you can stop it by calling Dagger.kill!(vals), or by calling it on any other part of the streaming DAG.) Of course, more complicated DAGs are supported as well. With the above example, you could modify it to write results to an output file as well:

Dagger.spawn_streaming() do
  vals = Dagger.@spawn rand()
  Dagger.spawn(vals) do vals
    open("results.txt", "w") do io
      println(io, repr(vals))
    end
  end
  Dagger.@spawn println(vals)
end

And all tasks will then run asynchronously, with writing and printing happening concurrently (possibly on different workers and threads).

Your streaming examples look great! The syntax feels intuitive, and it looks rather straightforward. Also, thanks a lot for the demo of how to use locks. I am rather new to this.

So as long as I am only running on a single machine, the following code should be near-identical to your first example:

my_toggle = Ref{Bool}(true)
my_toggle_lock = Threads.ReentrantLock()

read_my_toggle() = lock(my_toggle_lock) do
    my_toggle[]
end

toggle_my_toggle!(target_state=!read_my_toggle()) = lock(my_toggle_lock) do
    my_toggle[] = target_state
end

Threads.@spawn while read_my_toggle()  # run while my_toggle holds `true`
    val = rand()
    println(val)
end

toggle_my_toggle!() # As the toggle was on, this would turn it off

The known differences between the solutions are that this one has one fewer dependency and is less experimental, but requires more boilerplate. Also, the Dagger.spawn_streaming() solution is expected to have some overhead, whereas this solution has minimal overhead. Is that a fair assessment?

Sidenote - if I have one computer and two Raspberry Pis running Julia, connected by ethernet cables, could I control everything from the computer, spawning workers and retrieving results that actually run on the Pis? All using Dagger.jl? I have tried something like this with RemoteREPL.jl, but there was a 50 ms overhead with each call from the standard library Sockets, so that solution has a major pain-point.

Thanks! :heart:

Yes, this seems fine under multithreaded execution.

Fair enough, but note that you are then responsible for ensuring that your approach remains correct as your code changes, and the moment you want to go distributed, you have a lot more work to do that I’ve already done for you (and will continue to test and maintain) :smile:

If the overhead is actually measured to be significant, just let me know and I can probably bring it down. Just wondering, what are you doing that requires such high streaming performance?

Have you tried using Dagger or even just Distributed in that setup? That’s exactly the use case that they’re best at.

Seems like you found a solution for this :smile:

I have recently found out that I may in fact want to go distributed, so Dagger seems like my best bet ATM. I will start testing in the coming weeks, as we have other aspects that are more urgent.

I am trying to measure 8 separate 25 kHz signals, perform significant signal processing, and send/save the data. I am not sure how close we are to the limit of what is doable, but I want as large margins as possible ^_^. But that sounds great, I will open an issue if I encounter significant overhead!
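For scale, a back-of-the-envelope on the raw data rate (assuming one Float64 per sample):

```julia
channels         = 8
rate_hz          = 25_000
bytes_per_sample = sizeof(Float64)   # 8 bytes

raw_bytes_per_s = channels * rate_hz * bytes_per_sample   # 1_600_000
# ~1.6 MB/s of raw data: the data movement itself is modest, so the
# signal processing is the more likely bottleneck on an RPi.
```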

Yeah, the RemoteREPL overhead is about 0.6 ms now that we have disabled Nagle’s algorithm :smiley: That feels like my first significant non-docs improvement to a package.

I am stuck on 1.6, as I am running on the Raspberry Pi. The compat for Dagger.jl on that branch places a lower limit at 1.8, meaning that I am unable to test the streaming branch :confused:

Sounds great, let me know how I can help! (Aside from finishing the streaming PR :laughing: )

Funny enough, I originally wrote the streaming PR to support collecting and processing SDR data for an in-progress radio/RADAR astronomy project at MIT. I’m using it with NATS and Pluto to implement an interactive, remotely-programmable streaming engine so that researchers can easily define and run experiments on real-time signal data (and for this project, we’re talking possibly 50 GSPS of radio data, so high performance is my end goal).

Maybe we can follow up in this issue to see if Fingolfin came up with any solution or has any other ideas for how to get ARMv7 builds working again? I don’t have an RPi locally with me right now, but feel free to PM me if you want help looking at getting it running over Zoom or something.

That is very close to what I have been up to! This is a use-case that is extremely interesting for interactive work with data collection and processing, and I love to hear that others are also exploring this direction. With my project’s latest move from a single RPi to a computer and two RPis, I might move to a VSCode interface, because I find Pluto’s startup time and reactivity make development harder.

I am able to install Dagger 0.15.1 on the LTS version of Julia (1.6.7). I will not get streaming functionality, but from skimming the release notes, it seems like Dagger should still be useful :smiley: Do you know of pitfalls in Dagger 0.15.1 that I should be aware of?

You are talking about getting later versions of Julia running on the RPi. Immediately, I am thinking that it would be easier to support older versions of Julia on Dagger’s master branch. Is that possible at all? To extend Dagger.jl’s Julia support to the LTS?

But it is starting to feel like this would be a very nice case study for Dagger. Could it be relevant to add an in-depth tutorial on how to stream data from a Raspberry Pi, as an insightful use-case, to the documentation? If yes, we can create an issue and discuss further over there.

Specifically Dagger on the RPi, or a later Julia version on the RPi? I am very interested in both, so if I get stuck somewhere I would be very interested in some sort of call to work it out!

There are a lot of them, especially w.r.t. distributed computing. I would highly recommend focusing on getting Julia > 1.6 working on the RPi instead of trying to work with a very old version of Dagger.

Maybe true from a technical standpoint, but I’m definitely not going to be adding back Julia 1.6 support to Dagger master. There are just too many Distributed bugs in older Julia, and without good atomics support (which only came in Julia 1.7), a lot of internal Dagger mechanisms become either cumbersome or impossible to implement. Additionally, ScopedValues support relies on Julia 1.8, and I have no power to change that. Also, note that 1.6 is not going to be the LTS for much longer - it sounds like 1.11 is going to end up being the next LTS, and it’s right around the corner, with all the features necessary to support Dagger.

Let’s focus on getting the RPi working with 1.7+, I’m sure many other people would appreciate that :slightly_smiling_face:

Definitely! I have a “Use Cases” section in the documentation that is begging to be expanded upon, and I can add longer blog posts to https://daggerjl.ai/

First the latter, and then the former if you’re still interested :slightly_smiling_face:

I’ve been quietly reading this conversation. I am not an expert in this domain, but I love Raspberry Pis and have one set up at home. If you need any tests run on a Pi, let me know! Happy to contribute another data point. :smile:

I agree, there are a lot of upgrades in newer Julia versions, and it would be a waste to use developer time on backwards compatibility instead of improvements.

Unfortunately, I want to show a demo in a week, and need to present something in about three weeks - likely before 1.10 is even released, let alone 1.12, which would be required for 1.11 to become the new LTS. I think I will have to try to make it work with 1.6, Distributed, and Threads.

This also means that I cannot really justify spending time trying to compile later Julia versions for the Pi. But thank you a lot for your kind offer and helpfulness. It is much appreciated!

It looks like ARMv7 will be fixed somewhat soon, as Julia now has an ARMv7 build farm: Fixes for armv7l buildbots by staticfloat · Pull Request #51945 · JuliaLang/julia · GitHub

Oh wow, that is fantastic! Thanks for the link <3
