Best way for a task to read from two channels

If you want very fast concurrent updates of a circular buffer, you can try a non-blocking algorithm. Maybe something like this (not tested):

mutable struct NonBlockingCircularBuffer{T}
    buffer::AtomicVector{T}
    @atomic timestamp_writing::Int  # increment-only counter
    @atomic timestamp_written::Int  # increment-only counter
    # TODO: also add pads between the fields
end

function writeat!(xs::NonBlockingCircularBuffer{T}, input::AbstractVector{T}) where {T}
    (; buffer) = xs
    timestamp_written = @atomic xs.timestamp_written
    # Timestamps are inclusive: this call writes timestamp_written + 1 through timestamp_writing.
    timestamp_writing = timestamp_written + length(input)
    # Publish the upper bound first so that readers know which slots may be overwritten.
    @atomic xs.timestamp_writing = timestamp_writing
    for (i, t) in zip(eachindex(input), (timestamp_written + 1):timestamp_writing)
        j = mod1(t, length(buffer))  # slot for timestamp t; TODO: don't use mod
        atomic_setindex!(buffer, input[i], j)  # relaxed write is OK
    end
    @atomic xs.timestamp_written = timestamp_writing
end

function readat!(output::AbstractVector{T}, xs::NonBlockingCircularBuffer{T}, at::Int) where {T}
    (; buffer) = xs
    timestamp_written = @atomic xs.timestamp_written
    timestamp_writing = @atomic xs.timestamp_writing
    # The slot of timestamp t is reused by timestamp t + length(buffer), so anything
    # older than timestamp_writing - length(buffer) + 1 may already be overwritten.
    start = max(at, timestamp_writing - length(buffer) + 1)  # optimization
    stop = min(at + length(output) - 1, timestamp_written)
    # Note: the first element of `output` receives timestamp `start`, not `at`.
    for (i, t) in zip(eachindex(output), start:stop)
        j = mod1(t, length(buffer))  # slot for timestamp t; TODO: don't use mod
        output[i] = atomic_getindex(buffer, j)  # relaxed read is OK
    end
    # Now, retrospectively verify the portion of `output` that is valid:
    timestamp_writing = @atomic xs.timestamp_writing
    valid_start = max(start, timestamp_writing - length(buffer) + 1)
    read_timestamps = valid_start:stop
    first_valid = firstindex(output) + valid_start - start
    last_valid = first_valid + length(read_timestamps) - 1
    return (first_valid:last_valid, read_timestamps)
end
# TODO: use weaker ordering on timestamps (release and acquire should be fine?)

I think this should be safe as long as writeat! is only ever called from a single task. readat! can then read the buffer from any task on any OS thread without holding a lock (i.e., it is non-blocking). The catch is that we don’t have AtomicVector{T}. But it’s possible to simulate it as long as T is pointer-free (“immutable”); see: Advice on using unsafe_wrap to view integers as atomic integers - #2 by tkf
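
If you just want something to experiment with, and T is one of the primitive types that Threads.Atomic supports (Int, Float64, ...), a simpler stand-in might be a plain vector of Threads.Atomic cells. This is only a sketch: BoxedAtomicVector is a made-up name, and atomic_getindex / atomic_setindex! are given the semantics I assumed for them in the code above. Every slot is a separately allocated box, so it will be slower than the unsafe_wrap approach, and the loads and stores are sequentially consistent, which is stronger than the relaxed ordering the comments ask for.

```julia
using Base.Threads: Atomic

# Hypothetical stand-in for `AtomicVector{T}`: one boxed atomic cell per slot.
struct BoxedAtomicVector{T}
    cells::Vector{Atomic{T}}
end

# Allocate `n` slots, each initialized to zero.
function BoxedAtomicVector{T}(n::Integer) where {T}
    return BoxedAtomicVector{T}(Atomic{T}[Atomic{T}(zero(T)) for _ in 1:n])
end

Base.length(v::BoxedAtomicVector) = length(v.cells)

# `a[]` and `a[] = x` on a `Threads.Atomic` are atomic loads and stores.
atomic_getindex(v::BoxedAtomicVector, j::Integer) = v.cells[j][]

function atomic_setindex!(v::BoxedAtomicVector, x, j::Integer)
    v.cells[j][] = x
    return v
end

v = BoxedAtomicVector{Int}(4)
atomic_setindex!(v, 42, 2)
```

With this, the buffer field would be declared as BoxedAtomicVector{T} instead of AtomicVector{T}.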

Of course, if the reader requests data that is too old, the output will be empty (isempty(first_valid:last_valid)), and implementing “back pressure” (which would occur naturally with channels) would be hard.
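
To make the "too old" case concrete, here is the window arithmetic on its own, as a small pure function. It is a sketch under two assumptions: timestamps are inclusive, and the slot that holds timestamp t gets reused by timestamp t + capacity. valid_timestamps is a made-up helper, not part of the code above.

```julia
# Hypothetical helper: which timestamps of a read request can still be trusted?
# `written` is the newest fully written timestamp, `writing` is the newest
# timestamp the writer may currently be storing (both inclusive).
function valid_timestamps(capacity::Int, at::Int, nout::Int,
                          written::Int, writing::Int)
    oldest = writing - capacity + 1      # oldest timestamp not yet overwritten
    start = max(at, oldest)
    stop = min(at + nout - 1, written)   # newest requested timestamp that exists
    return start:stop
end

# Capacity 8; the writer has finished timestamp 100 and is idle.
recent = valid_timestamps(8, 95, 4, 100, 100)   # asks for 95..98
partial = valid_timestamps(8, 90, 6, 100, 100)  # asks for 90..95
stale = valid_timestamps(8, 50, 4, 100, 100)    # asks for 50..53
```

Here recent comes back whole, partial loses its oldest part, and stale is an empty range. The reader never blocks in any of these cases, so nothing slows the writer down; that is why back pressure does not come for free here.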
