Is there a faster way to perform this reduction?

Hello!

I’ve been wondering if there is a faster or more straightforward way to perform this reduction:

using BenchmarkTools
using Random

# Serial in-place reduction: accumulate every source array into `target_array`.
function reduce_sum!(target_array, arrays)
    foreach(src -> target_array .+= src, arrays)
end

# Build `num_arrays` uniform-random source vectors plus a zeroed accumulator
# of matching length, returned as (accumulator, sources).
function generate_random_arrays(num_arrays, array_size)
    sources = map(_ -> rand(Float64, array_size), 1:num_arrays)
    accumulator = zeros(Float64, array_size)
    return accumulator, sources
end

# Benchmark reduce_sum! over every (array size, array count) combination.
# Returns a Dict keyed by (size, count) holding BenchmarkTools trials.
function benchmark_reduce_sum()
    array_sizes = [100_000]  # different sizes of arrays
    array_counts = [8]       # different numbers of arrays to reduce

    results = Dict()
    for sz in array_sizes, cnt in array_counts
        target_array, arrays = generate_random_arrays(cnt, sz)
        results[(sz, cnt)] = @benchmark reduce_sum!($target_array, $arrays)
    end
    return results
end

# Run the benchmark
results = benchmark_reduce_sum()

# Display results
for (key, trial) in pairs(results)
    println("Array size: ", key[1], ", Number of arrays: ", key[2])
    display(trial)
end

Array size: 100000, Number of arrays: 8
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  254.700 μs …  1.368 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     275.200 μs              ┊ GC (median):    0.00%
 Time  (mean ± σ):   311.858 μs ± 86.791 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▅█▇▅▄▄▄▃▂▄▄▄▂▂▂▂▁▁▁▁   ▁                                     ▂
  ███████████████████████████████▇▇▆█▆▆▇▇▆▇▆▆▆▆▆▅▆▆▅▅▅▅▆▅▄▅▅▅▅ █
  255 μs        Histogram: log(frequency) by time       677 μs <

 Memory estimate: 0 bytes, allocs estimate: 0.

The reason I start with 8 arrays is that my code is multithreaded (using ChunkSplitters.jl), and at the end I need to reduce them down to a single array in place.

Kind regards

I would try OhMyThreads.treduce.

I was not able to get that to work unfortunately. I have the following right now after chatting with our AI helpers:

using BenchmarkTools
using Random
using Base.Threads
using Test

# Original serial baseline: add each source array into `target_array` in place.
function reduce_sum!(target_array, arrays)
    for src in arrays
        @. target_array += src
    end
end

"""
    reduce_sum_threaded!(target_array, arrays)

Multithreaded in-place reduction: add every array in `arrays` element-wise
into `target_array`. The index range is split into one contiguous chunk per
thread, so no two threads ever write the same element (no locking needed).

Assumes every array in `arrays` has at least `length(target_array)` elements;
`@inbounds` relies on this — TODO confirm at the call site.
"""
function reduce_sum_threaded!(target_array, arrays)
    n = length(target_array)
    num_threads = nthreads()
    chunk_size = ceil(Int, n / num_threads)
    @threads for t in 1:num_threads
        lo = 1 + (t - 1) * chunk_size
        hi = min(t * chunk_size, n)  # last chunk may be short (or empty)
        for src in arrays
            # Hoisting `src` out of the index loop avoids a repeated getindex
            # on the outer container per element and lets @simd vectorize.
            @simd for i in lo:hi
                @inbounds target_array[i] += src[i]
            end
        end
    end
    return nothing
end

"""
    reduce_sum_threaded_tiled!(target_array, arrays)

Tiled multithreaded in-place reduction: the index range is split into
fixed-size tiles and the tiles are distributed across threads, so each
element is owned by exactly one task.

Bug fix vs. the original: the original wrapped `@threads` around the thread
ids and then looped over *all* tiles inside, so every thread processed every
tile — each element was accumulated `nthreads()` times, with write races on
top. Threading over the tiles themselves gives each tile a single owner.

Assumes every array in `arrays` has at least `length(target_array)` elements
(`@inbounds` relies on this).
"""
function reduce_sum_threaded_tiled!(target_array, arrays)
    tile_size = 256
    n = length(target_array)
    num_tiles = cld(n, tile_size)  # ceiling division, no float roundoff

    # Parallelize over tiles: exactly one task touches each tile, so there
    # is no double-counting and no data race.
    @threads for tile in 1:num_tiles
        start_idx = (tile - 1) * tile_size + 1
        end_idx = min(tile * tile_size, n)
        for src in arrays
            @simd for i in start_idx:end_idx
                @inbounds target_array[i] += src[i]
            end
        end
    end
    return nothing
end

# Create the benchmark fixtures: a zeroed Float64 accumulator of length
# `array_size` and `num_arrays` random source vectors of the same length.
function generate_random_arrays(num_arrays, array_size)
    accumulator = zeros(Float64, array_size)
    sources = [rand(Float64, array_size) for _ in 1:num_arrays]
    return accumulator, sources
end

# Benchmark the serial, threaded, and tiled variants for each combination of
# array size and array count. Keys are (size, count, version-name) tuples.
function benchmark_reduce_sum()
    array_sizes = [100_000]  # different sizes of arrays
    array_counts = [8]       # different numbers of arrays to reduce

    results = Dict()
    for sz in array_sizes, cnt in array_counts
        target_array, arrays = generate_random_arrays(cnt, sz)
        results[(sz, cnt, "Original")] =
            @benchmark reduce_sum!($target_array, $arrays)
        results[(sz, cnt, "Threaded")] =
            @benchmark reduce_sum_threaded!($target_array, $arrays)
        results[(sz, cnt, "ThreadedTiled")] =
            @benchmark reduce_sum_threaded_tiled!($target_array, $arrays)
    end
    return results
end

# Verify both threaded variants produce the same result as the serial
# reference implementation on one random problem instance.
function test_correctness()
    array_size = 100_000
    num_arrays = 8
    target_array, arrays = generate_random_arrays(num_arrays, array_size)

    reference = copy(target_array)
    reduce_sum!(reference, arrays)

    threaded = copy(target_array)
    reduce_sum_threaded!(threaded, arrays)

    tiled = copy(target_array)
    reduce_sum_threaded_tiled!(tiled, arrays)

    @test all(@. tiled == threaded == reference)
end

# Run the benchmark
results = benchmark_reduce_sum()

# Display results
for (key, trial) in pairs(results)
    println("Array size: ", key[1], ", Number of arrays: ", key[2],
            ", Version: ", key[3])
    display(trial)
end


# Run tests
test_correctness()


Which gives:

Array size: 100000, Number of arrays: 8, Version: Original
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  264.200 μs …  17.460 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     329.700 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   473.223 μs ± 551.163 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  █▇▆▅▅▄▄▃▃▃▃▂▂▂▂▁▁▁                                            ▂
  ██████████████████████▇█▇▇▇▇▇▆▆▇▆▆▅▇▆▆▅▆▅▅▄▆▄▅▅▄▄▆▃▅▅▅▄▄▁▅▄▄▅ █
  264 μs        Histogram: log(frequency) by time       2.64 ms <

 Memory estimate: 0 bytes, allocs estimate: 0.
Array size: 100000, Number of arrays: 8, Version: Threaded
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):   70.700 μs …   4.343 ms  ┊ GC (min … max): 0.00% … 88.09%
 Time  (median):      99.800 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   136.432 μs ± 128.146 μs  ┊ GC (mean ± σ):  0.28% ±  0.88%

  █▆▆▆▆▄▄▄▃▃▃▂▂▂▁▂▁▂▁▁▁▁▁▁ ▁                                    ▂
  ███████████████████████████████▇▇▇▇▇▆▆▇▆▇▆▆▆▆▆▅▅▅▅▅▅▅▅▄▄▂▄▄▄▄ █
  70.7 μs       Histogram: log(frequency) by time        579 μs <

 Memory estimate: 5.11 KiB, allocs estimate: 41.
Array size: 100000, Number of arrays: 8, Version: ThreadedTiled
BenchmarkTools.Trial: 5875 samples with 1 evaluation.
 Range (min … max):  344.800 μs …   3.214 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     752.600 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   836.870 μs ± 364.829 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

      ▃▆█▇▆▂▂▂▃▅▄▃▁
  ▁▂▃▆██████████████▇▆▆▆▅▅▄▄▄▃▃▃▃▃▂▂▂▂▂▂▂▂▂▂▂▁▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁ ▃
  345 μs           Histogram: frequency by time         2.24 ms <

 Memory estimate: 5.11 KiB, allocs estimate: 41.

So the threaded version is 4x faster now, which is nice. I’m wondering if there are further speedups to be had :slight_smile:

The fastest implementation I’ve found for now is:


"""
    reduce_sum!(target_array, arrays)

Threaded in-place reduction: add every array in `arrays` element-wise into
`target_array`, splitting the index range into one contiguous chunk per
thread so writes never overlap.

Cleanups vs. the previous version: the outer `@inbounds @threads` was
redundant (the inner loop already carries `@inbounds`), the `local`
declarations were unnecessary inside the loop body, and iterating `arrays`
directly replaces the index-then-fetch pattern. `cld` avoids the float
round-trip of `ceil(Int, n / num_threads)`.

Assumes every array in `arrays` has at least `length(target_array)` elements
(`@inbounds` relies on this).
"""
function reduce_sum!(target_array, arrays)
    n = length(target_array)
    num_threads = nthreads()
    chunk_size = cld(n, num_threads)
    @threads for t in 1:num_threads
        start_idx = 1 + (t - 1) * chunk_size
        end_idx = min(t * chunk_size, n)  # final chunk may be short/empty
        for src in arrays  # one source array at a time, hoisted per thread
            @simd for i in start_idx:end_idx
                @inbounds target_array[i] += src[i]
            end
        end
    end
    return nothing
end

It’s 4x faster than serial here; in actual code it might not be a full 4x gain, but it should still be better than serial.

1 Like

I guess the arrays were computed in parallel beforehand. One alternative is to increment the reduced output inside that same parallel loop. Sometimes this requires locking the output, but it can still be faster than an independent reduction step that has to wait for all threads to finish.