Improving Channel throughput: batching

I’ve noticed when using Channel that my code spends a lot of time doing lock / unlock. To improve this, I implemented take!(c, n), a varargs put!, and puts!(c, vector), which take or put multiple items under a single lock acquisition. This gives roughly a 50x speedup in the best case (see the benchmarks below). In some cases one could simply send chunks through the channel, but this implementation is more flexible: one could, e.g., put! values one by one and then take! them in batches of 100.
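For comparison, the chunking alternative mentioned above can be sketched with the public Channel API alone. This is a minimal illustration, not the proposed implementation; the function name `sum_in_chunks` and the chunk size are made up for the example.

```julia
# Chunked use of a plain Channel: each put!/take! moves a whole Vector,
# so the lock is taken once per chunk rather than once per item.
function sum_in_chunks(n::Integer, chunk::Integer)
    ch = Channel{Vector{Int}}(Inf)
    # Producer: split 1:n into chunks and put! each chunk as a single element.
    for part in Iterators.partition(1:n, chunk)
        put!(ch, collect(part))
    end
    close(ch)  # lets the consumer loop below terminate
    # Consumer: iterating a closed channel drains the buffered chunks.
    total = 0
    for part in ch
        total += sum(part)
    end
    return total
end

sum_in_chunks(1_000, 100)
```

The drawback is that producer and consumer have to agree on the chunk size up front, which is the inflexibility the batched take!/puts! approach avoids.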

Would this be a welcome contribution to Base? I haven’t yet contributed, but happy to work on adding this if there is interest.

Benchmarking channel - 1,000 items
  110.969 μs (1 allocation: 7.94 KiB)
Benchmarking varargs put! - 1,000 items
  44.708 μs (990 allocations: 86.58 KiB)
Benchmarking puts! - 1,000 items
  1.947 μs (3 allocations: 23.81 KiB)
Benchmarking channel - 100,000 items
  11.183 ms (2 allocations: 781.33 KiB)
Benchmarking puts! - 100,000 items
  220.483 μs (6 allocations: 2.29 MiB)
using BenchmarkTools
c = Channel{Int}(Inf)

function Base.put!(c::Channel{T}, args...) where {T}
    Base.check_channel_state(c)
    args = convert.(T, args)
    if Base.isbuffered(c)
        return put_buffered(c, args...)
    else
        for v in args
            Base.put_unbuffered(c, v)  # not exported, so it must be qualified
        end
    end
    return args
end

function put_buffered(c::Channel, args...)
    lock(c)
    n = length(args)
    try
        # wait until n free slots exist (never satisfied if n > c.sz_max)
        while length(c.data) > (c.sz_max - n)
            Base.check_channel_state(c)
            wait(c.cond_put)
        end
        append!(c.data, args)
        # notify all, since multiple take! may be possible
        notify(c.cond_take, nothing, true, false)
    finally
        unlock(c)
    end
    return args
end

function puts!(c::Channel{T}, args) where {T}
    Base.check_channel_state(c)
    args = convert.(T, args)
    if Base.isbuffered(c)
        return puts_buffered(c, args)
    else
        for v in args
            Base.put_unbuffered(c, v)  # not exported, so it must be qualified
        end
    end
    return args
end

function puts_buffered(c::Channel, args)
    lock(c)
    n = length(args)
    try
        # wait until n free slots exist (never satisfied if n > c.sz_max)
        while length(c.data) > (c.sz_max - n)
            Base.check_channel_state(c)
            wait(c.cond_put)
        end
        append!(c.data, args)
        # notify all, since multiple take! may be possible
        notify(c.cond_take, nothing, true, false)
    finally
        unlock(c)
    end
    return args
end

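# Note: n == 1 returns a single item while n > 1 returns a Vector; batching assumes a buffered channel.
Base.take!(c::Channel, n::Integer) = n == 1 ? take!(c) : take_buffered(c, n)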
function take_buffered(c::Channel, n::Integer)
    lock(c)
    try
        while length(c.data) < n
            Base.check_channel_state(c)
            wait(c.cond_take)
        end
        v = splice!(c.data, 1:n)
        # notify all, since multiple slots may become available for a put!.
        notify(c.cond_put, nothing, true, false)
        return v
    finally
        unlock(c)
    end
end

function bench_channel(c, n = 10000)
    for i in 1:n
        put!(c, i)
    end
    [take!(c) for i in 1:n]
end

function bench_channel_varargs(c, n = 10000)
    put!(c, collect(1:n)...)
    take!(c, n)
end

function bench_channel_multiple(c, n = 10000)
    puts!(c, collect(1:n))
    take!(c, n)
end

println("Benchmarking channel - 1,000 items") # 111us
@btime bench_channel(c, 1000);
println("Benchmarking varargs put! - 1,000 items") # 45us
@btime bench_channel_varargs(c, 1000); # hangs / crashes on n>=10000
println("Benchmarking puts! - 1,000 items") # 1.7us
@btime bench_channel_multiple(c, 1000);
println("Benchmarking channel - 100,000 items") # 11.2ms
@btime bench_channel(c, 100000);
println("Benchmarking puts! - 100,000 items") # 220us
@btime bench_channel_multiple(c, 100000);

Yes, I think it’s a good idea and worth opening a PR. It’s been on my wish list. I think we can add it as a specialization append!(::Channel, iterables...).

A tricky question is what should happen with, e.g., append!(channel, Iterators.map(f, xs)). Should we just assume f does not take any locks (which could result in deadlocks)? Or should we restrict the method to, say, append!(::Channel{T}, ::Array{T})? Or maybe automatically copy the iterables to a buffer before taking the lock?
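The last option (copy to a buffer first) could look roughly like the sketch below. The name `append_collected!` is hypothetical and not part of Base, and the sketch assumes that lock(::Channel) is reentrant so that put! can be called while the lock is held, as the code earlier in the thread also relies on lock(c).

```julia
# Hypothetical helper (not part of Base): materialize the iterable first,
# then hold the channel lock only while the plain values are enqueued.
function append_collected!(ch::Channel{T}, itr) where {T}
    # Any user code inside `itr` (e.g. the `f` in Iterators.map(f, xs))
    # runs here, before the channel lock is acquired.
    buf = collect(T, itr)
    lock(ch) do
        for x in buf
            put!(ch, x)  # re-enters the lock already held by this task
        end
    end
    return ch
end

ch = Channel{Int}(Inf)
append_collected!(ch, Iterators.map(x -> x^2, 1:4))
squares = [take!(ch) for _ in 1:4]
```

The cost is one extra allocation for the buffer. In exchange, no user-supplied function can run while the channel lock is held.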

My thinking with puts! rather than append! was that it follows the separation of push! vs put!, where the former never locks while the latter does. That said, I’m happy to go with append! if you think it’s more consistent or better for avoiding additions to the namespace.

That’s a good question. append!(::Channel{T}, ::Array{T}) seems safe and is what I had in mind, forcing the user to explicitly call collect(Iterators.map(f, xs)). I know you’ve thought a lot about these sorts of things, though, so I’m curious if there’s a better approach.

I don’t think it’s true that push! never locks? push!(channel, x) is just a thin wrapper around put!(channel, x)

In my mind, all collections that define push! should also define append!; i.e., I expect append!(a, b) = foldl(push!, b; init = a) to always be a valid definition. Since Channel already defines push!, I think it’s natural for it to have append! too.
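As a point of reference, the element-by-element semantics that a batched append! would have to match can be written with public API only. The name `append_fallback!` is hypothetical; the sketch uses an explicit loop so that it does not depend on what push! returns for a Channel.

```julia
# Hypothetical reference implementation (not part of Base): one put! per
# element, i.e. one lock/unlock per element. A batched method should
# leave the channel in the same state as this loop does.
function append_fallback!(ch::Channel, itr)
    for x in itr
        put!(ch, x)
    end
    return ch
end

ch_ref = Channel{Int}(Inf)
append_fallback!(ch_ref, 1:5)
taken = [take!(ch_ref) for _ in 1:5]
```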

Yes, I agree that append!(::Channel{T}, ::Array{T}) is probably the best first step. I expect it’d cover “90%” of use cases. We can worry about generalization later.

Another fun special case that could be optimized (and perhaps we’d need to worry about) is append!(::Channel, ::Channel).
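An unoptimized version of that special case might look like the sketch below, assuming the intended meaning is "move everything from the source channel until it is closed". The name `drain_into!` and the self-append check are assumptions made for illustration; the thread does not settle the semantics.

```julia
# Hypothetical helper (not part of Base): move items from src to dst one
# at a time. Iterating src blocks until src is closed and drained.
function drain_into!(dst::Channel, src::Channel)
    # Appending a channel to itself would never terminate, so reject it.
    dst === src && throw(ArgumentError("cannot append a channel to itself"))
    for x in src
        put!(dst, x)
    end
    return dst
end

src = Channel{Int}(Inf)
foreach(x -> put!(src, x), 1:3)
close(src)  # buffered items remain takeable after close
dst = Channel{Int}(Inf)
drain_into!(dst, src)
moved = [take!(dst) for _ in 1:3]
```

An optimized method would presumably move the whole buffer in one step, which raises the question of the order in which the two channel locks are taken.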
