Best way for a task to read from two channels

Hi,

I’m working on a problem that involves tasks communicating through channels.

I have a task that needs to read from two channels and I’m trying to figure out the best way of doing that.

The task needs to read an incoming sequence of video frames and buffer the last 30 seconds worth of frames in an internal array. These frames are read from channel ch1.

At the same time, there is another incoming channel (ch2) which is used to notify the task that an event of interest has been detected (the detection is performed by another task, which sends notifications through the channel ch2).

When a notification comes along in channel ch2, the task has to use the buffered frames to create a video file. My question is: what’s the best way of implementing this? Is it even a good idea to have a task read from more than one channel?

One important point is that the update rates of channels ch1 and ch2 are, most of the time, quite different. Channel ch1 is fed regularly with frames at a rate of frate frames per second (typically 2 or 3 fps). In turn, channel ch2 can go hours or days with no activity at all. However, it could also happen that a detection happens for every frame, in which case channel ch2 would be updated at the same rate as channel ch1.

The first thing I could try is this:

using Base.Threads: @spawn

ch1 = Channel(32)
ch2 = Channel(32)

function buffer_frames_1()

	for item_1 in ch1
	
		...
		
		for item_2 in ch2
		
			...
		
		end
		
	end

end

This doesn’t work though, since once we get to the inner loop, we get stuck in there and never get a chance to read values from channel ch1 in the outer loop again.

Another possibility is:

function buffer_frames_2()

	while true
	
		item_1 = take!(ch1)

		item_2 = take!(ch2)
		
		...

	end

end

This is better, but it doesn’t really work either: since take!() blocks when the channel is empty, both channels end up being read at the rate of the slower one. Besides, I think this means the higher-rate channel will fill up.

A third approach involves using isready() (which is non-blocking) on the channels before calling take!() (which is blocking):

function buffer_frames_3()

	while true
	
		if isready(ch1)
			item_1 = take!(ch1)
			...
		end
	
		if isready(ch2)
			item_2 = take!(ch2)
			...
		end

	end

end

This works in the sense that the channels are read at the proper rate, but it results in a high CPU usage, probably because the while loop is executed zillions of times a second. A possible solution would be to insert a sleep() call in this loop so that it could be executed only slightly faster than the faster channel (maybe 2*frate times per second) but at this point I’m not sure I’m going in the right direction anymore.

One more solution I thought of was to use just one channel and use a flag to signal that a detection happened. In this case, the task sending the video frames would insert flag=0 in the channel. In turn, the task sending the detection notifications would send empty frames and insert flag=1 in the channel when a detection happened. This would avoid the problem of having to read from two channels.

I wonder if there are better, more elegant solutions to this problem. Any help is greatly appreciated.

It sounds like what you need is not necessarily two channels, but rather a circular buffer and a channel. The circular buffer would hold 30s worth of frames (so fps * 30 frames), each stored together with a frame index (so a tuple of an index and a frame). The buffer would be continuously updated by your “provider” of frames.

Your other task that’s doing the detecting can then send just the frame index through the channel and you can dump/copy/process the last 30s worth of data looking back through the circular buffer. This would also allow the “processing” task to sleep most of the time, since it would wait on a message from the channel to continue operating (assuming the channel blocks properly).
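To make the idea concrete, here is a minimal sketch of such a buffer, written by hand rather than with DataStructures.jl. The names FrameRing and frames_upto are made up for illustration, frames are plain strings, and the structure is not thread-safe, so this assumes a single task owns it.

```julia
# Hypothetical fixed-capacity ring buffer of (serial, frame) pairs.
# Not thread-safe: assumes one task does all the pushing and reading.
mutable struct FrameRing{T}
    slots::Vector{Tuple{Int,T}}  # storage, grows until it reaches `capacity`
    capacity::Int
    count::Int                   # total number of frames pushed so far
end

FrameRing{T}(capacity::Integer) where {T} =
    FrameRing{T}(Tuple{Int,T}[], capacity, 0)

function Base.push!(ring::FrameRing{T}, frame::T) where {T}
    ring.count += 1
    entry = (ring.count, frame)
    if length(ring.slots) < ring.capacity
        push!(ring.slots, entry)                            # still filling up
    else
        ring.slots[mod1(ring.count, ring.capacity)] = entry # overwrite the oldest
    end
    return ring
end

# Return up to `n` frames ending at serial number `upto`, oldest first.
# Frames that were already overwritten are silently skipped.
function frames_upto(ring::FrameRing, upto::Integer, n::Integer)
    oldest = max(ring.count - ring.capacity + 1, 1)
    lo = max(upto - n + 1, oldest)
    hi = min(upto, ring.count)
    return [ring.slots[mod1(s, ring.capacity)][2] for s in lo:hi]
end

fps = 3
ring = FrameRing{String}(30 * fps)   # 30 seconds at 3 fps
for i in 1:100
    push!(ring, "frame $i")
end
clip = frames_upto(ring, 95, 5)      # the 5 frames ending at serial 95
```

The detecting task then only needs to send the serial number (95 here) through the channel; the lookup comes back empty if the requested frames have already been overwritten.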

When I said the task would buffer the last 30 seconds worth of frames in an “internal array” I was actually thinking of a circular buffer (I was thinking of doing that manually using an array but I understand that packages such as DataStructures.jl provide such kinds of data structures).

But in my case the “provider” of frames is another task, which is providing the frames through channel ch1. Is there some other way for the buffer to be updated without going through a channel? Maybe you’re thinking of sharing the buffer across tasks? I’m trying to avoid sharing data across tasks because in practice each of these tasks will be spawned multiple times.

Here we are on the same page 🙂

Thanks!

After reading about circular buffers in the DataStructures.jl package, I guess what you mean is that the circular buffer would be shared across the producer and the consumer of frames, with the producer pushing frames into the buffer and the consumer popping them out.

However, I’ll have multiple producers, and multiple consumers, running in different tasks. I wonder if in that case (multiple tasks pushing into and popping out of a single buffer) I would still be able to use circular buffers.

I guess the question here is: are circular buffers thread-safe? Can I have multiple tasks writing to and reading from them without worrying about data races?

You could have one task per channel that always takes from its channel; when it gets something, it notifies a condition. The main task only waits on the condition, and when the condition is notified, it checks both channels for messages using isready.

Yes, that’s what I was thinking of. If in doubt, you can always guard access with a lock. There’s also a different package with support for the upcoming atomics in 1.7, but its name escapes me right now (pinging @tkf, I think it’s yours).

Reading multiple channels (“nicely”) requires something like Go’s select. This has to be baked into the channel design and you can’t really retrofit it. You can somewhat simulate this in Julia using multiple tasks:

function merged_channel!(channels)
    Channel() do merged
        @sync for ch in channels
            Threads.@spawn try
                for x in ch
                    put!(merged, x)
                end
            catch
                # Close input channels instead of the output so that
                # `CompositeException` will be thrown by `take!(merged)`.
                foreach(close, channels)
                rethrow()
            end
        end
    end
end

As you can see, since each task does for x in ch, which does take!(ch) internally, the items in ch will be consumed by the forwarding task. This is mostly fine, but you can lose some items if the channel returned by merged_channel! is closed on the reader end.

This is also inefficient since you need many tasks. So, I’d normally suggest using a single channel in the first place for this type of architecture if possible.
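For reference, this is roughly how the merging approach looks from the reader’s side. It is only a sketch: tagged_merge is a made-up variant of the function above that also labels each item with its source (and leaves out the error handling), and the buffer size of 32 is arbitrary.

```julia
# Forward every item from each input channel into one output channel,
# tagging it with a label so the reader can tell the sources apart.
# One forwarding task is spawned per input channel.
function tagged_merge(inputs::Pair{Symbol,<:Channel}...)
    Channel{Tuple{Symbol,Any}}(32) do merged
        @sync for (tag, ch) in inputs
            Threads.@spawn for x in ch
                put!(merged, (tag, x))
            end
        end
    end
end

frames = Channel{Int}(32)      # stands in for ch1
events = Channel{String}(32)   # stands in for ch2
merged = tagged_merge(:frame => frames, :event => events)

put!(frames, 1)
put!(events, "detection")
put!(frames, 2)
close(frames)
close(events)

# The merged channel closes once every input is closed and drained.
received = collect(merged)
```

Order is preserved within each input channel, but the interleaving between the two sources depends on task scheduling, so the reader should not assume any particular order across tags.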

FYI, I implemented select-able channels in Julio.jl (tkf/Julio.jl on GitHub, “Structured Concurrency for Julia”) but it’s likely not very efficient yet.

I see.

So I think the easiest solution here is to merge the information flowing in the two channels into a single structure, like a tuple, and send that down a single channel. The tuple would be like (video_frame, fire_detected). The task sending the video frames would send a video frame and an empty fire_detected value, whereas the task sending the notifications would send the fire_detected notification and an empty video_frame. I think that will do.

@tkf, thank you once again for your help!

I’m not sure I understand.

You’re saying I could have task A reading channel ch1 (the one with the video frames) and another task, B, reading channel ch2 (the one with the fire notifications). Both of these tasks, when getting something in their input channel, send a notification on a new channel, ch3, to a new task, C. When task C is notified it then checks channels ch1 and ch2 using isready(). Is that what you’re saying?

If you want very fast concurrent update of circular buffer, you can try a nonblocking algorithm. Maybe something like this (not tested):

mutable struct NonBlockingCircularBuffer{T}
    buffer::AtomicVector{T}
    @atomic timestamp_writing::Int  # increment-only counter
    @atomic timestamp_written::Int  # increment-only counter
    # TODO: also add pads between the fields
end

function writeat!(xs::NonBlockingCircularBuffer{T}, input::AbstractVector{T}) where {T}
    (; buffer) = xs
    timestamp_written = @atomic xs.timestamp_written
    timestamp_writing = timestamp_written + length(input) - 1
    @atomic xs.timestamp_writing = timestamp_writing
    for (i, t) in zip(eachindex(input), timestamp_written:timestamp_writing)
        j = mod1(t, length(buffer))  # TODO: don't use mod
        atomic_setindex!(buffer, input[i], j)  # relaxed write is OK
    end
    @atomic xs.timestamp_written = timestamp_writing
end

function readat!(output::AbstractVector{T}, xs::NonBlockingCircularBuffer{T}, at::Int) where {T}
    (; buffer) = xs
    timestamp_written = @atomic xs.timestamp_written
    timestamp_writing = @atomic xs.timestamp_writing
    start = max(at, timestamp_writing - length(buffer))  # optimization
    stop = min(at + length(output) - 1, timestamp_written)
    for (i, t) in zip(eachindex(output), start:stop)
        j = mod1(t, length(buffer))  # TODO: don't use mod
        output[i] = atomic_getindex(buffer, j)  # relaxed read is OK
    end
    # Now, retrospectively verify the portion of `output` that is valid:
    timestamp_writing = @atomic xs.timestamp_writing
    valid_start = max(start, timestamp_writing - length(buffer))
    read_timestamps = valid_start:stop
    first_valid = firstindex(output) + valid_start - start
    last_valid = first_valid + length(read_timestamps) - 1
    return (first_valid:last_valid, read_timestamps)
end
# TODO: use weaker ordering on timestamps (release and acquire should be fine?)

I think this should be safe as long as writeat! is only ever called from a single task; readat! can then read the buffer without holding a lock (i.e., non-blocking) from any task on any OS thread. The catch is that we don’t have AtomicVector{T}. But it’s possible to simulate this as long as T is pointer-free (“immutable”); see “Advice on using unsafe_wrap to view integers as atomic integers” (post #2 by tkf).

Of course, if the reader requests data that is too old, the output will be empty (isempty(first_valid:last_valid)), and implementing “back pressure” (which would occur naturally with channels) would be hard.

Hi @tkf,

Thanks so much for this, but I’m afraid it’s a bit over my head.

I’ll try the approach I mentioned in a previous post, i.e., concatenating the information that would be flowing in the two channels into a tuple and then sending that through a single channel. Do you think this is OK?

This seems fine. It’s simple and easy…

The method with conditions would look something like this:

function buffer_frames_3()

	while true

		wait(notifycondition)

		if isready(ch1)
			item_1 = take!(ch1)
			...
		end

		if isready(ch2)
			item_2 = take!(ch2)
			...
		end

	end

end

The wait(notifycondition) makes it so it’s not spinning its wheels. The producer of frames/notifications just needs to notify(notifycondition) and then your consumer thread will check both channels.

That seems a very good approach.

I’m not sure how to implement it though. Reading the terminal help on wait, notify and Condition makes me think I need Threads.Condition. Is that right?

help?> Threads.Condition
  Threads.Condition([lock])

  A thread-safe version of Base.Condition.

  To call wait or notify on a Threads.Condition, you must first call lock on it. When wait
  is called, the lock is atomically released during blocking, and will be reacquired before
  wait returns. Therefore idiomatic use of a Threads.Condition c looks like the following:

  lock(c)
  try
      while !thing_we_are_waiting_for
          wait(c)
      end
  finally
      unlock(c)
  end

How would that fit in the while loop in my buffer_frames_3() function? Also, how do I create the condition? Just by doing c = Threads.Condition()?

Thanks a lot!

Yes, you just call Threads.Condition() to create it. Then fire up the producer and consumer tasks and make sure they know about the condition. The consumer calls wait and the producer calls notify; as the docstring you quoted says, each of them has to hold the condition’s lock while making that call.
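Here is a small self-contained sketch of how the pieces could fit together. Everything in it is illustrative: run_condition_demo, the done flag and the integer “frames” are made up, and it still relies on isready, so any thread-safety caveats about isready apply. The waiting loop follows the idiom from the Threads.Condition docstring, which avoids missing a notification sent before the consumer starts waiting.

```julia
function run_condition_demo()
    ch1 = Channel{Int}(32)      # stands in for the frame channel
    ch2 = Channel{Symbol}(32)   # stands in for the detection channel
    cond = Threads.Condition()
    done = Ref(false)           # only read or written while `cond` is locked

    frames = Int[]
    events = Symbol[]

    consumer = Threads.@spawn begin
        while true
            # Sleep until there is something to read or the producer is done.
            finished = lock(cond) do
                while !isready(ch1) && !isready(ch2) && !done[]
                    wait(cond)  # releases the lock while blocked
                end
                done[]
            end
            # This task is the only reader, so a ready channel stays ready
            # until we take from it.
            while isready(ch1); push!(frames, take!(ch1)); end
            while isready(ch2); push!(events, take!(ch2)); end
            finished && break
        end
    end

    # Producer side: put first, then notify while holding the lock.
    for i in 1:5
        put!(ch1, i)
        i == 3 && put!(ch2, :detection)
        lock(cond) do
            notify(cond)
        end
    end
    lock(cond) do
        done[] = true
        notify(cond)
    end

    wait(consumer)
    return frames, events
end

frames, events = run_condition_demo()
```

The consumer does no work between notifications, which removes the busy loop, at the cost of a second synchronization mechanism layered on top of the channels.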

Note that isready is not thread-safe at the moment; see JuliaLang/julia pull request #41833, “Fix data races in `n_avail(::Channel)` to fix `isready`/`isempty`”, by c42f.

Even if it is fixed, isready is a racy API in the sense that code like

if isready(ch)
    take!(ch)
end

may block on take!(ch) because ch can be empty by the time this task tries take!(ch).

Also, note that the channel uses Threads.Condition internally. Using another one outside it as well is very likely a sign that something can be simplified.

At a high level, it sounds like it could work. But note that you’d need to copy anything you send through the channel if you are going to mutate it later and resend it. The reason why I mentioned the complex nonblocking approach was that I thought copying a 30-second video all the time might not be ideal. Maybe you can represent the video as (say) Vector{Matrix{T}} so that each Matrix (a frame) can be shared.
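To illustrate what sharing the frames means here: copying a Vector{Matrix{T}} copies only the outer vector of references, not the pixel data. A small sketch, with random 4×4 matrices standing in for real frames:

```julia
frames = [rand(UInt8, 4, 4) for _ in 1:3]   # Vector{Matrix{UInt8}}
snapshot = copy(frames)                      # new outer vector, same matrices

# The frame storage is shared, not duplicated.
@assert snapshot[1] === frames[1]

# Replacing a slot in the original (as a circular buffer would do when it
# overwrites an old frame) leaves the snapshot untouched.
frames[1] = zeros(UInt8, 4, 4)
@assert snapshot[1] !== frames[1]
```

This only stays safe if nobody mutates a frame in place after it has been handed over; overwriting a buffer slot with a new matrix is fine, writing into an existing matrix is not.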

I see. Thanks for pointing that out, @tkf.

I totally agree. But I actually don’t need to send any video through the channel. What I have at the moment is a task (one task per camera) that receives one frame at a time in one channel and a detection notification in another channel. The task has an internal circular buffer which it updates with every new frame. When the task gets notified that a detection occurred, it then passes the frame buffer to another task (through a spawned function call, not a channel) which writes the frame buffer to a video file on disk.

The idea is to concatenate the frame that would go in one channel and the notification that would go in the other channel into a tuple that can be sent down a single channel now.

I’m not sure I understand. Are you saying individual elements of a vector can be shared across different tasks?

PS: It’s just a bit more complicated than I wrote above because the data flowing in the “video frame” channel is not just individual frames, but rather the tuple (video_frame, source_camera, time_stamp).

I see, yes, that sounds like a good plan. But I think you’d need to merge these two channels into one channel since Julia does not have select. Maybe using something like Channel{Union{PushFrame,DetectionNotification}} where PushFrame and DetectionNotification are appropriate structs.
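A minimal sketch of what that could look like. The field names, the Message alias, the consume function and the capacity of 90 are all assumptions for illustration; a real consumer would hand the buffered frames to the video-writing task instead of just recording how many there were.

```julia
struct PushFrame
    frame::Matrix{UInt8}
    camera::String
    timestamp::Float64
end

struct DetectionNotification
    camera::String
    timestamp::Float64
end

const Message = Union{PushFrame,DetectionNotification}

# Single consumer: buffer incoming frames, react to detections.
function consume(ch::Channel{Message}; capacity = 90)
    buffer = PushFrame[]
    clips = Int[]   # number of buffered frames at each detection
    for msg in ch
        if msg isa PushFrame
            push!(buffer, msg)
            length(buffer) > capacity && popfirst!(buffer)  # drop the oldest
        else
            # Here a real task would write `buffer` to a video file.
            push!(clips, length(buffer))
        end
    end
    return clips
end

ch = Channel{Message}(32)
consumer = Threads.@spawn consume(ch)

# Both producers write into the same channel.
for i in 1:4
    put!(ch, PushFrame(zeros(UInt8, 2, 2), "cam1", Float64(i)))
    i == 3 && put!(ch, DetectionNotification("cam1", Float64(i)))
end
close(ch)

clips = fetch(consumer)
```

Because everything arrives through one channel, the consumer blocks in a single place and sees frames and notifications in the order they were put.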

I think putting everything into one channel is a good idea. I also think you should have some kind of serial number on the frames, and when there’s a detection, you send the frame number and the detection info. Otherwise there’s really no way to know which frame the detection corresponds to.

Yeah, I think using serial numbers is a good idea.