If you want very fast concurrent updates of a circular buffer, you can try a non-blocking algorithm. Something like this might work (not tested):
"""
    NonBlockingCircularBuffer{T}

Circular buffer intended to be filled by a single writer task (`writeat!`) while
any number of readers copy from it without holding a lock (`readat!`).

Readers decide which of the elements they copied are trustworthy by comparing
against the two timestamp counters below.

NOTE(review): `AtomicVector{T}` is not provided by Base; it has to be supplied
separately (e.g. by viewing pointer-free data as atomics) — confirm before use.
NOTE(review): the initial values of the counters are chosen by whoever constructs
this object; presumably both start equal (nothing written) — confirm.
"""
mutable struct NonBlockingCircularBuffer{T}
buffer::AtomicVector{T} # element storage, accessed with relaxed atomic get/set
@atomic timestamp_writing::Int # increment-only counter; stored by `writeat!` *before* it copies data into `buffer`
@atomic timestamp_written::Int # increment-only counter; stored by `writeat!` *after* the copy has finished
# TODO: also add pads between the fields (presumably so the two counters do not share a cache line)
end
"""
    writeat!(xs::NonBlockingCircularBuffer{T}, input::AbstractVector{T}) -> Int

Append the elements of `input` to the circular buffer `xs`. Return the new value of
`xs.timestamp_written`, i.e. the timestamp of the last element written.

Each element gets a "timestamp" (its position in the logical stream). Timestamp
`t` is stored in slot `mod1(t, length(xs.buffer))`. New timestamps start right
after the current `timestamp_written`, so consecutive calls append rather than
overwrite the final element of the previous call.

Only a single task may call this at a time. Concurrent readers detect slots that
are being overwritten via `timestamp_writing`, which is published before any data
is touched.
"""
function writeat!(xs::NonBlockingCircularBuffer{T}, input::AbstractVector{T}) where {T}
    (; buffer) = xs
    n = length(buffer)
    timestamp_written = @atomic xs.timestamp_written
    # `timestamp_written` is the last *completed* timestamp; continue after it.
    first_timestamp = timestamp_written + 1
    timestamp_writing = timestamp_written + length(input)
    # Announce the range about to be written *before* touching `buffer`. Readers use it
    # to discard slots that may be clobbered while they copy.
    @atomic xs.timestamp_writing = timestamp_writing
    # Prevent the relaxed data stores below from being reordered before the
    # announcement above (seqlock writer pattern).
    Threads.atomic_fence()
    for (i, t) in zip(eachindex(input), first_timestamp:timestamp_writing)
        j = mod1(t, n) # TODO: don't use mod; slot follows the timestamp, not the input index
        atomic_setindex!(buffer, input[i], j) # relaxed write is OK
    end
    # Publish the new data; readers only copy timestamps <= timestamp_written.
    @atomic xs.timestamp_written = timestamp_writing
    return timestamp_writing
end
"""
    readat!(output::AbstractVector{T}, xs::NonBlockingCircularBuffer{T}, at::Int)
        -> (valid_indices, timestamps)

Copy the elements of `xs` with timestamps in `at:at+length(output)-1` into
`output`, without taking a lock. Only elements that have already been written are
copied.

Returns `(first_valid:last_valid, read_timestamps)`: `output[first_valid:last_valid]`
holds exactly the elements with timestamps `read_timestamps`. Two kinds of
elements are excluded, so either range may be empty:

- elements the writer had already overwritten, or overwrote during the copy
  (too old);
- elements not yet written.

Entries of `output` outside `first_valid:last_valid` must be treated as garbage.
"""
function readat!(
    output::AbstractVector{T}, xs::NonBlockingCircularBuffer{T}, at::Int
) where {T}
    (; buffer) = xs
    n = length(buffer)
    timestamp_written = @atomic xs.timestamp_written
    timestamp_writing = @atomic xs.timestamp_writing
    # Timestamps <= timestamp_writing - n share a slot with something the writer is
    # (or was) writing, so skip them up front. This is an optimization; validity is
    # established by the re-check below.
    start = max(at, timestamp_writing - n + 1)
    # Only completed writes are visible.
    stop = min(at + length(output) - 1, timestamp_written)
    for (i, t) in zip(eachindex(output), start:stop)
        j = mod1(t, n) # TODO: don't use mod; slot follows the timestamp, not the output index
        output[i] = atomic_getindex(buffer, j) # relaxed read is OK
    end
    # Prevent the relaxed data loads above from being reordered after the re-check
    # below (seqlock reader pattern).
    Threads.atomic_fence()
    # Now, retrospectively verify the portion of `output` that is valid: anything the
    # writer may have overwritten while we were copying is discarded.
    timestamp_writing = @atomic xs.timestamp_writing
    valid_start = max(start, timestamp_writing - n + 1)
    read_timestamps = valid_start:stop
    # `output[firstindex(output)]` holds timestamp `start`.
    first_valid = firstindex(output) + valid_start - start
    last_valid = first_valid + length(read_timestamps) - 1
    return (first_valid:last_valid, read_timestamps)
end
# TODO: use weaker ordering on timestamps (release and acquire should be fine?)
I think this should be safe as long as `writeat!` is called from a single task; `readat!` can then read the buffer without holding a lock (i.e., non-blocking) from any task on any OS thread. The catch is that we don't have `AtomicVector{T}`. But it's possible to simulate it as long as `T` is pointer-free ("immutable"): Advice on using unsafe_wrap to view integers as atomic integers - #2 by tkf
Of course, if the reader requests data that is too old, the output will be empty (`isempty(first_valid:last_valid)`), and implementing "back pressure" (which would occur naturally with channels) would be hard.