I’ve noticed that when using `Channel`, my code spends a lot of time in `lock`/`unlock`. To reduce this, I implemented `take!` (taking `n` items at once), a varargs `put!`, and `puts!` (which accepts a vector). These let multiple items be taken or put under a single lock acquisition. The result is a 50x speedup in the best case. In some cases one could simply send chunks through the channel instead, but this implementation is more flexible: for example, one could `put!` values one by one and then `take!` them in batches of 100.
Would this be a welcome contribution to Base? I haven’t contributed before, but I’m happy to work on adding this if there is interest.
Benchmarking channel - 1,000 items
110.969 μs (1 allocation: 7.94 KiB)
Benchmarking varargs put! - 1,000 items
44.708 μs (990 allocations: 86.58 KiB)
Benchmarking puts! - 1,000 items
1.947 μs (3 allocations: 23.81 KiB)
Benchmarking channel - 100,000 items
11.183 ms (2 allocations: 781.33 KiB)
Benchmarking puts! - 100,000 items
220.483 μs (6 allocations: 2.29 MiB)
using BenchmarkTools
# Channel shared by all the benchmark runs below; `Inf` requests an unbounded buffer.
c = Channel{Int}(Inf)
"""
    put!(c::Channel{T}, args...) -> Tuple

Put every value in `args` into channel `c` with a single call, and return the tuple
of values converted to `T`.

On a buffered channel the converted values are passed to `put_buffered`, which
enqueues them under one lock acquisition instead of one per value. On an unbuffered
channel each value is handed over individually. Calling with no values is a no-op
that returns `()`.

A call with exactly one value still dispatches to Base's more specific
`put!(c, v)`. For large batches prefer `puts!`, which takes a collection instead
of splatted arguments.

Note: this prototype adds a method to a `Base` function for a `Base` type (type
piracy); it is intended to live inside `Base`.
"""
function Base.put!(c::Channel{T}, args...) where {T}
    Base.check_channel_state(c)
    # Convert everything up front so a bad value throws before anything is enqueued.
    vals = convert.(T, args)
    # Nothing to enqueue: return without touching the channel, so this cannot block.
    isempty(vals) && return vals
    if Base.isbuffered(c)
        put_buffered(c, vals...)
    else
        # `put_unbuffered` is internal to Base (not exported), so it must be
        # qualified when this method is defined outside Base.
        for v in vals
            Base.put_unbuffered(c, v)
        end
    end
    return vals
end
"""
    put_buffered(c::Channel, args...) -> Tuple

Append `args`, in order, to the buffer of channel `c` and return `args`.

The lock is taken once for the whole call. Values are appended in chunks of as
many as currently fit. When the buffer is full, the call wakes waiting consumers
and waits for space, so a batch larger than the buffer no longer deadlocks. As a
consequence, a batch that does not fit at once is not enqueued atomically: values
from other producers may be interleaved between chunks.

Unbuffered channels fall back to handing values over one at a time. Throws (via
`Base.check_channel_state` or the interrupted wait) if the channel is closed while
values remain to be enqueued.
"""
function put_buffered(c::Channel, args...)
    n = length(args)
    n == 0 && return args
    if !Base.isbuffered(c)
        # No buffer to append to: hand each value over individually.
        foreach(v -> Base.put_unbuffered(c, v), args)
        return args
    end
    lock(c)
    try
        done = 0  # number of values from `args` already enqueued
        while done < n
            # Wait for at least one free slot. Waiting for room for the whole batch
            # would deadlock whenever the batch is larger than the buffer.
            while length(c.data) >= c.sz_max
                Base.check_channel_state(c)
                wait(c.cond_put)
            end
            Base.check_channel_state(c)
            chunk = min(n - done, c.sz_max - length(c.data))
            append!(c.data, (args[done + k] for k in 1:chunk))
            done += chunk
            # Recent Julia versions keep a lock-free count of available items that
            # `isready`/`isempty` read, and Base's own put!/take! maintain it.
            # Writing `c.data` directly must keep that count in sync.
            isdefined(Base, :_increment_n_avail) && Base._increment_n_avail(c, chunk)
            # Notify all, since several take! calls may now be satisfiable.
            notify(c.cond_take, nothing, true, false)
        end
    finally
        unlock(c)
    end
    return args
end
"""
    puts!(c::Channel{T}, args) -> collection

Put every element of the collection `args` into channel `c` with a single call,
and return the elements converted to `T` (`convert.(T, args)`).

On a buffered channel the converted batch is passed to `puts_buffered`, which
enqueues it under one lock acquisition. On an unbuffered channel each element is
handed over individually. An empty collection is returned immediately without
touching the channel.
"""
function puts!(c::Channel{T}, args) where {T}
    Base.check_channel_state(c)
    # Convert up front so a bad element throws before anything is enqueued.
    vals = convert.(T, args)
    # Nothing to enqueue: return without waiting on a possibly full buffer.
    isempty(vals) && return vals
    if Base.isbuffered(c)
        return puts_buffered(c, vals)
    end
    # `put_unbuffered` is internal to Base (not exported), so it must be
    # qualified when this function is defined outside Base.
    for v in vals
        Base.put_unbuffered(c, v)
    end
    return vals
end
"""
    puts_buffered(c::Channel, args) -> typeof(args)

Append the elements of the collection `args`, in order, to the buffer of channel
`c` and return `args`.

The lock is taken once for the whole call. Elements are appended in chunks of as
many as currently fit. When the buffer is full, the call wakes waiting consumers
and waits for space, so a batch larger than the buffer no longer deadlocks. As a
consequence, a batch that does not fit at once is not enqueued atomically: values
from other producers may be interleaved between chunks.

Unbuffered channels fall back to handing elements over one at a time. Throws (via
`Base.check_channel_state` or the interrupted wait) if the channel is closed while
elements remain to be enqueued.
"""
function puts_buffered(c::Channel, args)
    # Chunking indexes into the batch, so non-indexable iterables are collected first.
    vals = args isa Union{AbstractVector,Tuple} ? args : collect(args)
    n = length(vals)
    n == 0 && return args
    if !Base.isbuffered(c)
        # No buffer to append to: hand each element over individually.
        foreach(v -> Base.put_unbuffered(c, v), vals)
        return args
    end
    offset = firstindex(vals) - 1  # also handles vectors that are not 1-based
    lock(c)
    try
        done = 0  # number of elements of `vals` already enqueued
        while done < n
            # Wait for at least one free slot. Waiting for room for the whole batch
            # would deadlock whenever the batch is larger than the buffer.
            while length(c.data) >= c.sz_max
                Base.check_channel_state(c)
                wait(c.cond_put)
            end
            Base.check_channel_state(c)
            chunk = min(n - done, c.sz_max - length(c.data))
            append!(c.data, (vals[offset + done + k] for k in 1:chunk))
            done += chunk
            # Recent Julia versions keep a lock-free count of available items that
            # `isready`/`isempty` read, and Base's own put!/take! maintain it.
            # Writing `c.data` directly must keep that count in sync.
            isdefined(Base, :_increment_n_avail) && Base._increment_n_avail(c, chunk)
            # Notify all, since several take! calls may now be satisfiable.
            notify(c.cond_take, nothing, true, false)
        end
    finally
        unlock(c)
    end
    return args
end
"""
    take!(c::Channel{T}, n::Integer) -> Vector{T}

Remove and return the next `n` items from channel `c` in FIFO order, blocking until
they are available.

Always returns a `Vector{T}`, including for `n == 1`, so the result type depends
only on the argument types. `n == 0` returns an empty vector, and a negative `n`
throws an `ArgumentError`.

For `n > 1` on a buffered channel the items are removed by `take_buffered`, which
holds the lock once per batch rather than once per item. Otherwise items are taken
one at a time with Base's `take!`.
"""
function Base.take!(c::Channel{T}, n::Integer) where {T}
    n < 0 && throw(ArgumentError("number of items to take must be non-negative, got $n"))
    if n <= 1 || !Base.isbuffered(c)
        # Nothing to batch (or no buffer to batch from): use Base's single-item take!.
        return T[take!(c) for _ in 1:n]
    end
    return take_buffered(c, n)
end
"""
    take_buffered(c::Channel{T}, n::Integer) -> Vector{T}

Remove and return the next `n` items from the buffer of channel `c`, in FIFO order,
holding the lock once per batch.

If `n <= c.sz_max`, the call waits until all `n` items are buffered and removes
them atomically. A larger `n` can never be buffered at once, so it is taken in
successive buffer-sized chunks, waking producers after each chunk. If the channel
is closed before enough items arrive, `Base.check_channel_state` throws. In the
chunked case, any items already removed are discarded along with the exception,
as they would be with a loop of single `take!` calls.

Unbuffered channels fall back to taking items one at a time. `n == 0` returns an
empty vector, and a negative `n` throws an `ArgumentError`.
"""
function take_buffered(c::Channel{T}, n::Integer) where {T}
    n < 0 && throw(ArgumentError("number of items to take must be non-negative, got $n"))
    # With no buffer there is nothing to batch: take items individually.
    Base.isbuffered(c) || return T[take!(c) for _ in 1:n]
    out = T[]
    lock(c)
    try
        while length(out) < n
            # Wait for as many items as the buffer can hold. For n <= c.sz_max this
            # is the whole batch, which is then removed in one atomic step.
            want = min(n - length(out), c.sz_max)
            while length(c.data) < want
                Base.check_channel_state(c)
                wait(c.cond_take)
            end
            part = splice!(c.data, 1:want)
            out = isempty(out) ? part : append!(out, part)
            # Keep the lock-free available-item count (read by `isready`/`isempty`
            # on recent Julia versions) in sync with the items just removed.
            isdefined(Base, :_increment_n_avail) && Base._increment_n_avail(c, -want)
            # Notify all, since several slots may have become available for put!.
            notify(c.cond_put, nothing, true, false)
        end
    finally
        unlock(c)
    end
    return out
end
"""
    bench_channel(c, n = 10000)

Baseline benchmark. Send the integers `1:n` into `c` with one `put!` per value,
then read them back with one `take!` per value. Returns the values read, in order.

Producer and consumer are the same task, so `c` needs room for all `n` values;
otherwise the first `put!` that finds the buffer full has no consumer to unblock it.
"""
function bench_channel(c, n = 10000)
    foreach(k -> put!(c, k), 1:n)
    return map(_ -> take!(c), 1:n)
end
"""
    bench_channel_varargs(c, n = 10000)

Batch benchmark for the varargs `put!`. Splat the integers `1:n` into a single
`put!` call, then read all `n` back with `take!(c, n)`. Returns the values read.

Splatting `n` separate arguments is costly for large `n`. This call was observed to
hang or crash for `n >= 10000`, which includes the default.
"""
function bench_channel_varargs(c, n = 10000)
    batch = collect(1:n)
    put!(c, batch...)
    return take!(c, n)
end
"""
    bench_channel_multiple(c, n = 10000)

Batch benchmark for `puts!`. Send the integers `1:n` into `c` as one vector, then
read all `n` back with `take!(c, n)`. Returns the values read.
"""
function bench_channel_multiple(c, n = 10000)
    batch = collect(1:n)
    puts!(c, batch)
    return take!(c, n)
end
# Benchmark driver. The trailing timing comments are the author's measurements on
# their machine. `$c` interpolates the global channel into each benchmark, so
# BenchmarkTools does not also time the lookup of a non-constant global.
println("Benchmarking channel - 1,000 items") # ~111 µs
@btime bench_channel($c, 1000);
println("Benchmarking varargs put! - 1,000 items") # ~45 µs
@btime bench_channel_varargs($c, 1000); # hangs / crashes on n>=10000
println("Benchmarking puts! - 1,000 items") # ~1.9 µs
@btime bench_channel_multiple($c, 1000);
println("Benchmarking channel - 100,000 items") # ~11.2 ms
@btime bench_channel($c, 100000);
println("Benchmarking puts! - 100,000 items") # ~220 µs
@btime bench_channel_multiple($c, 100000);