This has been asked before, but it looks like it didn’t go anywhere. Channels are very powerful, but the per-item lock overhead makes them slow to iterate over. Bundling items into batches for each take!/put! would amortize that overhead.
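One workaround that is possible today is to do the batching by hand at the type level: make the channel's element type a Vector, so each put!/take! moves a whole chunk under a single lock acquisition. A minimal sketch of that pattern (the chunked_sum name, the chunk length, and the 16-slot buffer are just illustrative):

function chunked_sum(items::Vector{Int}, chunk_len::Int)
    # Each element of the channel is an entire chunk, so the lock is paid
    # once per chunk rather than once per item.
    ch = Channel{Vector{Int}}(16)
    Threads.@spawn begin
        for i in 1:chunk_len:length(items)
            put!(ch, items[i:min(i + chunk_len - 1, end)])  # one lock per chunk
        end
        close(ch)
    end
    total = 0
    for chunk in ch   # channel iteration also locks once per chunk
        total += sum(chunk)
    end
    return total
end

chunked_sum(collect(1:10_000), 100) == sum(1:10_000)  # true

That works, but it forces the batching into the channel's element type. Doing it inside Channel itself keeps the element type unchanged: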
I wrote up a proof of concept by stealing heavily from Base:
function Base.append!(c::Channel{T}, vec::AbstractArray) where {T}
    current_idx = firstindex(vec)
    final_idx = lastindex(vec)
    final_idx_plus_one = final_idx + 1
    elements_to_add = length(vec)
    # Increment channel n_avail eagerly (before push!) to count data in the
    # buffer as well as offers from tasks which are blocked in wait().
    Base._increment_n_avail(c, elements_to_add)
    while current_idx <= final_idx
        lock(c)
        did_buffer = false
        try
            while length(c.data) == c.sz_max
                Base.check_channel_state(c)
                wait(c.cond_put)
            end
            # Grab a chunk of items that will fit in the channel's buffer
            available_space = c.sz_max - length(c.data)
            next_idx = min(final_idx_plus_one, current_idx + available_space)
            chunk = Iterators.map(x -> convert(T, x), view(vec, current_idx:(next_idx - 1)))
            Base.check_channel_state(c)
            append!(c.data, chunk)
            # We successfully buffered this chunk, so reduce the count of elements
            # still to add in case a later iteration errors.
            elements_to_add -= next_idx - current_idx
            did_buffer = true
            # Notify all waiters, since some of them may be blocked in a `fetch` call.
            notify(c.cond_take, nothing, true, false)
            next_idx > final_idx && break
            current_idx = next_idx
        finally
            # Roll back the eager n_avail increment if this task hit an exception
            # before buffering its remaining items (e.g., during `wait(c.cond_put)`):
            did_buffer || Base._increment_n_avail(c, -elements_to_add)
            unlock(c)
        end
    end
    return c
end
function take_batch!(c::Channel{T}, n) where {T}
    if c.sz_max < n
        throw(ArgumentError("Batch size, $n, is too large for a channel with buffer length $(c.sz_max)"))
    end
    lock(c)
    try
        while isempty(c.data)
            Base.check_channel_state(c)
            wait(c.cond_take)
        end
        # Take up to `n` items; return fewer if the buffer currently holds fewer.
        take_n = min(n, length(c.data))
        ret = Vector{T}(undef, take_n)
        @inbounds for i in eachindex(ret)
            ret[i] = c.data[i]
        end
        foreach(_ -> popfirst!(c.data), 1:take_n)
        Base._increment_n_avail(c, -take_n)
        notify(c.cond_put, nothing, false, false) # notify only one waiter, since buffer space has become available for a put!.
        return ret
    finally
        unlock(c)
    end
end
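To show how the two functions are meant to compose, here is a small usage sketch assuming the two definitions above are loaded (the buffer size of 1024, the batch size of 128, and the item count are arbitrary choices for illustration):

ch = Channel{Int}(1024)

producer = Threads.@spawn begin
    # One append! call moves up to a buffer's worth of items per lock acquisition.
    append!(ch, collect(1:100_000))
    close(ch)
end

consumer = Threads.@spawn begin
    total = 0
    while true
        local batch
        try
            batch = take_batch!(ch, 128)  # up to 128 items per lock acquisition
        catch err
            # Once the channel is closed and drained, take_batch! rethrows the
            # close exception, just like take! on a closed channel.
            err isa InvalidStateException || rethrow()
            break
        end
        total += sum(batch)
    end
    total
end

wait(producer)
fetch(consumer) == sum(1:100_000)  # true

Note that a batch can come back shorter than the requested size if the buffer momentarily holds fewer items, so consumers should use length(batch) rather than assuming a full batch.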
Here is a benchmark which indicates to me that this direction has promise:
julia> function bench_basic(item_n, buffer_len)
           items = collect(1:item_n)
           ch = Channel{Int}(buffer_len)
           task_n = Threads.nthreads()
           res = Vector{Int}(undef, item_n * task_n)
           for _ in 1:task_n
               Threads.@spawn begin
                   for j in items
                       put!(ch, j)
                   end
               end
           end
           @sync for i in Base.OneTo(task_n)
               Threads.@spawn let offset = (i - 1) * item_n
                   for j in Base.OneTo(item_n)
                       x = take!(ch)
                       res[offset+j] = x
                   end
               end
           end
           res
       end
bench_basic (generic function with 1 method)
julia> function bench_batch(item_n, buffer_len)
           items = collect(1:item_n)
           ch = Channel{Int}(buffer_len)
           task_n = Threads.nthreads()
           res = Vector{Int}(undef, item_n * task_n)
           for _ in 1:task_n
               Threads.@spawn begin
                   i = 1
                   while i <= item_n
                       append!(ch, @view items[i:min(i + buffer_len - 1, item_n)])
                       i += buffer_len
                   end
               end
           end
           @sync for i in Base.OneTo(task_n)
               Threads.@spawn let offset = (i - 1) * item_n
                   batch = take_batch!(ch, buffer_len)
                   batch_len = length(batch)
                   batch_i = 1
                   for j in Base.OneTo(item_n)
                       if batch_i > batch_len
                           batch = take_batch!(ch, buffer_len)
                           batch_i = 1
                           batch_len = length(batch)
                       end
                       x = batch[batch_i]
                       res[offset+j] = x
                       batch_i += 1
                   end
               end
           end
           res
       end
bench_batch (generic function with 1 method)
julia> @b bench_basic(10000, 10)
186.438 ms (248 allocs: 724.531 KiB)
julia> @b bench_batch(10000, 10)
62.537 ms (171511 allocs: 4.534 MiB)
julia> @b bench_basic(10000, 100)
104.146 ms (104 allocs: 721.078 KiB)
julia> @b bench_batch(10000, 100)
6.494 ms (157111 allocs: 3.792 MiB)
julia> @b bench_basic(10000, 1000)
101.432 ms (106 allocs: 767.453 KiB)
julia> @b bench_batch(10000, 1000)
3.913 ms (194872 allocs: 4.364 MiB)
Edit: a previous version of this post had incorrect benchmarks. Still, we see that as the channel’s buffer size increases, the batched version gets better and better performance, both in absolute terms and relative to the plain take!/put! baseline.
I have a rough draft for how to do this with unbuffered channels, but it’s still WIP.