FLoops @init allocate only once

Hi,
Does anyone know if there are any tricks to using FLoops thread-safe buffers for better performance? I am trying to adapt my code which makes use of Buffers such as Buffers[Threads.threadid()] to the recommended approach of using FLoops.@init.
However I see drastically increased allocations.
I think this is because @init allocates once every time before the parallelized loop runs. If the function that contains this loop is used often it needs to re-allocate the buffers every time.
Is there an elegant way around this?
In case that helps, here is a MWE:

Summary
using FLoops

function useBuff!(arr,i,j,Buff)
    Buff .= 0
    for k in eachindex(Buff)
        Buff[k] += k
    end
    arr[i,j] = Buff[i]
end

function testBuff(arr,Buffer)
    
    @sync for i in axes(arr,1)
        for j in axes(arr,2)
            Threads.@spawn begin 
                Buff = Buffer[Threads.threadid()]
                useBuff!(arr,i,j,Buff)
            end
        end
    end
    return arr
end

function testFloopsBuff(arr,Buffer)
    @floop for i in axes(arr,1), j in axes(arr,2)
        Buff = Buffer[Threads.threadid()]
        useBuff!(arr,i,j,Buff)
    end
    return arr
end

function testFloopsNoBuff(arr)
    @floop ThreadedEx(basesize = 1) for i in axes(arr,1), j in axes(arr,2)
        @init Buff = zeros(size(arr,1))
        useBuff!(arr,i,j,Buff)
        end
    return arr
end

function timeall()
    N = 200
    arr = zeros(N,20)
    @time begin
        Buffer = [zeros(N) for _ in 1:Threads.nthreads()]
        x = 0.
        for i in 1:300
            x += maximum(testBuff(arr,Buffer))
        end
        x
    end
    @time begin
        Buffer = [zeros(N) for _ in 1:Threads.nthreads()]
        x = 0.
        for i in 1:300
            x += maximum(testFloopsBuff(arr,Buffer))
        end
        x
    end
    @time begin
        x = 0.
        for i in 1:300
            x += maximum(testFloopsNoBuff(arr))
        end
        x
    end
end

timeall()
timeall()

Perhaps you could use a Channel of buffers, and instead of

Buff = Buffer[Threads.threadid()]
useBuff!(arr,i,j,Buff)

do

Buff = take!(Buffer)
useBuff!(arr,i,j,Buff)
put!(Buffer, Buff)

This way, the buffer is removed from the Channel immediately, and two tasks won’t access the same one.

1 Like

I am not terribly familiar with Channels, could you elaborate on how to create such a channel of buffers to pass to my function?
I have tried to do something like

julia> buff = Channel()
       for _ in 1:Threads.nthreads()
           put!(buff,zeros(3))
       end

but it does not terminate and I haven’t been able to find this anywhere so far.

You need to specify the number of elements while creating the Channel; otherwise, it assumes zero elements by default.

julia> buff = Channel(Threads.nthreads())
Channel{Any}(1) (empty)

julia> for _ in 1:Threads.nthreads()
                  put!(buff,zeros(3))
              end

julia> buff
Channel{Any}(1) (1 item available)

julia> take!(buff)
3-element Vector{Float64}:
 0.0
 0.0
 0.0
2 Likes

How do we release the channels? I’ve tried this:

julia> buff = Channel{Vector{Float64}}(4)
Channel{Vector{Float64}}(4) (empty)

julia> for i in 1:4
           put!(buff, zeros(3))
       end

julia> Threads.@threads for i in 1:4
           b = take!(buff)
           b .= Threads.threadid()
           @show sum(b)
       end
sum(b) = 6.0
sum(b) = 9.0
sum(b) = 3.0
sum(b) = 12.0

julia> Threads.@threads for i in 1:4
           b = take!(buff)
           b .= Threads.threadid()
           @show sum(b)
       end  ### NEVER ENDS - channels are blocked
^C^C^C^C^CWARNING: Force throwing a SIGINT

Seems that close(buff) closes the channel for good, which might not be what one wants.

1 Like

You need to put the buffer back in the channel after use.

julia> Threads.@threads for i in 1:4
           b = take!(buff)
           b .= Threads.threadid()
           @show sum(b)
           put!(buff, b)
       end
sum(b) = 12.0
sum(b) = 3.0
sum(b) = 6.0
sum(b) = 9.0

julia> Threads.@threads for i in 1:4
           b = take!(buff)
           b .= Threads.threadid()
           @show sum(b)
           put!(buff, b)
       end
sum(b) = 3.0
sum(b) = 9.0
sum(b) = 12.0
sum(b) = 6.0
1 Like

I don’t seem to get it right in the example here. For instance, this function, when run, never ends: No threading involved, just put/take from the channel here:

function testchannels()
    N = 200
    arr = zeros(N,20)
    Buffer = [zeros(N) for _ in 1:Threads.nthreads()]
    channel = Channel{eltype(Buffer)}()
    for buff in Buffer
        put!(channel, buff)
    end
    for i in axes(arr,1), j in axes(arr,2)
        Buff = take!(channel)
        useBuff!(arr,i,j,Buff)
        put!(channel, Buff)
    end
    close(channel)
    return arr
end
# edit: with
function useBuff!(arr,i,j,Buff)
    Buff .= 0
    for k in eachindex(Buff)
        Buff[k] += k
    end
    arr[i,j] = Buff[i]
end

You need to specify the length of the Channel; otherwise, it assumes a default length of zero and put! on the channel hangs. TBH, I’m unsure why the default value is zero, does it help in any situation?

This works:

julia> function testchannels()
           N = 200
           arr = zeros(N,20)
           Buffer = [zeros(N) for _ in 1:Threads.nthreads()]
           channel = Channel{eltype(Buffer)}(length(Buffer))
           for buff in Buffer
               put!(channel, buff)
           end
           for i in axes(arr,1), j in axes(arr,2)
               Buff = take!(channel)
               useBuff!(arr,i,j,Buff)
               put!(channel, Buff)
           end
           close(channel)
           return arr
       end
testchannels (generic function with 1 method)

julia> # edit: with
       function useBuff!(arr,i,j,Buff)
           Buff .= 0
           for k in eachindex(Buff)
               Buff[k] += k
           end
           arr[i,j] = Buff[i]
       end
useBuff! (generic function with 1 method)

julia> testchannels()
200×20 Matrix{Float64}:
   1.0    1.0    1.0    1.0    1.0  …    1.0    1.0    1.0    1.0    1.0
   2.0    2.0    2.0    2.0    2.0       2.0    2.0    2.0    2.0    2.0
   3.0    3.0    3.0    3.0    3.0       3.0    3.0    3.0    3.0    3.0
   4.0    4.0    4.0    4.0    4.0       4.0    4.0    4.0    4.0    4.0
   5.0    5.0    5.0    5.0    5.0       5.0    5.0    5.0    5.0    5.0
   6.0    6.0    6.0    6.0    6.0  …    6.0    6.0    6.0    6.0    6.0
   7.0    7.0    7.0    7.0    7.0       7.0    7.0    7.0    7.0    7.0
   8.0    8.0    8.0    8.0    8.0       8.0    8.0    8.0    8.0    8.0
   ⋮                                ⋱    ⋮                         
 193.0  193.0  193.0  193.0  193.0     193.0  193.0  193.0  193.0  193.0
 194.0  194.0  194.0  194.0  194.0     194.0  194.0  194.0  194.0  194.0
 195.0  195.0  195.0  195.0  195.0     195.0  195.0  195.0  195.0  195.0
 196.0  196.0  196.0  196.0  196.0  …  196.0  196.0  196.0  196.0  196.0
 197.0  197.0  197.0  197.0  197.0     197.0  197.0  197.0  197.0  197.0
 198.0  198.0  198.0  198.0  198.0     198.0  198.0  198.0  198.0  198.0
 199.0  199.0  199.0  199.0  199.0     199.0  199.0  199.0  199.0  199.0
 200.0  200.0  200.0  200.0  200.0     200.0  200.0  200.0  200.0  200.0

1 Like

I see, silly mistake. Do you any reason for one to be able to put! any number of stuff in the channel, independently of its size?

Do you mean you don’t want to specify the size of the channel? You may use Channel{T}(Inf) to create one that can hold an arbitrary number of elements (limited by typemax(Int), I suppose).

1 Like

Uhm, I see. It was not what I was expecting. I thought this would “work”, but it hangs (as in the case of the zero-length channel):

julia> c = Channel{Int}(4)
Channel{Int64}(4) (empty)

julia> for i in 1:5
           put!(c, 1)
       end

I see now. Channel is a lazy reference storage for the objects, so one can create it very large if one wants to.

Yes, the size that you specify while creating the channel is the maximum number of elements that you may put into it. If more elements than this size are put into the channel, the request will have to keep waiting until some other task takes from the channel, creating space for the additional elements.

This, for example, finishes

julia> c = Channel{Int}(4)
Channel{Int64}(4) (empty)

julia> @sync begin
           @async for i in 1:5
                  put!(c, i)
              end
           @async for i in 1:5
               @show take!(c)
           end
       end;
take!(c) = 1
take!(c) = 2
take!(c) = 3
take!(c) = 4
take!(c) = 5
2 Likes

Thanks for all that. I finally got the idea of the channels.

On the other side, this patterns appears to have quite a large some overhead relative to using threadid() (if there is nothing wrong with that code). I mentioned to @Salmon in PV that I have this very simple package ChunkSplitters.jl to exactly help with avoiding the threadid() pattern without that kind of overhead. It is something simple, almost as a manual splitting of the loop into the desired number of chunks. Then his code is written as:

using ChunkSplitters
function test_chunks(arr,Buffer)
    Threads.@threads for (i_range, ichunk) in chunks(axes(arr,1), length(Buffer))
        Buff = Buffer[ichunk]
        for i in i_range, j in axes(arr,2)
            useBuff!(arr,i,j,Buff)
        end
    end
    return arr
end

I keep chasing a more consensual alternative to that, if there is any. I get:

julia> timeall()
@threads with threadid():
  138.663 μs (48 allocations: 4.17 KiB)
chunks from ChunkSplitters:
  139.187 μs (48 allocations: 4.33 KiB)
channels:
  162.981 μs (57 allocations: 4.48 KiB)

With this code:

code
using FLoops
using ChunkSplitters
using BenchmarkTools
using Test

function useBuff!(arr,i,j,Buff)
    Buff .= 0
    for k in eachindex(Buff)
        Buff[k] += k
    end
    arr[i,j] = Buff[i]
end

function test_threadid(arr, Buffer)
    Threads.@threads for i in axes(arr,1)
        Buff = Buffer[Threads.threadid()]
        for j in axes(arr,2)
            useBuff!(arr,i,j,Buff)
        end
    end
    return arr
end

function test_chunks(arr,Buffer)
    Threads.@threads for (i_range, ichunk) in chunks(axes(arr,1), length(Buffer))
        Buff = Buffer[ichunk]
        for i in i_range, j in axes(arr,2)
            useBuff!(arr,i,j,Buff)
        end
    end
    return arr
end

function test_channel(arr,channel)
    Threads.@threads for i in axes(arr,1)
        Buff = take!(channel)
        for j in axes(arr,2)
            useBuff!(arr,i,j,Buff)
        end
        put!(channel, Buff)
    end
    return arr
end

function timeall()
    N = 200
    arr = zeros(N,20)
    Buffer = [zeros(N) for _ in 1:Threads.nthreads()]

    channel = Channel{eltype(Buffer)}(length(Buffer))
    for buff in Buffer
        put!(channel, buff)
    end

    arr0 = copy(arr)
    test_threadid(arr0, Buffer)
    arr1 = copy(arr)
    test_chunks(arr1, Buffer)
    arr2 = copy(arr)
    test_channel(arr2, channel)
    @test isapprox(arr0,arr1)
    @test isapprox(arr0,arr1)

    println("@threads with threadid():")
    @btime test_threadid($arr,$Buffer)
    println("chunks from ChunkSplitters:")
    @btime test_chunks($arr,$Buffer)
    println("channels:")
    @btime test_channel($arr,$channel)

    nothing
end

EDIT: There is an overhead, but not as large. I was taking/puting channels in the inner loop. Thus, it can be nice alternative it the inner operations on each thread are not very fast relative to that overhead. And it is a nice style overall.

1 Like

Thanks so much from my side too for the explanation on channels.
Channel Splitters does seem to have the least overhead while it is also actually safe.
Its nice to know though that the slightly increased overhead of using channels seems to be diminished once we get to workloads that are a bit bigger, since not as much time is spent on take!
In my case that does seem to remove the need for FLoops, though maybe I’ll find it worthwhile to experiment with the other schedulers they have.

1 Like