Best way for a task to read from two channels

You could have one task per channel that always takes from its channel and, when it gets something, notifies a condition. The main task only waits on the condition; when the condition is notified, it checks both channels for messages using isready.

Yes, that’s what I was thinking of. If in doubt, you can always guard access with a lock. There’s also a different package with support for the upcoming atomics in 1.7, but its name escapes me right now (pinging @tkf, I think it’s yours).

Reading multiple channels (“nicely”) requires something like Go’s select. This has to be baked into the channel design and you can’t really retrofit it. You can somewhat simulate this in Julia using multiple tasks:

function merged_channel!(channels)
    Channel() do merged
        @sync for ch in channels
            Threads.@spawn try
                for x in ch
                    put!(merged, x)
                end
            catch
                # Close input channels instead of the output so that
                # `CompositeException` will be thrown by `take!(merged)`.
                foreach(close, channels)
                rethrow()
            end
        end
    end
end

As you can see, since each task does for x in ch, which calls take!(ch) internally, the items in ch will be consumed by that task. This is mostly fine, but you can lose some items if the merged channel returned by merged_channel! is closed on the reader end.

This is also inefficient since you need many tasks. So, I’d normally suggest using a single channel in the first place for this type of architecture if possible.

FYI, I implemented selectable channels in Julio.jl (tkf/Julio.jl: Structured Concurrency for Julia), but it’s likely not very efficient yet.

I see.

So I think the easiest solution here is to merge the information flowing in the two channels into a single structure, like a tuple, and send that down a single channel. The tuple would be like (video_frame, fire_detected). The task sending the video frames would send a video frame and an empty fire_detected value, whereas the task sending the notifications would send the fire_detected notification and an empty video_frame. I think that will do.

@tkf, thank you once again for your help!

I’m not sure I understand.

You’re saying I could have task A reading channel ch1 (the one with the video frames) and another task, B, reading channel ch2 (the one with the fire notifications). Both of these tasks, when getting something in their input channel, send a notification on a new channel, ch3, to a new task, C. When task C is notified it then checks channels ch1 and ch2 using isready(). Is that what you’re saying?

If you want very fast concurrent updates of a circular buffer, you can try a nonblocking algorithm. Maybe something like this (not tested):

mutable struct NonBlockingCircularBuffer{T}
    buffer::AtomicVector{T}
    @atomic timestamp_writing::Int  # increment-only counter
    @atomic timestamp_written::Int  # increment-only counter
    # TODO: also add pads between the fields
end

function writeat!(xs::NonBlockingCircularBuffer{T}, input::AbstractVector{T}) where {T}
    (; buffer) = xs
    timestamp_written = @atomic xs.timestamp_written
    timestamp_writing = timestamp_written + length(input) - 1
    @atomic xs.timestamp_writing = timestamp_writing
    for (i, t) in zip(eachindex(input), timestamp_written:timestamp_writing)
        j = mod1(t, length(buffer))  # slot for timestamp `t`; TODO: don't use mod
        atomic_setindex!(buffer, input[i], j)  # relaxed write is OK
    end
    @atomic xs.timestamp_written = timestamp_writing
end

function readat!(output::AbstractVector{T}, xs::NonBlockingCircularBuffer{T}, at::Int) where {T}
    (; buffer) = xs
    timestamp_written = @atomic xs.timestamp_written
    timestamp_writing = @atomic xs.timestamp_writing
    start = max(at, timestamp_writing - length(buffer))  # optimization
    stop = min(at + length(output) - 1, timestamp_written)
    for (i, t) in zip(eachindex(output), start:stop)
        j = mod1(t, length(buffer))  # slot for timestamp `t`; TODO: don't use mod
        output[i] = atomic_getindex(buffer, j)  # relaxed read is OK
    end
    # Now, retrospectively verify the portion of `output` that is valid:
    timestamp_writing = @atomic xs.timestamp_writing
    valid_start = max(start, timestamp_writing - length(buffer))
    read_timestamps = valid_start:stop
    first_valid = firstindex(output) + valid_start - start
    last_valid = first_valid + length(read_timestamps) - 1
    return (first_valid:last_valid, read_timestamps)
end
# TODO: use weaker ordering on timestamps (release and acquire should be fine?)

I think this should be safe as long as writeat! is done by a single task, and readat! can read the buffer without holding a lock (i.e., non-blocking) from any task on any OS thread. The catch is that we don’t have AtomicVector{T}. But it’s possible to simulate this as long as T is pointer-free (“immutable”); see “Advice on using unsafe_wrap to view integers as atomic integers” (post #2 by tkf).

Of course, if the reader requests data that is too old, the output will be empty (isempty(first_valid:last_valid)), and implementing “back pressure” (which would occur naturally with channels) would be hard.

Hi @tkf,

Thanks so much for this, but I’m afraid it’s a bit over my head.

I’ll try the approach I mentioned in a previous post, i.e., concatenating the information that would be flowing in the two channels into a tuple and then sending that through a single channel. Do you think this is OK?

This seems fine. It’s simple and easy…

The method with conditions would look something like this:

function buffer_frames_3()
    while true
        wait(notifycondition)
        if isready(ch1)
            item_1 = take!(ch1)
            ...
        end

        if isready(ch2)
            item_2 = take!(ch2)
            ...
        end
    end
end

The wait(notifycondition) makes it so it’s not spinning its wheels. The producer of frames/notifications just needs to notify(notifycondition) and then your consumer thread will check both channels.

That seems a very good approach.

I’m not sure how to implement it though. Reading the terminal help on wait, notify and Condition makes me think I need Threads.Condition. Is that right?

help?> Threads.Condition
  Threads.Condition([lock])

  A thread-safe version of Base.Condition.

  To call wait or notify on a Threads.Condition, you must first call lock on it. When wait
  is called, the lock is atomically released during blocking, and will be reacquired before
  wait returns. Therefore idiomatic use of a Threads.Condition c looks like the following:

  lock(c)
  try
      while !thing_we_are_waiting_for
          wait(c)
      end
  finally
      unlock(c)
  end

How would that fit in the while loop in my buffer_frames_3() function? Also, how do I create the condition? Just by doing c = Threads.Condition()?

Thanks a lot!

Yes, you just call Threads.Condition() to create it. As the help text says, a task must hold the condition’s lock whenever it calls wait or notify on it. Then fire up the producer and consumer tasks, and make sure they all know about the condition. The consumer calls wait, and the producer calls notify.
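
To make the locking pattern concrete, here is a minimal sketch of how the pieces might fit together. The channel contents, the finished flag, and the send! helper are all made up for illustration; only ch1, ch2, and notifycondition come from the thread. It assumes a single consumer, so nothing else can empty a channel between isready and take!.

```julia
# Hypothetical stand-ins for the two channels discussed in the thread.
ch1 = Channel{Int}(16)                  # "video frames"
ch2 = Channel{Symbol}(16)               # "fire notifications"
notifycondition = Threads.Condition()   # created once, shared by all tasks
finished = Ref(false)                   # only touched while holding the lock
received = Any[]

consumer = Threads.@spawn begin
    lock(notifycondition)
    try
        while true
            # Drain whatever is available; this task is the only reader.
            while isready(ch1)
                push!(received, take!(ch1))
            end
            while isready(ch2)
                push!(received, take!(ch2))
            end
            finished[] && break
            # Releases the lock while blocked and reacquires it on wake-up.
            wait(notifycondition)
        end
    finally
        unlock(notifycondition)
    end
end

# Producer side (hypothetical helper): put the item first, then notify
# while holding the lock.
function send!(ch, item)
    put!(ch, item)
    lock(notifycondition)
    try
        notify(notifycondition)
    finally
        unlock(notifycondition)
    end
end

send!(ch1, 1)
send!(ch2, :fire)
send!(ch1, 2)

# Tell the consumer to stop once it has drained both channels.
lock(notifycondition)
try
    finished[] = true
    notify(notifycondition)
finally
    unlock(notifycondition)
end
wait(consumer)
```

Because the consumer checks the channels while holding the lock and only then calls wait, a notify that arrives between the check and the wait cannot be missed: the producer has to acquire the same lock before it can notify.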

Note that isready is not thread-safe at the moment; see JuliaLang/julia Pull Request #41833 by c42f, “Fix data races in `n_avail(::Channel)` to fix `isready`/`isempty`”.

Even if it is fixed, isready is a racy API in the sense that code like

if isready(ch)
    take!(ch)
end

may block on take!(ch) because ch can be empty by the time this task tries take!(ch).

Also, note that the channel uses Threads.Condition internally. Needing another one outside the channel is very likely a sign that something can be simplified.

At a high level, it sounds like it could work. But note that you’d need to copy anything you send through the channel if you are going to mutate it later and resend it. The reason why I mentioned the complex nonblocking approach was that I thought copying a 30-second video all the time might not be ideal. Maybe you can represent the video as (say) Vector{Matrix{T}} so that each Matrix (a frame) can be shared.
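
A small sketch of what sharing frames could mean in practice, assuming tiny made-up 2×2 frames in place of real images: copy on a Vector{Matrix{T}} is shallow, so the snapshot gets a new outer vector but refers to the same frame matrices.

```julia
# Hypothetical 2x2 "frames"; real frames would be full image matrices.
frames = [fill(Float32(i), 2, 2) for i in 1:3]   # Vector{Matrix{Float32}}

# Shallow copy: a new outer vector whose elements are the same matrices.
snapshot = copy(frames)
@assert snapshot[1] === frames[1]

# Replacing a slot in the original vector does not affect the snapshot...
frames[1] = fill(9.0f0, 2, 2)
@assert snapshot[1][1, 1] == 1.0f0

# ...but mutating a shared matrix in place is visible through both.
frames[2][1, 1] = -1.0f0
@assert snapshot[2][1, 1] == -1.0f0
```

So handing a snapshot to another task is cheap as long as the buffer replaces whole frames rather than overwriting their contents in place.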

I see. Thanks for pointing that out, @tkf.

I totally agree. But I actually don’t need to send any video through the channel. What I have at the moment is a task (one task per camera) that receives one frame at a time in one channel and a detection notification in another channel. The task has an internal circular buffer which it updates with every new frame. When the task gets notified that a detection occurred, it then passes the frame buffer to another task (through a spawned function call, not a channel) which writes the frame buffer to a video file on disk.

The idea is to concatenate the frame that would go in one channel and the notification that would go in the other channel into a tuple that can be sent down a single channel now.

I’m not sure I understand. Are you saying individual elements of a vector can be shared across different tasks?

PS: It’s just a bit more complicated than I wrote above because the data flowing in the “video frame” channel is not just individual frames, but rather the tuple (video_frame, source_camera, time_stamp).

I see, yes, that sounds like a good plan. But I think you’d need to merge these two channels into one channel since Julia does not have select. Maybe using something like Channel{Union{PushFrame,DetectionNotification}} where PushFrame and DetectionNotification are appropriate structs.
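
As a rough sketch of that idea: the struct fields below are guesses based on the tuples described in this thread, and the frame contents, camera number, and channel size are made up.

```julia
# Hypothetical message types; the field names and types are assumptions.
struct PushFrame
    frame::Matrix{UInt8}
    camera::Int
    timestamp::Float64
end

struct DetectionNotification
    status::Bool
    camera::Int
    timestamp::Float64
end

# One buffered channel that carries both kinds of message.
incoming = Channel{Union{PushFrame,DetectionNotification}}(32)

# Two producers share the single channel.
frame_task = Threads.@spawn for t in 1:3
    put!(incoming, PushFrame(zeros(UInt8, 2, 2), 1, Float64(t)))
end
detection_task = Threads.@spawn put!(incoming, DetectionNotification(true, 1, 2.0))

wait(frame_task)
wait(detection_task)
close(incoming)

# A closed channel can still be drained; iteration stops once it is empty.
messages = collect(incoming)
```

The consumer then needs only one loop over the channel and can decide what to do from the type of each message.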

I think putting everything into one channel is a good idea. I also think you should have some kind of serial number on the frames, and when there’s a detection, you send the frame number and the detection info. Otherwise there’s really no way to know which frame the detection corresponds to.

Yeah, I think using serial numbers is a good idea.

Sure. I think I’m already doing that. I’m sending time stamps in both channels:

Video frames channel: (video_frame, source_camera, time_stamp)

Detection channel: (detection_status, source_camera, time_stamp)

Right now I’m merging these two tuples into a single one and sending it along the merged channel:

((video_frame, source_camera, time_stamp), (detection_status, source_camera, time_stamp))

The time stamps accompanying each frame are acting as serial numbers.

Now, that’s really interesting, @tkf.

Maybe I could use multiple dispatch to define two inner methods in the buffer_frames() function to process the data taken from the merged channel?

function buffer_frames()

    function do_work(data::PushFrame)
        # Add 'data' (a video frame) to the circular buffer
        ...
    end

    function do_work(data::DetectionNotification)
        # Use the frame buffer to create a video on disk
    end

    for data in incoming_channel
        do_work(data)
    end

end

Yes, multiple dispatch works pretty well for this. You can also do simple if _ isa _ dispatch:

for data in incoming_channel
    if data isa PushFrame
        ...
    else
        @assert data isa DetectionNotification
        ...
    end
end

Both approaches are supported by the compiler very well.
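
For completeness, a runnable sketch of the multiple-dispatch version. Everything here is illustrative: the struct fields, the handle! name, the BUFFER_LENGTH constant, and the use of a plain Vector with popfirst! as a stand-in for a real circular buffer are all assumptions, and saving a video is replaced by recording which timestamps would be written.

```julia
# Hypothetical message types; the field names and types are assumptions.
struct PushFrame
    frame::Matrix{UInt8}
    camera::Int
    timestamp::Float64
end

struct DetectionNotification
    status::Bool
    camera::Int
    timestamp::Float64
end

const BUFFER_LENGTH = 3   # hypothetical capacity, in frames

# One method per message type; dispatch picks the right one at run time.
function handle!(buffer, saved, msg::PushFrame)
    push!(buffer, msg)
    length(buffer) > BUFFER_LENGTH && popfirst!(buffer)   # drop the oldest frame
    return nothing
end

function handle!(buffer, saved, msg::DetectionNotification)
    # Stand-in for writing a video file: record the buffered timestamps.
    push!(saved, [f.timestamp for f in buffer])
    return nothing
end

function buffer_frames(incoming_channel)
    buffer = PushFrame[]
    saved = Vector{Vector{Float64}}()
    for data in incoming_channel
        handle!(buffer, saved, data)
    end
    return saved
end

incoming_channel = Channel{Union{PushFrame,DetectionNotification}}(16)
for t in 1:5
    put!(incoming_channel, PushFrame(zeros(UInt8, 2, 2), 1, Float64(t)))
end
put!(incoming_channel, DetectionNotification(true, 1, 5.0))
close(incoming_channel)

saved = buffer_frames(incoming_channel)   # saved == [[3.0, 4.0, 5.0]]
```

Defining the methods at top level rather than inside buffer_frames keeps them easy to test on their own; the loop body stays a single call either way.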

I thought of using if isa ... as well, but I thought that was considered bad programming practice. It certainly reminds me of my old Python days…

Hi @tkf,

Just out of curiosity: could FLoops.jl help me with any of this or is it meant specifically for data parallelism problems?

Thanks a lot for all your help with this.