Does anyone know of a package which implements a (circular) buffer that blocks when popping from it if it’s empty?
To give a bit more context, below is a MWE for a program which has one thread running and continuously fetching data from a source (with high and sometimes varying rates) and putting it into a circular buffer with a limited size to avoid a memory fill-up. The main thread has an endless loop where it fetches data from this buffer and processes it (which has also varying processing rates).
In the simplified example below, the data rate is 0.5s and the processing rate 1.0s with a buffer which can hold up to 5 elements.
There is some boilerplate code (I know it’s not much) with locks and also an ugly sleep(0.1) which is needed, otherwise the loop hangs of course with 100% when the buffer happens to be empty.
Long story short: is there any package which handles this elegantly and efficiently? Preferably a package which offers such a buffer type which is thread safe and allows something like get(buffer; block=true, timeout=...)?
I looked around but could not find anything yet. Maybe I am overthinking this and the below implementation is already OK as it is
Here is the MWE:
using DataStructures
DATA_RATE = 0.5
PROCESSING_RATE = 1.0
function datafetcher(buffer, lck)
i = 0
while true
lock(lck)
println(" -> pushing $i to $buffer")
push!(buffer, i)
unlock(lck)
i += 1
sleep(DATA_RATE)
end
end
function main()
buffer = CircularBuffer{Int}(5)
lck = ReentrantLock()
@async datafetcher(buffer, lck)
while true
if isempty(buffer)
sleep(0.1) # needed to not choke on this loop when the buffer is empty
continue
end
lock(lck)
data = popfirst!(buffer)
unlock(lck)
println("processing $data")
sleep(PROCESSING_RATE)
end
end
main()
I guess Channel it is, but unfortunately there is no documented way (public API) to check if a channel is full although there is a constructor to create a channel with a given maximum size.
I can of course implement
isfull(c::Channel) = length(c.data) >= c.sz_max
but it doesn’t feel really future proof
julia> methodswith(Channel)
[1] bind(c::Channel, task::Task) @ Base channels.jl:268
[2] close(c::Channel) @ Base channels.jl:200
[3] close(c::Channel, excp::Exception) @ Base channels.jl:201
[4] fetch(c::Channel) @ Base channels.jl:419
[5] isempty(c::Channel) @ Base channels.jl:534
[6] isopen(c::Channel) @ Base channels.jl:214
[7] isready(c::Channel) @ Base channels.jl:533
[8] iterate(c::Channel) @ Base channels.jl:604
[9] iterate(c::Channel, state) @ Base channels.jl:604
[10] lock(c::Channel) @ Base channels.jl:540
[11] lock(f, c::Channel) @ Base channels.jl:541
[12] put!(c::Channel{T}, v) where T @ Base channels.jl:333
[13] show(io::IO, c::Channel) @ Base channels.jl:585
[14] show(io::IO, ::MIME{Symbol("text/plain")}, c::Channel) @ Base channels.jl:587
[15] take!(c::Channel) @ Base channels.jl:465
[16] trylock(c::Channel) @ Base channels.jl:543
[17] unlock(c::Channel) @ Base channels.jl:542
[18] wait(c::Channel) @ Base channels.jl:569
[19] Distributed.WorkerPool(c::Channel, ref::Distributed.RemoteChannel) @ Distributed ~/.julia/juliaup/julia-1.9.4+0.aarch64.apple.darwin14/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:26
julia> c = Channel{UInt8}(5)
Channel{UInt8}(5) (empty)
julia> put!(c, 3)
0x03
julia> put!(c, 4)
0x04
julia> put!(c, 5)
0x05
julia> c.data
3-element Vector{UInt8}:
0x03
0x04
0x05
julia> c.
cond_put cond_take cond_wait data excp n_avail_items
state sz_max
julia> c.sz_max
5
If I were you, I’d probably just copy the function I linked, rename it to put_buffered_or_discard!, and then change the while loop to abort if it’s full. That way you can be reasonably sure you don’t put in any bugs with race conditions, assuming that this code doesn’t have any.
Yes, I am preparing a tiny PR with such a function since I think it’s much cleaner to have an exported function than using two fields which are not documented and might change in future ;)
It’s not what you need, as it only gives up if the channel is closed, not if it’s full. Perhaps it’s possible to resurrect this PR and add a keyword arg tryput!(ch, x; skipfull=true)?