How to write a multi-threaded reduction using Bumper.jl?

Hello!

I am trying to see if I can get a multithreaded reduction to work using Bumper.jl, but my results are a bit embarrassing… I can’t seem to get it to work, and I don’t think it is using all my threads. The code snippet is below:

using Base.Threads
using Bumper
using StrideArrays # Not necessary, but can make operations like broadcasting with Bumper.jl faster.
using Polyester
using BenchmarkTools
using LoopVectorization

function NaiveReductionFunction!(dρdtI,I,J,drhopLp,drhopLn)
    # Reduction
    for iter in eachindex(I,J)
        i = I[iter]
        j = J[iter]

        dρdtI[i] +=  drhopLp[iter]
        dρdtI[j] +=  drhopLn[iter]
    end
end

function ReductionFunction!(dρdtI,I,J,drhopLp,drhopLn, buf)
    # Set up a scope where memory may be allocated, and does not escape:
    @no_escape buf begin
        # Allocate a `PtrArray` (see StrideArraysCore.jl) using memory from the default buffer.
        XT = eltype(dρdtI); XL = length(dρdtI)

        # How can I do this so that I don't have to manually hard code to 4 threads?
        # If I do, local_X = [@alloc(XT,XL) for _ in 1:nthreads()] then it will allocate!
        local_X1 = @alloc(XT, XL)
        local_X2 = @alloc(XT, XL)
        local_X3 = @alloc(XT, XL)
        local_X4 = @alloc(XT, XL)
        local_X  = (local_X1, local_X2, local_X3, local_X4)

        # I thought Bumper.jl would automatically reset the buffer?
        for x in local_X
            x .*= zero(XT)
        end

        # Remove @batch here to verify the two functions give the same answer..
        # @batch seems to only use 1 thread, @tturbo uses all four?
        @batch for iter in eachindex(I,J)
            i = I[iter]
            j = J[iter]

            # Accumulate in thread-local storage
            local_X[threadid()][i] +=  drhopLp[iter]
            local_X[threadid()][j] +=  drhopLn[iter]
        end

        # Reduce the thread-local storage into the shared arrays
        for tid in 1:nthreads()
            dρdtI .+= local_X[tid]
        end

    end

    return nothing
end

begin
    buf                 = default_buffer()
    NumberOfPoints      = 6195
    NumberOfInterations = 100000
    I           = rand(1:NumberOfPoints, NumberOfInterations)
    J           = rand(1:NumberOfPoints, NumberOfInterations)
    dρdtI       = zeros(NumberOfPoints) 
    drhopLp     = rand(1.0:2.0, NumberOfInterations)    
    drhopLn     = rand(1.0:2.0, NumberOfInterations)    

    NaiveReductionFunction!(dρdtI,I,J,drhopLp,drhopLn)
    println("Value when doing naive reduction: ", sum(dρdtI))
    dρdtI .= zero(eltype(dρdtI)); #Just resetting array.
    ReductionFunction!(dρdtI, I, J , drhopLp,drhopLn, buf)
    println("Value doing optimized reduction: ",  sum(dρdtI))

    println("Naive: ")
    display(@benchmark NaiveReductionFunction!($dρdtI, $I, $J, $drhopLp, $drhopLn))
    println("Optimized: ")
    display(@benchmark ReductionFunction!($dρdtI, $I, $J, $drhopLp, $drhopLn, $buf))
    println("I have threads enabled: ", nthreads())
end

And when I execute it these are the results:

[screenshot of benchmark results]

Which shows that my implementation with threads is slower than single threaded. Can anyone see what I am doing wrong in this case?

I want to use Bumper.jl to avoid having to pass in “helper arrays” to store intermediate computations.
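For context, the basic pattern I am after looks roughly like this (a minimal sketch; example! is just a made-up name, not my actual code):

using Bumper

function example!(out, x)
    # Temporaries come from the task-local buffer and never escape the block,
    # so no helper arrays need to be passed in from outside.
    @no_escape begin
        tmp = @alloc(eltype(out), length(out))
        tmp .= 2 .* x
        out .+= tmp
    end
    return nothing
end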

Kind regards

I might have been running in an older terminal session and forgot to restart it. These are the results I get now; I have also added my attempt with ChunkSplitters:

using Base.Threads
using Bumper
using StrideArrays # Not necessary, but can make operations like broadcasting with Bumper.jl faster.
using Polyester
using BenchmarkTools
using LoopVectorization
using ChunkSplitters

function NaiveReductionFunction!(dρdtI, I,J,drhopLp,drhopLn)
    # Reduction
    for iter in eachindex(I,J)
        i = I[iter]
        j = J[iter]

        dρdtI[i] +=  drhopLp[iter]
        dρdtI[j] +=  drhopLn[iter]
    end
end

function ReductionFunction!(dρdtI,I,J,drhopLp,drhopLn, buf)
    # Set up a scope where memory may be allocated, and does not escape:
    @no_escape buf begin
        # Allocate a `PtrArray` (see StrideArraysCore.jl) using memory from the default buffer.
        XT = eltype(dρdtI); XL = length(dρdtI)

        # How can I do this so that I don't have to manually hard code to 4 threads?
        # If I do, local_X = [@alloc(XT,XL) for _ in 1:nthreads()] then it will allocate!
        local_X1 = @alloc(XT, XL)
        local_X2 = @alloc(XT, XL)
        local_X3 = @alloc(XT, XL)
        local_X4 = @alloc(XT, XL)
        local_X  = (local_X1, local_X2, local_X3, local_X4)

        # I thought Bumper.jl would automatically reset the buffer?
        for x in local_X
            x .*= zero(XT)
        end

        # Remove @batch here to verify the two functions give the same answer..
        # @batch seems to only use 1 thread, @tturbo uses all four?
        @batch for iter in eachindex(I,J)
            i = I[iter]
            j = J[iter]

            # Accumulate in thread-local storage
            local_X[threadid()][i] +=  drhopLp[iter]
            local_X[threadid()][j] +=  drhopLn[iter]
        end

        # Reduce the thread-local storage into the shared arrays
        for tid in 1:nthreads()
            dρdtI .+= local_X[tid]
        end

    end

    return nothing
end

function ChunkFunction!(dρdtI, I, J, drhopLp, drhopLn)
    nchunks = nthreads()
    # One accumulator per chunk, sized like dρdtI (this allocates on every call)
    local_X = [zeros(length(dρdtI)) for _ in 1:nchunks]
    @threads for (ichunk, inds) in enumerate(chunks(I; n=nchunks))
        for idx in inds
            i = I[idx]
            j = J[idx]

            # Accumulate into the chunk-local array
            local_X[ichunk][i] += drhopLp[idx]
            local_X[ichunk][j] += drhopLn[idx]
        end
    end
    # Reduce the chunk-local accumulators into the shared array
    for ichunk in 1:nchunks
        dρdtI .+= local_X[ichunk]
    end
end


begin
    buf                 = default_buffer()
    NumberOfPoints      = 6195
    NumberOfInterations = 100000
    I           = rand(1:NumberOfPoints, NumberOfInterations)
    J           = rand(1:NumberOfPoints, NumberOfInterations)
    dρdtI       = zeros(NumberOfPoints) 
    drhopLp     = rand(1.0:2.0, NumberOfInterations)    
    drhopLn     = rand(1.0:2.0, NumberOfInterations)    

    NaiveReductionFunction!(dρdtI,I,J,drhopLp,drhopLn)
    println("Value when doing naive reduction: ", sum(dρdtI))
    dρdtI .= zero(eltype(dρdtI)); #Just resetting array.
    ReductionFunction!(dρdtI, I, J, drhopLp,drhopLn, buf)
    println("Value doing optimized reduction: ",  sum(dρdtI))

    # dρdtI .= zero(eltype(dρdtI)); #Just resetting array.
    # dρdtI[I] .+= drhopLp[I]
    # dρdtI[J] .+= drhopLp[J]
    # println("Value doing fused: ",  sum(dρdtI))

    ChunkFunction!(dρdtI, I,J,drhopLp,drhopLn)
    println("Value doing chunks: ",  sum(dρdtI))


    println("Naive: ")
    display(@benchmark NaiveReductionFunction!($dρdtI, $I, $J, $drhopLp, $drhopLn))
    println("Optimized: ")
    display(@benchmark ReductionFunction!($dρdtI, $I, $J, $drhopLp, $drhopLn, $buf))
    println("Using ChunkSplitters: ")
    display(@benchmark ChunkFunction!($dρdtI, $I, $J, $drhopLp, $drhopLn))
    println("I have threads enabled: ", nthreads())
end

Suggestions on how to improve speed, or how to avoid the problem of manually writing local_X1, local_X2, …, would be much appreciated.
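One idea I have for avoiding local_X1, …, local_X4 by hand is a single 2D scratch array with one column per thread, roughly like this (an untested sketch reusing XT, XL, buf and the loop from ReductionFunction! above):

@no_escape buf begin
    local_X = @alloc(XT, XL, nthreads())   # one column of length XL per thread
    fill!(local_X, zero(XT))               # zero before accumulating with +=

    @batch for iter in eachindex(I, J)
        i = I[iter]
        j = J[iter]
        local_X[i, threadid()] += drhopLp[iter]
        local_X[j, threadid()] += drhopLn[iter]
    end

    # Reduce the per-thread columns into the shared array
    for tid in 1:nthreads()
        dρdtI .+= @view local_X[:, tid]
    end
end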

I am at the point where I have two solutions:

using Base.Threads
using Bumper
using StrideArrays # Not necessary, but can make operations like broadcasting with Bumper.jl faster.
using Polyester
using BenchmarkTools
using LoopVectorization
using ChunkSplitters

function NaiveReductionFunction!(dρdtI, I,J,drhopLp,drhopLn)
    # Reduction
    @inbounds for iter in eachindex(I,J)
        i = I[iter]
        j = J[iter]

        dρdtI[i] +=  drhopLp[iter]
        dρdtI[j] +=  drhopLn[iter]
    end
end

function ReductionFunctionChunk!(dρdtI, I, J, drhopLp, drhopLn)
    XT = eltype(dρdtI); XL = length(dρdtI); X0 = zero(XT)
    nchunks = nthreads()  # One accumulator column per chunk/thread

    @inbounds @no_escape begin
        local_X = @alloc(XT, XL, nchunks)

        fill!(local_X,X0)

        # Directly iterate over the chunks
        @batch for ichunk in 1:nchunks
            chunk_inds = getchunk(I, ichunk; n=nchunks)
            for idx in chunk_inds
                i = I[idx]
                j = J[idx]

                # Accumulate the contributions into the correct place
                local_X[i, ichunk] += drhopLp[idx]
                local_X[j, ichunk] += drhopLn[idx]
            end
        end

        # Reduction step
        @tturbo for ix in 1:XL
            for chunk in 1:nchunks
                dρdtI[ix] += local_X[ix, chunk]
            end
        end
    end
    
    return nothing
end


begin
    ProblemScaleFactor  = 1
    NumberOfPoints      = 6195*ProblemScaleFactor
    NumberOfInterations = 50000*ProblemScaleFactor
    I           = rand(1:NumberOfPoints, NumberOfInterations)
    J           = I; #rand(1:NumberOfPoints, NumberOfInterations)
    dρdtI       = zeros(NumberOfPoints) 
    drhopLp     = rand(NumberOfInterations)    
    drhopLn     = rand(NumberOfInterations)
    nchunks     = nthreads()
    local_X     = [zeros(length(dρdtI)) for _ in 1:nchunks]


    dρdtI .= zero(eltype(dρdtI))
    NaiveReductionFunction!(dρdtI,I,J,drhopLp,drhopLn)
    println("Value when doing naive reduction: ", sum(dρdtI))

    dρdtI .= zero(eltype(dρdtI))
    ReductionFunctionChunk!(dρdtI, I,J,drhopLp,drhopLn)
    println("Value when doing chunk reduction: ", sum(dρdtI))


    # Benchmark
    println("Naive function:")
    display(@benchmark NaiveReductionFunction!($dρdtI , $I, $J, $drhopLp, $drhopLn))

    println("Chunk function:")
    display(@benchmark ReductionFunctionChunk!($dρdtI , $I, $J, $drhopLp, $drhopLn))

end

One naive and one multithreaded version. The multithreaded version is quite fast now, about 2x the speed of the naive one. The nice part about using ChunkSplitters is that the final reduction can also be turbofied.

Value when doing naive reduction: 50099.85532789568
Value when doing chunk reduction: 50099.85532789568
Naive function:
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  67.200 μs … 146.100 μs  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     68.900 μs               ┊ GC (median):    0.00%        
 Time  (mean ± σ):   69.763 μs ±   4.467 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▆▂ █▅▄▃▃▃▂                                                   ▂
  ██▇█████████▇▆▇▆▅▆▄▆▄▄▁▁▄▃▄▇███▇▇▆▆▆▇▇▇▆▅▃▅▄▄▄▃▃▄▁▄▄▅▄▄▄▅▄▃▅ █
  67.2 μs       Histogram: log(frequency) by time      94.9 μs <

 Memory estimate: 0 bytes, allocs estimate: 0.
Chunk function:
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  28.000 μs … 823.500 μs  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     40.000 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   39.512 μs ±  12.610 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

                          ▇ ▁  █
  █▇▂▁▁▂▂▁▁▁▅▇▂▁▂▂▂▂▂▂▃▃▂▆█▇██▆█▄▂▃▃▃▃▂▃▄▃▂▂▁▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁ ▂
  28 μs           Histogram: frequency by time           56 μs <

 Memory estimate: 0 bytes, allocs estimate: 0.

@Mason, sorry for pinging, but since you are the one behind Bumper.jl I wanted to ask whether I am doing something wrong here, since my buffer does not reset to zero at the end of the code. I have to use fill!(local_X, X0) to reset it. I believe I am following the rule of not using @alloc’ed storage outside of the @no_escape block.

The reason it matters to me is that I can save about a third of the execution time if I remove the fill!:

Naive function:
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  67.400 μs … 429.900 μs  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     69.200 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   70.730 μs ±   8.403 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▄  █▅▄▃▃▂▂▁▁▁                                                ▁
  ██▆████████████▇▇▇▇▆▆▆▆▅▅▆▆▆▆▆▆▅▅▅▆▆▅▅▅▅▅▃▅▅▅▃▃▄▃▃▄▄▃▄▃▅▄▄▃▄ █
  67.4 μs       Histogram: log(frequency) by time      96.6 μs <

 Memory estimate: 0 bytes, allocs estimate: 0.
Chunk function:
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  20.800 μs … 358.900 μs  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     34.300 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   31.657 μs ±   9.887 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▅▇▂        ▃▆▆▃▁▁▂▂▂▂▂▃▃▄▃▁  ▁▁▁▃▇█▆▃   ▁  ▂▄▄▃▃▂▁           ▂
  ███▄▄▁▅▅▆▆▆█████████████████████████████████████████▇▇▇▇█▆▇▇ █
  20.8 μs       Histogram: log(frequency) by time      44.8 μs <

 Memory estimate: 0 bytes, allocs estimate: 0.

When I run the example on GitHub, it resets the buffer beautifully. Am I breaking some rule?

Kind regards

Hey @Ahmed_Salih, sorry I don’t see any obvious ways to speed this up. The problem is really that there’s this awkward access pattern with the I and J vectors.

The reason you need to fill! the buffer is that you’re not just writing to the buffer, you’re doing += operations, so if you don’t explicitly zero out the contents of it, you’ll get junk values.
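For example (a minimal sketch using the default buffer):

using Bumper

@no_escape begin
    a = @alloc(Float64, 4)
    # `a` holds whatever bytes are already sitting in the buffer, not guaranteed zeros,
    # so accumulating with += before zeroing would mix in junk values.
    fill!(a, 0.0)
    a[1] += 1.0
end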


Okay, but I thought I was only writing to it, hehe. When I do:

local_X = @alloc(XT, XL, nchunks)

Doesn’t it mean that it allocates local_X on the buffer? :blush:

And I write to it when I do:

                # Accumulate the contributions into the correct place
                local_X[i, ichunk] += drhopLp[idx]
                local_X[j, ichunk] += drhopLn[idx]

At least I think I do…

Kind regards

writing

local_X[i, ichunk] += drhopLp[idx]

is just shorthand for something like

local_X_i_ichunk = local_X[i, ichunk]
res = local_X_i_ichunk + drhopLp[idx]
local_X[i, ichunk] = res

so when you do that, you’re reading from local_X before you write to it.


Thanks!

I think I got it.

Then I am going to use it just for the ability to make temporary arrays without feeding them in beforehand, which is awesome!

Just to explain the pattern of I and J, it is similar to NNlib.scatter!.

help?> NNlib.scatter!
  NNlib.scatter!(op, dst, src, idx)

  Scatter operation, which writes data in src into dst at locations idx. A binary reduction operator op is applied during the scatter. For each index k in idx, accumulates values in dst according to

  dst[:, ..., idx[k]...] = (op).(dst[:, ..., idx[k]...], src[:, ..., k...])

  See also scatter, gather.

  Arguments
  ≡≡≡≡≡≡≡≡≡

    •  op: Operations to be applied on dst and src, e.g. +, -, *, /, max, min and mean.

    •  dst: The destination for src to aggregate to. This argument will be mutated.

    •  src: The source data for aggregating.

    •  idx: The mapping for aggregation from source (index) to destination (value). The idx array can contain either integers or tuples.

  Examples
  ≡≡≡≡≡≡≡≡

  julia> NNlib.scatter!(+, ones(3), [10,100], [1,3])
  3-element Vector{Float64}:
    11.0
     1.0
   101.0

  julia> NNlib.scatter!(*, fill(0.5, 2, 4), [1 10; 100 1000], [3,2])
  2×4 Matrix{Float64}:
   0.5    5.0   0.5  0.5
   0.5  500.0  50.0  0.5

On the CPU, the code I included above is about 10x faster.
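For reference, the naive loop above maps onto two scatter! calls, roughly (a sketch using the arrays defined earlier):

using NNlib

# dρdtI[I[k]] += drhopLp[k] and dρdtI[J[k]] += drhopLn[k] for every k
NNlib.scatter!(+, dρdtI, drhopLp, I)
NNlib.scatter!(+, dρdtI, drhopLn, J)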

Kind regards

The ArrayAllocators package might help: it uses calloc to allocate zeroed memory, which can be faster than using fill!.

I am not sure I understand how to use it in this case?

I make the memory once and then reuse it multiple times.

Kind regards

He’s suggesting that you replace the @alloc call with

local_X = Array{XT}(calloc, XL, nchunks)

from ArrayAllocators.jl. I tried this earlier though, and at least for the problem size you’re dealing with here, it’s way slower than using Bumper.
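Spelled out, that replacement would look something like this (a sketch with the sizes from the example above):

using ArrayAllocators
using Base.Threads

XT, XL, nchunks = Float64, 6195, nthreads()
# calloc-backed memory comes back zero-initialized, so no fill! is needed,
# but the allocation itself still happens on every call.
local_X = Array{XT}(calloc, XL, nchunks)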


Got it, thanks! I did some tests and unfortunately, no, it did not speed things up.

Thank you for the help everyone.

I think I am at the stage where I believe this is the fastest it can get on a single CPU, unless someone knows how to @tturbo the @batch loop:

        # Directly iterate over the chunks
        @batch for ichunk in 1:nchunks
            chunk_inds = getchunk(I, ichunk; n=nchunks)
            for idx in chunk_inds
                i = I[idx]
                j = J[idx]

                # Accumulate the contributions into the correct place
                local_X[i, ichunk] += drhopLp[idx]
                local_X[j, ichunk] += drhopLn[idx]
            end
        end

getchunk seems to error out when I try it with the threaded turbo macro.

Kind regards
