Thread-safe buffer which blocks retrieval when empty?

Does anyone know of a package which implements a (circular) buffer that blocks when popping from it if it’s empty?

To give a bit more context, below is an MWE for a program which has one thread running and continuously fetching data from a source (with high and sometimes varying rates) and putting it into a circular buffer with a limited size to avoid filling up memory. The main thread has an endless loop where it fetches data from this buffer and processes it (the processing rate also varies).

In the simplified example below, a new item arrives every 0.5 s and processing one item takes 1.0 s, with a buffer which can hold up to 5 elements.

There is some boilerplate code (I know it’s not much) with locks, and also an ugly sleep(0.1), which is needed because otherwise the loop would of course spin at 100% CPU whenever the buffer happens to be empty.

Long story short: is there any package which handles this elegantly and efficiently? Preferably a package which offers such a buffer type which is thread safe and allows something like get(buffer; block=true, timeout=...)?

I looked around but could not find anything yet. Maybe I am overthinking this and the below implementation is already OK as it is :wink:

Here is the MWE:

using DataStructures

const DATA_RATE = 0.5        # seconds between incoming items
const PROCESSING_RATE = 1.0  # seconds needed to process one item

function datafetcher(buffer, lck)
  i = 0
  while true
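    lock(lck) do  # the do-block form releases the lock even if the body throws
      println("  -> pushing $i to $buffer")
      push!(buffer, i)  # overwrites the oldest element once the buffer is full
    end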
    i += 1
    sleep(DATA_RATE)
  end
end

function main()
  buffer = CircularBuffer{Int}(5)
  lck = ReentrantLock()

  @async datafetcher(buffer, lck)

  while true
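    # check and pop under the same lock so the two steps cannot interleave with a push
    data = lock(lck) do
      isempty(buffer) ? nothing : popfirst!(buffer)
    end
    if data === nothing
      sleep(0.1)  # needed to not choke on this loop when the buffer is empty
      continue
    end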

    println("processing $data")
    sleep(PROCESSING_RATE)
  end
end

main()

Sounds like a classic use for a Channel.
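
To illustrate the suggestion, here is a minimal sketch of the same producer/consumer pattern on top of a buffered Channel. The names producer and consume_all are made up for this example, and the sleeps from the MWE are left out to keep it short. Note that put! blocks while the channel is full instead of overwriting old data, so this does not reproduce the discarding behaviour of the CircularBuffer.

```julia
# Producer task: put! blocks while the channel is full.
function producer(ch::Channel{Int}, n::Int)
    for i in 1:n
        put!(ch, i)
    end
    close(ch)  # signals the consumer that no more data will arrive
end

# Consumer: iterating a Channel calls take! under the hood, which blocks
# while the channel is empty, so no lock and no polling sleep are needed.
function consume_all(n::Int)
    ch = Channel{Int}(5)  # holds at most 5 items, like the buffer in the MWE
    @async producer(ch, n)
    results = Int[]
    for data in ch        # the loop ends once the channel is closed and drained
        push!(results, data)
    end
    return results
end

consume_all(10)
```

The consumer simply suspends until data is available, which removes both the explicit lock and the sleep(0.1).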


I was thinking about that actually. I’d need to implement the logic to discard data in the retrieval function.

I guess Channel it is. Unfortunately there is no documented way (public API) to check whether a channel is full, even though there is a constructor to create a channel with a given maximum size.

I can of course implement

isfull(c::Channel) = length(c.data) >= c.sz_max

but it doesn’t feel really future-proof :wink:

julia> methodswith(Channel)
[1] bind(c::Channel, task::Task) @ Base channels.jl:268
[2] close(c::Channel) @ Base channels.jl:200
[3] close(c::Channel, excp::Exception) @ Base channels.jl:201
[4] fetch(c::Channel) @ Base channels.jl:419
[5] isempty(c::Channel) @ Base channels.jl:534
[6] isopen(c::Channel) @ Base channels.jl:214
[7] isready(c::Channel) @ Base channels.jl:533
[8] iterate(c::Channel) @ Base channels.jl:604
[9] iterate(c::Channel, state) @ Base channels.jl:604
[10] lock(c::Channel) @ Base channels.jl:540
[11] lock(f, c::Channel) @ Base channels.jl:541
[12] put!(c::Channel{T}, v) where T @ Base channels.jl:333
[13] show(io::IO, c::Channel) @ Base channels.jl:585
[14] show(io::IO, ::MIME{Symbol("text/plain")}, c::Channel) @ Base channels.jl:587
[15] take!(c::Channel) @ Base channels.jl:465
[16] trylock(c::Channel) @ Base channels.jl:543
[17] unlock(c::Channel) @ Base channels.jl:542
[18] wait(c::Channel) @ Base channels.jl:569
[19] Distributed.WorkerPool(c::Channel, ref::Distributed.RemoteChannel) @ Distributed ~/.julia/juliaup/julia-1.9.4+0.aarch64.apple.darwin14/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:26

julia> c = Channel{UInt8}(5)
Channel{UInt8}(5) (empty)

julia> put!(c, 3)
0x03

julia> put!(c, 4)
0x04

julia> put!(c, 5)
0x05

julia> c.data
3-element Vector{UInt8}:
 0x03
 0x04
 0x05

julia> c.
cond_put       cond_take      cond_wait      data           excp           n_avail_items
state          sz_max
julia> c.sz_max
5

Why do you need to know when your Channel is full? What would you do in that situation? Discard the result?

Currently channels.jl seems to be doing pretty much what your isfull function would be doing: https://github.com/JuliaLang/julia/blob/abeb68feea5da6151a83ec27786a23b71e06fc88/base/channels.jl#L362

        while length(c.data) == c.sz_max
            check_channel_state(c)
            wait(c.cond_put)
        end

Yes, I deal with high data rates and when the processing cannot keep up, I need to discard the data to avoid a memory pile-up.

If I were you, I’d probably just copy the function I linked, rename it to put_buffered_or_discard!, and then change the while loop to abort if it’s full. That way you can be reasonably sure you don’t put in any bugs with race conditions, assuming that this code doesn’t have any.
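
A simpler variant of that idea, as a rough sketch rather than a copy of the Base function: hold the channel lock, check the fill level, and only call put! when there is room. The name put_or_discard! is hypothetical, and the code relies on the non-public fields data and sz_max discussed above, so it may break if those internals change. It assumes the channel's lock is reentrant, which is why put! can take it again inside the block.

```julia
# Hypothetical helper, not part of Base: put `v` into `c` if there is room,
# otherwise drop it. Returns true if the value was stored.
function put_or_discard!(c::Channel, v)
    lock(c) do
        # `data` and `sz_max` are undocumented internals of Channel
        if length(c.data) >= c.sz_max
            return false  # channel is full: discard the new value
        end
        put!(c, v)        # cannot block: there is room and we hold the lock
        return true
    end
end

c = Channel{Int}(2)
put_or_discard!(c, 1)  # stored
put_or_discard!(c, 2)  # stored
put_or_discard!(c, 3)  # channel is full, value is dropped
```

Unlike push! on a CircularBuffer, which overwrites the oldest element, this drops the newest value. For an unbuffered channel (sz_max of 0) it would discard every value.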

Yes, I am preparing a tiny PR with such a function since I think it’s much cleaner to have an exported function than to use two fields which are not documented and might change in the future ;)

Sure, in theory they could change in the future – I don’t see why they would though. They’ve been the same for 8 years.

It’s just good practice :slight_smile: we’ll see how the PR goes… :laughing:


xref this open PR that implements tryput!: "add maybetake! and tryput!" by tkf (JuliaLang/julia Pull Request #41966)

It’s not what you need, as it only gives up if the channel is closed, not if it’s full. Perhaps it’s possible to resurrect this PR and add a keyword arg tryput!(ch, x; skipfull=true)?
