This has been asked before, but it looks like it didn’t go anywhere. Channels are very powerful, but the per-item lock overhead makes them slow to iterate over. Bundling items into batches before take!/put! would help amortize this overhead.
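For context, here is the per-item pattern whose lock overhead I mean: every put! and take! below acquires and releases the channel lock once, so moving N items costs on the order of N lock round-trips (a toy sketch with arbitrary sizes, just to illustrate):

ch = Channel{Int}(100)
# 10_000 locked take!s on the consumer side
consumer = Threads.@spawn sum(take!(ch) for _ in 1:10_000)
# 10_000 locked put!s on the producer side
for x in 1:10_000
    put!(ch, x)
end
fetch(consumer)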
I wrote up a proof of concept by stealing heavily from Base:
function Base.append!(c::Channel{T}, vec::AbstractArray) where {T}
    current_idx = firstindex(vec)
    final_idx = lastindex(vec)
    final_idx_plus_one = final_idx + 1
    elements_to_add = length(vec)
    # Increment channel n_avail eagerly (before push!) to count data in the
    # buffer as well as offers from tasks which are blocked in wait().
    Base._increment_n_avail(c, elements_to_add)
    while current_idx <= final_idx
        lock(c)
        did_buffer = false
        try
            while length(c.data) == c.sz_max
                Base.check_channel_state(c)
                wait(c.cond_put)
            end
            # Grab a chunk of items that will fit in the channel's buffer
            available_space = c.sz_max - length(c.data)
            next_idx = min(final_idx_plus_one, current_idx + available_space)
            chunk = Iterators.map(x -> convert(T, x), view(vec, current_idx:(next_idx - 1)))
            Base.check_channel_state(c)
            append!(c.data, chunk)
            # We successfully added the chunk, so decrement our elements to add
            # in case a later iteration errors.
            elements_to_add -= next_idx - current_idx
            did_buffer = true
            # Notify all, since some of the waiters may be on a "fetch" call.
            notify(c.cond_take, nothing, true, false)
            next_idx > final_idx && break
            current_idx = next_idx
        finally
            # Decrement the available items if this task had an exception before
            # pushing the chunk to the buffer (e.g., during `wait(c.cond_put)`):
            did_buffer || Base._increment_n_avail(c, -elements_to_add)
            unlock(c)
        end
    end
    return c
end
function take_batch!(c::Channel{T}, n) where {T}
    if c.sz_max < n
        throw(ArgumentError("Batch size, $n, is too large for a channel with buffer length $(c.sz_max)"))
    end
    lock(c)
    try
        while isempty(c.data)
            Base.check_channel_state(c)
            wait(c.cond_take)
        end
        take_n = min(n, length(c.data))
        ret = Vector{T}(undef, take_n)
        @inbounds for i in eachindex(ret)
            ret[i] = c.data[i]
        end
        foreach(_ -> popfirst!(c.data), 1:take_n)
        Base._increment_n_avail(c, -take_n)
        # Notify all, since up to take_n slots may have become available for put!s.
        notify(c.cond_put, nothing, true, false)
        return ret
    finally
        unlock(c)
    end
end
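To show how I imagine the two methods being used together, here is a minimal producer/consumer sketch against the proof of concept above (buffer size, item count, and chunk size are arbitrary choices for illustration):

items = collect(1:10_000)
ch = Channel{Int}(100)
# Consumer: pull items in batches of up to 100; take_batch! takes the lock once per batch.
consumer = Threads.@spawn begin
    total = 0
    remaining = length(items)
    while remaining > 0
        batch = take_batch!(ch, min(100, remaining))
        total += sum(batch)
        remaining -= length(batch)
    end
    total
end
# Producer: push items in chunks so append! takes the lock per chunk placed, not per item.
for i in 1:100:length(items)
    append!(ch, @view items[i:min(i + 99, end)])
end
fetch(consumer)  # == sum(items)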
Here is a benchmark which indicates to me that this direction has promise:
julia> function bench_basic(item_n, buffer_len)
           items = collect(1:item_n)
           ch = Channel{Int}(buffer_len)
           task_n = Threads.nthreads()
           res = Vector{Int}(undef, item_n * task_n)
           for _ in 1:task_n
               Threads.@spawn begin
                   for j in items
                       put!(ch, j)
                   end
               end
           end
           @sync for i in Base.OneTo(task_n)
               Threads.@spawn let offset = (i - 1) * item_n
                   for j in Base.OneTo(item_n)
                       x = take!(ch)
                       res[offset+j] = x
                   end
               end
           end
           res
       end
bench_basic (generic function with 1 method)
julia> function bench_batch(item_n, buffer_len)
           items = collect(1:item_n)
           ch = Channel{Int}(buffer_len)
           task_n = Threads.nthreads()
           res = Vector{Int}(undef, item_n * task_n)
           for _ in 1:task_n
               Threads.@spawn begin
                   i = 1
                   while i <= item_n
                       append!(ch, @view items[i:min(i + buffer_len - 1, item_n)])
                       i += buffer_len
                   end
               end
           end
           @sync for i in Base.OneTo(task_n)
               Threads.@spawn let offset = (i - 1) * item_n
                   batch = take_batch!(ch, buffer_len)
                   batch_len = length(batch)
                   batch_i = 1
                   for j in Base.OneTo(item_n)
                       if batch_i > batch_len
                           batch = take_batch!(ch, buffer_len)
                           batch_i = 1
                           batch_len = length(batch)
                       end
                       x = batch[batch_i]
                       res[offset+j] = x
                       batch_i += 1
                   end
               end
           end
           res
       end
bench_batch (generic function with 1 method)
julia> @b bench_basic(10000, 10)
186.438 ms (248 allocs: 724.531 KiB)
julia> @b bench_batch(10000, 10)
62.537 ms (171511 allocs: 4.534 MiB)
julia> @b bench_basic(10000, 100)
104.146 ms (104 allocs: 721.078 KiB)
julia> @b bench_batch(10000, 100)
6.494 ms (157111 allocs: 3.792 MiB)
julia> @b bench_basic(10000, 1000)
101.432 ms (106 allocs: 767.453 KiB)
julia> @b bench_batch(10000, 1000)
3.913 ms (194872 allocs: 4.364 MiB)
Edit: a previous version of this post had incorrect benchmarks. Still, we see that as the channel's buffer size increases, the batched version pulls further and further ahead of the per-item baseline: roughly 3x faster with a buffer of 10, 16x with 100, and 26x with 1000.
I have a rough draft for how to do this with unbuffered channels, but it’s still WIP.