Reducing Channel Lock Contention with Batches

This has been asked before, but it looks like it didn’t go anywhere. Channels are very powerful, but the lock overhead makes it slow for iterating. Bundling items into batches before take!/put! would help with this overhead.

I wrote up a proof of concept by stealing heavily from Base:

function Base.append!(c::Channel{T}, vec::AbstractArray) where {T}
    current_idx = firstindex(vec)
    final_idx = lastindex(vec)
    final_idx_plus_one = final_idx + 1

    elements_to_add = length(vec)
    # Increment channel n_avail eagerly (before push!) to count data in the
    # buffer as well as offers from tasks which are blocked in wait().
    Base._increment_n_avail(c, elements_to_add)
    while current_idx <= final_idx
        lock(c)
        did_buffer = false
        try
            while length(c.data) == c.sz_max
                Base.check_channel_state(c)
                wait(c.cond_put)
            end
            # Grab a chunk of items that will fit in the channel's buffer
            available_space = c.sz_max - length(c.data)
            next_idx = min(final_idx_plus_one, current_idx + available_space)
            chunk = Iterators.map(x -> convert(T, x), view(vec, current_idx:(next_idx-1)))

            Base.check_channel_state(c)
            append!(c.data, chunk)
            # We successfully added chunk, so decrement our elements to add in case of
            # errors
            elements_to_add -= next_idx - current_idx
            did_buffer = true
            notify(c.cond_take, nothing, true, false)
            # notify all, since some of the waiters may be on a "fetch" call.
            next_idx > final_idx && break
            current_idx = next_idx
        finally
            # Decrement the available items if this task had an exception before pushing the
            # item to the buffer (e.g., during `wait(c.cond_put)`):
            did_buffer || Base._increment_n_avail(c, -elements_to_add)
            unlock(c)
        end
    end
    return c
end


function take_batch!(c::Channel{T}, n) where {T}
    if c.sz_max < n
        throw(ArgumentError("Batch size, $n, is too large for a channel with buffer length $(c.sz_max)"))
    end
    lock(c)
    try
        while isempty(c.data)
            Base.check_channel_state(c)
            wait(c.cond_take)
        end

        take_n = min(n, length(c.data))
        ret = Vector{T}(undef, take_n)
        @inbounds for i in eachindex(ret)
            ret[i] = c.data[i]
        end
        foreach(_ -> popfirst!(c.data), 1:take_n)
        Base._increment_n_avail(c, -take_n)
        notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
        return ret
    finally
        unlock(c)
    end
end

Here is a benchmark which indicates to me that this direction has promise:

julia> function bench_basic(item_n, buffer_len)
           items = collect(1:item_n)
           ch = Channel{Int}(buffer_len)
           task_n = Threads.nthreads()
           res = Vector{Int}(undef, item_n * task_n)

           for _ in 1:task_n
               Threads.@spawn begin
                   for j in items
                       put!(ch, j)
                   end
               end
           end

           @sync for i in Base.OneTo(task_n)
               Threads.@spawn let offset = (i - 1) * item_n
                   for j in Base.OneTo(item_n)
                       x = take!(ch)
                       res[offset+j] = x
                   end
               end
           end
           res
       end
bench_basic (generic function with 1 method)

julia> function bench_batch(item_n, buffer_len)
           items = collect(1:item_n)
           ch = Channel{Int}(buffer_len)
           task_n = Threads.nthreads()
           res = Vector{Int}(undef, item_n * task_n)

           ch = Channel{Int}(buffer_len)
           for _ in 1:task_n
               Threads.@spawn begin
                   i = 1
                   while i <= item_n
                       append!(ch, @view items[i:min(i + buffer_len - 1, item_n)])
                       i += buffer_len
                   end
               end
           end

           @sync for i in Base.OneTo(task_n)
               Threads.@spawn let offset = (i - 1) * item_n
                   batch = take_batch!(ch, buffer_len)
                   batch_len = length(batch)
                   batch_i = 1
                   for j in Base.OneTo(item_n)
                       if batch_i > batch_len
                           batch = take_batch!(ch, buffer_len)
                           batch_i = 1
                           batch_len = length(batch)
                       end
                       x = batch[batch_i]
                       res[offset+j] = x
                       batch_i += 1
                   end
               end

           end
           res
       end
bench_batch (generic function with 1 method)

julia> @b bench_basic(10000, 10)
186.438 ms (248 allocs: 724.531 KiB)

julia> @b bench_batch(10000, 10)
62.537 ms (171511 allocs: 4.534 MiB)

julia> @b bench_basic(10000, 100)
104.146 ms (104 allocs: 721.078 KiB)

julia> @b bench_batch(10000, 100)
6.494 ms (157111 allocs: 3.792 MiB)

julia> @b bench_basic(10000, 1000)
101.432 ms (106 allocs: 767.453 KiB)

julia> @b bench_batch(10000, 1000)
3.913 ms (194872 allocs: 4.364 MiB)

Edit: a previous version of this post had incorrect benchmarks. Still, we see that as the buffer size of the channel increases, we get better and better performance.

I have a rough draft for how to do this with unbuffered channels, but it’s still WIP.

2 Likes

Won’t this deadlock? You’re waiting on the condition variable while holding the lock.

This is exactly how take/put are implemented in Base. I thought the same thing, but if you run the code, you’ll see that, empirically, it doesn’t deadlock. I’m hoping someone with a better understanding of Reentrant locks can help make sense of it.