Parallel reductions

“Parallel mapreduce” is a really common need, and it’s surprisingly hard to find a Julia implementation.

Say we have

julia> x = randn(10000000);

julia> f = log∘sqrt∘abs
#56 (generic function with 1 method)

The goal is to build a fast, parallel work-alike to

julia> @btime sum($f.($x))
  150.037 ms (2 allocations: 76.29 MiB)
-3.1741049251367934e6

julia> @btime mapreduce($f,$+,$x)
  135.280 ms (0 allocations: 0 bytes)
-3.1741049251367934e6

When I asked this on Julia Slack #multithreading, @atthom and @ChrisRackauckas suggested something like

using Distributed  # @distributed lives in the Distributed stdlib

function distributed_mapreduce(f, op, x)
    @distributed op for i in x
        f(i)
    end
end

julia> @btime distributed_mapreduce($f, $+, $x)
  164.202 ms (32 allocations: 76.30 MiB)
-3.174104925136724e6

The original question was just “parallel sum”, and at the time I hadn’t tested this particular example. But clearly this is not great.

I wanted to implement something like the approach taken in OpenMP:

  1. Give each thread its own accumulator
  2. Partition the data
  3. mapreduce each element of the partition
  4. Aggregate the result and return

So here’s what I came up with:

julia> function mymapreduce(f, op, x) 
           s = zeros(Threads.nthreads())
           @threads for i in eachindex(x)
               @inbounds s[Threads.threadid()] = op(f(x[i]), s[Threads.threadid()])
           end
           foldl(op, s)
       end
mymapreduce (generic function with 1 method)

julia> @btime mymapreduce($f, $+, $x)
  47.576 ms (59 allocations: 6.13 KiB)
-3.1741049251368097e6

@vchuravy suggested there’s probably false sharing in this approach, and pointed me to some slides that had an approach like this:

julia> function threaded_mapreduce(f, op, x)
          @assert length(x) % nthreads() == 0
          results = zeros(eltype(x), nthreads())
          @threads for tid in 1:nthreads()
              # split work
              acc = zero(eltype(x))
              len = div(length(x), nthreads())
              domain = ((tid-1)*len +1):tid*len
              acc = op(acc, mapreduce(f, op, view(x, domain)))
              results[tid] = acc
          end
          foldl(op, results)
       end
threaded_mapreduce (generic function with 1 method)

julia> @btime threaded_mapreduce($f, $+, $x)
  28.211 ms (69 allocations: 6.56 KiB)
-3.1741049251367934e6

We’re getting there! But…

  • This requires the length of the domain to be a multiple of the number of threads, so we’ll need to generalize it a bit
  • We still have lots of allocations. OpenMP uses thread-local values on the stack (if I’m understanding it correctly), but we’re still heap-allocating an array. And though my nthreads() == 8, we’re somehow still allocating over 6 KB in 69 allocations, so something weird is going on.

Any ideas? @Elrod any chance you have a nice implementation of this lying around?

16 Likes

Your last example is close to how I implement reduce in Transducers.jl for Julia < 1.3, while for Julia 1.3 I switched to a divide-and-conquer approach to support early termination (find-first). On master there is also a Distributed.jl-based version for non-thread-safe functions. I haven’t done any serious optimization yet, as I’m assuming that the performance characteristics of threading in Julia will change/improve after 1.3. But I noticed that a terminatable @spawn-based reduce is much slower than a @threads-based reduce in your example, so I guess a low-hanging fruit for Transducers.jl may be to use the latter implementation depending on the functions passed (or just to add a keyword argument to switch the implementation).

I’m curious how much it matters. I suppose length(x) is much larger than nthreads(), so wouldn’t it be negligible compared to the inner mapreduce?

3 Likes

Hard to tell how much it matters. There are two things that are surprising here:

  1. OpenMP can very easily express the concept of a thread-local stack, while it’s not at all clear how to do this in Julia. That seems worth addressing for general expressiveness, even if the cost in this example is low. And is the cost here actually low? The only way to know for sure is to compare directly with an OpenMP implementation.
  2. I would expect a one-time allocation of
    (8 bytes/Float64) * (1 Float64/thread) * (8 threads) == 64 bytes
    Why is the allocation so much greater than this?
2 Likes

You can use the package Strided.jl for this — on my laptop:

julia> @btime threaded_mapreduce($f, $+, $x)
  55.410 ms (37 allocations: 3.14 KiB)
-3.1740390429216223e6

julia> using Strided

julia> @btime @strided sum($f, $x)
  52.695 ms (65 allocations: 4.14 KiB)
-3.1740390429214886e6

julia> @btime sum($f, $x) # same speed as mapreduce
  143.710 ms (0 allocations: 0 bytes)
-3.1740390429216227e6
7 Likes

Regarding point 2, the multithreading blog post states that each thread is given a 4 MB stack, so as to conservatively avoid running out of stack space. However, most of that is virtual memory and is never physically resident unless it’s actually used.

2 Likes

Here is a quick sanity check to measure how much time is spent in the inner mapreduce:

using Base.Threads: @threads, nthreads  # imports needed by this snippet

const time_before = zeros(Float64, nthreads())
const time_after = zeros(Float64, nthreads())

function threaded_mapreduce2(f, op, x)
    @assert length(x) % nthreads() == 0
    results = zeros(eltype(x), nthreads())
    @threads for tid in 1:nthreads()
        # split work
        acc = zero(eltype(x))
        len = div(length(x), nthreads())
        domain = ((tid-1)*len +1):tid*len
        time_before[tid] = time()
        acc = op(acc, mapreduce(f, op, view(x, domain)))
        time_after[tid] = time()
        results[tid] = acc
    end
    foldl(op, results)
end

x = randn(10000000)
f = log∘sqrt∘abs
threaded_mapreduce(f, +, x)
threaded_mapreduce2(f, +, x)

@time threaded_mapreduce(f, +, x)
@time threaded_mapreduce2(f, +, x)
@show maximum(time_after .- time_before)

which prints

  0.037959 seconds (43 allocations: 3.562 KiB)
  0.039318 seconds (43 allocations: 3.562 KiB)
maximum(time_after .- time_before) = 0.039275169372558594

So it seems that it’s correct to assume that the longest inner mapreduce directly determines the overall timing.

Note that it looks like adding times does not change the performance:

julia> @btime threaded_mapreduce($f, +, $x);
  36.247 ms (36 allocations: 3.36 KiB)

julia> @btime threaded_mapreduce2($f, +, $x);
  36.165 ms (36 allocations: 3.36 KiB)
2 Likes

SIMDPirates.alloca should allocate (up to 4 MB) on the local stack. It returns a pointer that will be invalidated once you leave the function that SIMDPirates.alloca gets inlined into.

First, a silly demo of alloca, because a temporary is completely unnecessary here:

julia> using SIMDPirates, PaddedMatrices, LinearAlgebra

julia> using PaddedMatrices: AbstractFixedSizeVector

julia> a = @Mutable randn(256);

julia> b = @Mutable randn(256);

julia> c = similar(a); typeof(c)
FixedSizeArray{Tuple{256},Float64,1,Tuple{1},256}

julia> function mydot!(c::AbstractFixedSizeVector{L,T,P}, a::AbstractFixedSizeVector{L,T,P}, b::AbstractFixedSizeVector{L,T,P}) where {L,T,P}
           @inbounds @simd for p ∈ 1:P
               c[p] = a[p] * b[p]
           end
           sum(c)
       end
mydot! (generic function with 1 method)

julia> function mydot(a::AbstractFixedSizeVector{L,T,P}, b::AbstractFixedSizeVector{L,T,P}) where {L,T,P}
           ptr_c = SIMDPirates.alloca(Val(P), T)
           c = PtrVector{L,T,P}(ptr_c)
           mydot!(c, a, b)
       end
mydot (generic function with 1 method)

julia> using BenchmarkTools

julia> @btime mydot($a,$b)
  38.355 ns (0 allocations: 0 bytes)
5.992827584848622

julia> @btime mydot!($c,$a,$b)
  38.183 ns (0 allocations: 0 bytes)
5.992827584848622

julia> @btime $a' * $b # not storing in a temporary is much faster; memory loads are the bottleneck
  20.690 ns (0 allocations: 0 bytes)
5.9928275848486265

Much slower than not having a temporary at all, but just as fast as using a pre-allocated temporary. The pointer is not allowed to escape the function.
I’m curious what Yuyichao would have to say about using alloca; is it safe within the function? Julia’s GC shouldn’t see or touch this memory, but maybe that means it’s at risk of allocating memory itself and running into this 4 MB limit.

Now, a demonstration of hitting the 4 MB limit: it triggers with 524_288 doubles allocated, but not with 500_000:

julia> L = 1 << 19
524288

julia> a = @Mutable randn(L);

julia> b = @Mutable randn(L);

julia> c = similar(a); typeof(c)
FixedSizeArray{Tuple{524288},Float64,1,Tuple{1},524288}

julia> @btime mydot!($c,$a,$b)
  654.137 μs (0 allocations: 0 bytes)
594.3816735131803

julia> @btime $a' * $b # not storing in a temporary is much faster; memory loads are the bottleneck
  307.433 μs (0 allocations: 0 bytes)
594.3816735131807

julia> @btime mydot($a,$b)
ERROR: ReadOnlyMemoryError()
Stacktrace:
 [1] macro expansion [inlined]
 [2] store! at /home/chriselrod/.julia/packages/VectorizationBase/aAHcj/src/vectorizable.jl:60 [inlined]
 [3] macro expansion at /home/chriselrod/.julia/packages/PaddedMatrices/OrV46/src/mutable_fs_padded_array.jl:357 [inlined]
 [4] setindex! at /home/chriselrod/.julia/packages/PaddedMatrices/OrV46/src/mutable_fs_padded_array.jl:344 [inlined]
 [5] macro expansion at ./REPL[6]:3 [inlined]
 [6] macro expansion at ./simdloop.jl:77 [inlined]
 [7] mydot!(::PtrArray{Tuple{524288},Float64,1,Tuple{1},524288,false}, ::FixedSizeArray{Tuple{524288},Float64,1,Tuple{1},524288}, ::FixedSizeArray{Tuple{524288},Float64,1,Tuple{1},524288}) at ./REPL[6]:2
 [8] mydot [inlined]
 [9] ##core#606(::FixedSizeArray{Tuple{524288},Float64,1,Tuple{1},524288}, ::FixedSizeArray{Tuple{524288},Float64,1,Tuple{1},524288}) at /home/chriselrod/.julia/packages/BenchmarkTools/7aqwe/src/execution.jl:297
 [10] ##sample#607(::BenchmarkTools.Parameters)
 [11] _run(::BenchmarkTools.Benchmark{Symbol("##benchmark#605")}, ::BenchmarkTools.Parameters; verbose::Bool, pad::String, kwargs::Base.Iterators.Pairs{Symbol,Integer,NTuple{4,Symbol},NamedTuple{(:samples, :evals, :gctrial, :gcsample),Tuple{Int64,Int64,Bool,Bool}}})
 [12] (::Base.var"#inner#2"{Base.Iterators.Pairs{Symbol,Integer,NTuple{5,Symbol},NamedTuple{(:verbose, :samples, :evals, :gctrial, :gcsample),Tuple{Bool,Int64,Int64,Bool,Bool}}},typeof(BenchmarkTools._run),Tuple{BenchmarkTools.Benchmark{Symbol("##benchmark#605")},BenchmarkTools.Parameters}})() at ./essentials.jl:714
 [13] #invokelatest#1 [inlined]
 [14] #run_result#37 at /home/chriselrod/.julia/packages/BenchmarkTools/7aqwe/src/execution.jl:32 [inlined]
 [15] run(::BenchmarkTools.Benchmark{Symbol("##benchmark#605")}, ::BenchmarkTools.Parameters; kwargs::Base.Iterators.Pairs{Symbol,Integer,NTuple{5,Symbol},NamedTuple{(:verbose, :samples, :evals, :gctrial, :gcsample),Tuple{Bool,Int64,Int64,Bool,Bool}}}) at /home/chriselrod/.julia/packages/BenchmarkTools/7aqwe/src/execution.jl:46
 [16] #warmup#42 at /home/chriselrod/.julia/packages/BenchmarkTools/7aqwe/src/execution.jl:79 [inlined]
 [17] warmup(::BenchmarkTools.Benchmark{Symbol("##benchmark#605")}) at /home/chriselrod/.julia/packages/BenchmarkTools/7aqwe/src/execution.jl:79
 [18] top-level scope at /home/chriselrod/.julia/packages/BenchmarkTools/7aqwe/src/execution.jl:390

julia> L = 500_000
500000

julia> a = @Mutable randn(L);

julia> b = @Mutable randn(L);

julia> c = similar(a); typeof(c)
FixedSizeArray{Tuple{500000},Float64,1,Tuple{1},500000}

julia> @btime mydot!($c,$a,$b)
  588.563 μs (0 allocations: 0 bytes)
-160.2021556040956

julia> @btime $a' * $b # not storing in a temporary is much faster; memory loads are the bottleneck
  296.243 μs (0 allocations: 0 bytes)
-160.2021556040961

julia> @btime mydot($a,$b)
  590.921 μs (0 allocations: 0 bytes)
-160.2021556040956

You should be determining the domain with:

start = 1 + ((tid - 1) * length(x)) ÷ nthreads()
stop = (tid * length(x)) ÷ nthreads()
domain = start:stop

This requires one extra ÷, but now you don’t need length(x) to be an integer multiple of nthreads().
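
For example, here’s a quick check (with made-up sizes, n = 10 elements split into nt = 4 chunks) that these ranges tile 1:n even when n is not a multiple of the chunk count:

julia> n, nt = 10, 4;

julia> [(1 + ((tid - 1) * n) ÷ nt):((tid * n) ÷ nt) for tid in 1:nt]
4-element Array{UnitRange{Int64},1}:
 1:2
 3:5
 6:7
 8:10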

Have you considered trying atomic_add! for the reduction, instead of an array of length equal to the number of threads?

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

I believe @threads by itself results in a lot of allocations, so you’d see more than 64 bytes.
From this fantastic blog post:

julia> import Base.Threads.@spawn

julia> function fib(n::Int)
           if n < 2
               return n
           end
           t = @spawn fib(n - 2)
           return fib(n - 1) + fetch(t)
       end
fib (generic function with 1 method)

julia> @btime fib(15)
  981.306 μs (7512 allocations: 733.63 KiB)
610
2 Likes

My actual use case involves this type:

struct For{F,N,X} 
    f :: F  
    θ :: NTuple{N,Int}
end

This is a way to build general array-valued distributions; the type parameters are

  • X gives the sample space for each element of the array
  • N gives its rank
  • F is the type of the function mapping coordinates to a distribution

For example (no pun intended),

julia> d = For(10000000) do j
       Normal(1/j,1.0)
       end
For{var"#153#154",1,Float64}(var"#153#154"(), (10000000,))

The reduction comes in computing logpdf:

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s = 0.0
    for θ in CartesianIndices(d.θ)
        @inbounds s += logpdf(d.f(Tuple(θ)...), xs[θ])
    end
    s
end

julia> x = rand(d);

julia> @btime logpdf($d,$x)
  65.662 ms (0 allocations: 0 bytes)
-1.4190706223203065e7

I don’t really understand when atomic operations make sense. They’re like very lightweight locks, right? I think we’re seeing memory contention in this case:

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s = Threads.Atomic{Float64}(0.0)
    for θ in CartesianIndices(d.θ)
        @inbounds Threads.atomic_add!(s, logpdf(d.f(Tuple(θ)...), xs[θ]))
    end
    s.value
end

julia> @btime logpdf($d,$x)
  87.053 ms (1 allocation: 16 bytes)
-1.4190706223203065e7

Here’s my current best:

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s = zeros(Float64, Threads.nthreads())
    @threads for θ in CartesianIndices(d.θ)
        tid = Threads.threadid()
        @inbounds s[tid] += logpdf(d.f(Tuple(θ)...), xs[θ])
    end
    sum(s)
end

julia> @btime logpdf($d,$x)
  27.363 ms (227 allocations: 24.14 KiB)
-1.4190706223201526e7

I had thought it might be better to allocate a constant at the top-level and re-use it for each reduction. I’m surprised it’s not:

const s =  zeros(Float64, Threads.nthreads())

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s .= 0.0
    @threads for θ in CartesianIndices(d.θ)
        tid = Threads.threadid()
        @inbounds s[tid] += logpdf(d.f(Tuple(θ)...), xs[θ])
    end
    sum(s)
end

julia> @btime logpdf($d,$x)
  28.936 ms (228 allocations: 23.83 KiB)
-1.4188093456982236e7

How would I do this with SIMDPirates et al?

1 Like

This seems much faster. @Elrod is this how you would use atomic_add!?

@inline function logpdf2(d::For{F,N,X1},xs) where {F,N, X1, X2}
    results = zeros(eltype(xs), nthreads())

    θ = CartesianIndices(d.θ)

    total = Threads.Atomic{Float64}(0.0)
    @threads for tid in 1:nthreads()
        # split work
        start = 1 + ((tid - 1) * length(xs)) ÷ nthreads()
        stop = (tid * length(xs)) ÷ nthreads()
        domain = start:stop
        
        s = 0.0
        for j in domain
            @inbounds θj = θ[j]
            @inbounds s += logpdf(d.f(Tuple(θj)...), xs[θj])
        end

        Threads.atomic_add!(total, s)
    end

    total.value
end

EDIT: Wow, yep this is way faster:

julia> @btime logpdf($d,$x)
  28.783 ms (227 allocations: 24.14 KiB)
-1.4190676435090607e7

julia> @btime logpdf2($d,$x)
  4.004 ms (230 allocations: 24.20 KiB)
-1.4190676435090605e7
2 Likes

Is it faster than your threaded_mapreduce in the OP? I find it counter-intuitive that results = zeros(eltype(x), nthreads()) would be such a large overhead compared to Threads.Atomic{Float64}(0.0) in logpdf2.

I’d expect the difference between threaded_mapreduce from the opening post and the atomic version to be small.
I’d expect not having to perform the final reduction to be the bigger benefit, but as you noted, runtime is mostly determined by the runtime of the slowest thread: the faster threads can perform that sum while the slowest thread finishes, so it saves something like 5 nanoseconds…

The threaded logpdf using const s = zeros(Float64, Threads.nthreads()), on the other hand, is going to be very slow, because it suffers from false sharing. Each number should be in a separate cache line if you’re doing repeated writes (on most computers a cache line is 64 bytes, i.e., 8 Float64s).
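
Something like this sketch is what that padding could look like; the names PAD and padded_mapreduce are made up for illustration, and each thread’s accumulator is spaced a full cache line apart:

using Base.Threads

const PAD = 64 ÷ sizeof(Float64)   # 8 Float64 slots per 64-byte cache line

function padded_mapreduce(f, op, x)
    s = zeros(Float64, PAD * nthreads())    # thread tid uses slot (tid - 1) * PAD + 1
    @threads for i in eachindex(x)
        j = (threadid() - 1) * PAD + 1
        @inbounds s[j] = op(s[j], f(x[i]))  # repeated writes, but to separate cache lines
    end
    foldl(op, view(s, 1:PAD:length(s)))     # combine the per-thread slots
end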

@cscherrer

How would I do this with SIMDPirates et al?

I would make sure the code is SIMD. Currently, that doesn’t seem to be the case:

julia> @code_llvm optimize=true logpdf(d.f(3), x[3])

;  @ /home/chriselrod/.julia/packages/Distributions/0Wogo/src/univariates.jl:531 within `logpdf'
define double @julia_logpdf_17853([2 x double] addrspace(11)* nocapture nonnull readonly dereferenceable(16), double) {
top:
; ┌ @ Base.jl:33 within `getproperty'
   %2 = getelementptr inbounds [2 x double], [2 x double] addrspace(11)* %0, i64 0, i64 0
   %3 = getelementptr inbounds [2 x double], [2 x double] addrspace(11)* %0, i64 0, i64 1
; └
  %4 = load double, double addrspace(11)* %2, align 8
  %5 = load double, double addrspace(11)* %3, align 8
  %6 = call double @julia_normlogpdf_17682(double %4, double %5, double %1)
  ret double %6
}

The fact that @julia_normlogpdf_17682 doesn’t get inlined will prevent vectorization. That’s why I’d recommend using another library if you prioritize performance. If the standard deviation parameter is a vector, you’d also need a SIMD log function, such as from SLEEFPirates.

One concern, though, is that if the standard deviation parameter is a scalar, Base.log is both faster and more accurate. Therefore, which function I prefer depends on whether the standard deviation is a scalar or a vector. If it is a constant, you should be able to just drop the log altogether (although some methods, like Bayesian model averaging, will still need it).

julia> using SLEEFPirates, BenchmarkTools

julia> x = rand(200); y = similar(x);

julia> @benchmark $y .= SLEEFPirates.log.($x)
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     312.908 ns (0.00% GC)
  median time:      321.531 ns (0.00% GC)
  mean time:        323.462 ns (0.00% GC)
  maximum time:     2.338 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     239

julia> @benchmark $y .= Base.log.($x)
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     984.500 ns (0.00% GC)
  median time:      1.018 μs (0.00% GC)
  mean time:        1.020 μs (0.00% GC)
  maximum time:     3.356 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     14

julia> @benchmark SLEEFPirates.log(first($x))
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     10.075 ns (0.00% GC)
  median time:      10.159 ns (0.00% GC)
  mean time:        10.345 ns (0.00% GC)
  maximum time:     30.467 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     999

julia> @benchmark Base.log(first($x))
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     4.945 ns (0.00% GC)
  median time:      5.052 ns (0.00% GC)
  mean time:        5.070 ns (0.00% GC)
  maximum time:     21.618 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1000

So you could define some sort of interface for picking between distributions you loop over based on container types, to be able to select the optimal version.
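
For instance, a minimal sketch of that kind of dispatch (the helper name _logσ is made up here, not from any package):

using SLEEFPirates

_logσ(σ::Real) = Base.log(σ)                     # scalar: Base.log is faster and more accurate
_logσ(σ::AbstractVector) = SLEEFPirates.log.(σ)  # vector: SIMD-friendly log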

In this example though, you just have a normal with mean 1/j, so I’ll just ignore the logarithm.

using Distributions, Base.Threads  # for Distributions.logpdf, nthreads, @threads, Atomic

struct NormalUnitVar{T}
    mean::T
end
const LOG2PI = log(2π)
@inline Distributions.logpdf(n::NormalUnitVar, x) = @fastmath -0.5 * abs2(n.mean - x) - 0.5LOG2PI
d2 = For((10000000,)) do j
    NormalUnitVar(1/j)
end
@inline function logpdf2(d::For{F,N,X1},xs) where {F,N, X1, X2}
    results = zeros(eltype(xs), nthreads())
    θ = CartesianIndices(d.θ)
    total = Threads.Atomic{Float64}(0.0)
    @threads for tid in 1:nthreads()
        # split work
        start = 1 + ((tid - 1) * length(xs)) ÷ nthreads()
        stop = (tid * length(xs)) ÷ nthreads()
        domain = start:stop
        s = 0.0
        @simd ivdep for j in domain
            @inbounds θj = θ[j]
            @inbounds s += logpdf(d.f(Tuple(θj)...), xs[θj])
        end
       
        Threads.atomic_add!(total, s)
    end
       
    total.value
end

Benchmarks of original d vs d2 using NormalUnitVar:

julia> x = rand(d.θ...);

julia> @btime logpdf2($d,$x) # original
  8.164 ms (76 allocations: 7.70 KiB)
-1.0856831976798426e7

julia> @btime logpdf2($d2,$x) #d2 using NormalUnitVar
  983.862 μs (76 allocations: 7.70 KiB)
-1.0856831976798521e7
2 Likes

This reminded me that, while mapreduce uses @simd for arrays, it does “pairwise accumulation” and so it can be slow even compared to a straightforward hand-rolled loop. It would be nice if mapfoldl could be used in a situation like this (it’s something I want to propose after/if Transducer as an optimization: map, filter and flatten by tkf · Pull Request #33526 · JuliaLang/julia · GitHub is merged). Of course, it doesn’t replace hand-tuned functions invoking SIMD directly.

By the way, on top of false sharing, I think it also has a correctness problem.

My understanding is that the s .= 0.0 itself is not thread-safe; i.e., the behavior is undefined when there are multiple parallel calls to logpdf itself. I think you actually need per-thread buffers, e.g. const ss = Vector{Vector{Float64}}() (initialized in __init__, like Random does it). Then

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1, X2 <: X1}
    s = ss[Threads.threadid()]
    s .= 0.0

I still think local result array is an OK direction, though.

In v1.3, Task + scheduler overhead is several thousand cycles per switch, so one needs coarse task granularity. Did you try a large basesize? That should make divide-and-conquer competitive for these large problems.

1 Like

Oops, guess I hadn’t compared that one directly. Yes, the atomic_add version is a bit faster. Going back to generic mapreduce workalikes,

function threaded_mapreduce(f, op, x)
    @assert length(x) % nthreads() == 0
    results = zeros(eltype(x), nthreads())
    @threads for tid in 1:nthreads()
        # split work
        start = 1 + ((tid - 1) * length(x)) ÷ nthreads()
        stop = (tid * length(x)) ÷ nthreads()
        domain = start:stop
                
        acc = zero(eltype(x))
        acc = op(acc, mapreduce(f, op, view(x, domain)))
        results[tid] = acc
    end
    foldl(op, results)
end

function atomic_mapreduce(f,op,x)
    results = zeros(eltype(x), nthreads())

    total = Threads.Atomic{Float64}(0.0)
    @threads for tid in 1:nthreads()
        # split work
        start = 1 + ((tid - 1) * length(x)) ÷ nthreads()
        stop = (tid * length(x)) ÷ nthreads()
        domain = start:stop
        
        acc = zero(eltype(x))
        for j in domain
            @inbounds acc = op(acc, f(x[j]))
        end

        Threads.atomic_add!(total, acc)
    end

    total.value
end

Then benchmarking…

julia> x = randn(10000000);

julia> f = log∘sqrt∘abs;

julia> @benchmark threaded_mapreduce(f, +, x)
BenchmarkTools.Trial: 
  memory estimate:  25.69 KiB
  allocs estimate:  261
  --------------
  minimum time:     5.547 ms (0.00% GC)
  median time:      6.239 ms (0.00% GC)
  mean time:        8.629 ms (0.00% GC)
  maximum time:     35.521 ms (0.00% GC)
  --------------
  samples:          578
  evals/sample:     1

julia> @benchmark atomic_mapreduce(f, +, x)
BenchmarkTools.Trial: 
  memory estimate:  24.17 KiB
  allocs estimate:  229
  --------------
  minimum time:     5.422 ms (0.00% GC)
  median time:      5.872 ms (0.00% GC)
  mean time:        6.602 ms (0.00% GC)
  maximum time:     13.920 ms (0.00% GC)
  --------------
  samples:          756
  evals/sample:     1

Is this the idea behind PaddedArrays?

Is this because it calls out to non-Julia code? I’ve seen discussion about rebuilding all distributional computations in Julia; this seems like a compelling argument for making that a priority.

Some of this (vector vs scalar) seems simple enough using multiple dispatch. As for the “log of a constant” concern, I’ve come to like the idea of keeping those terms but moving them outside any loop, so they’re a one-time cost. I think this ought to give us the best of both worlds: accurate logpdfs with very small O(1) overhead.
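
Here’s a rough sketch of what I mean for the scalar-σ Normal case (the name and signature are just illustrative): the log σ and log 2π terms are paid once per call rather than once per element.

function sum_normal_logpdf(μs::AbstractVector, σ::Real, xs::AbstractVector)
    n = length(xs)
    ssq = 0.0
    @inbounds @simd for i in eachindex(xs, μs)
        ssq += abs2(xs[i] - μs[i])   # only the data-dependent part stays in the loop
    end
    -ssq / (2σ^2) - n * log(σ) - n * log(2π) / 2   # constant terms hoisted out
end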

Wow, this is really nice. I’m a bit concerned about your use of @fastmath, though. Do you know anything about its accuracy in this case? It also makes me wonder if there are tricks we could use, like using @fastmath to somehow build a proposal distribution and correcting for it later. I’m not sure whether anything like this has been done.

Yep, definitely. It’s written here for one-time use; I had thought about something like a lock around all of s, so only one logpdf(::For, x) is computed at a time. I’m not familiar with the approach you describe, so I’ll need to have a look.

I’ve seen references to __init__ in other Julia libraries, but in my experience it’s strictly a Python thing. Where does this come up in Julia?

Probably off topic there (because foldl is explicit about the order), but I commented there to note that summing missing values is slower than using a BitArray to conditionally sum a concretely typed Array.

Based on the speed of sum vs a naive loop, I don’t think pairwise accumulation loses much in the speed department.

You’re right that in some cases, pairwise summation can lose a lot in speed vs a naive SIMD loop:

julia> x = rand(256);

julia> function mysum(x::AbstractArray{T}) where {T}
           s = zero(T)
           @simd for xᵢ ∈ x
               s += xᵢ
           end
           s
       end
mysum (generic function with 1 method)

julia> @btime mysum($x)
  9.890 ns (0 allocations: 0 bytes)
130.05195874127304

julia> @btime sum($x)
  35.376 ns (0 allocations: 0 bytes)
130.051958741273

Note that the naive loop is unrolled by the compiler into 32 separate accumulators on my computer, so that the loop of length 256 is reduced to only 8 iterations (8*32 = 256).
In a sense, this is also a pairwise summation with a stride of 32 between paired sums.
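
Spelled out by hand (a sketch of the idea, not the code the compiler actually emits), accumulating into independent sums and combining them at the end looks roughly like this with four accumulators:

function foursum(x::AbstractVector{T}) where {T}
    s1 = s2 = s3 = s4 = zero(T)
    i = firstindex(x)
    @inbounds while i + 3 <= lastindex(x)
        s1 += x[i]; s2 += x[i+1]; s3 += x[i+2]; s4 += x[i+3]
        i += 4
    end
    s = (s1 + s2) + (s3 + s4)
    @inbounds while i <= lastindex(x)   # catch the remainder serially
        s += x[i]
        i += 1
    end
    s
end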

@cscherrer

Is this the idea behind PaddedArrays ?

The idea is similar.
I also want load operations to be aligned. Recent Intel and AMD CPUs are limited to 2 loads/cycle, and if a vector load crosses a 64-byte boundary, it counts as two loads against this limit. Most of these CPUs also do 2 adds or fmas per cycle, so for a lot of common functions, like a dot product where you have 2 loads per fma, this loading is often the performance bottleneck. (This is a different problem from the memory-bandwidth issue more commonly discussed with dot products, where memory can’t move into the highest cache levels fast enough; this is a problem of moving data from cache into registers, so it applies no matter how small the arrays, unless the compiler was able to keep the data in registers, as can happen.)

So it’s nice to align your loads (even if you’re using vmovupd, unaligned loads).

Furthermore, it saves you from having to add a serial or masked loop at the end of a vectorized loop to catch the remainder.

Is this because it calls to non-Julia code? I’ve seen discussion about rebuilding all distributional computations in Julia, this seems like a compelling argument for this to be a priority.

It could also just be code that didn’t get inlined.

julia> using Distributions

julia> n = Normal()
Normal{Float64}(μ=0.0, σ=1.0)

julia> @code_warntype logpdf(n, 0.67)
Variables
  #self#::Core.Compiler.Const(Distributions.logpdf, false)
  d::Normal{Float64}
  x::Float64

Body::Float64
1 ─ %1 = Base.getproperty(d, :μ)::Float64
│   %2 = Base.getproperty(d, :σ)::Float64
│   %3 = Distributions.normlogpdf(%1, %2, x)::Float64
└──      return %3

julia> @which Distributions.normlogpdf(n.μ, n.σ, 0.67)
normlogpdf(μ::Real, σ::Real, x::Number) in StatsFuns at /home/chriselrod/.julia/packages/StatsFuns/2QE7p/src/distrs/norm.jl:31

Which is already written in Julia, as you can see here.

Some of this (vector vs scalar) seems simple enough using multiple dispatch. As for “log of constant” concerns, I’ve come to like the idea of keeping these but moving them outside any loop, so it’s a one-time cost. I think this ought to give us the best of both worlds - accurate logpdfs with very small O(1) overhead.

I prefer O(0) and reduced numerical error, so I drop them when I can ;). But if the standard deviation is a scalar, I do pull it out of the loop and just do length(y) * Base.log(σ).
Similarly, I drop log(2pi).
But I will add support for returning the constant parts soon, because there’s a Bayesian model averaging library used internally at work that I’d like to use my Julia code for. Then the constant only has to be calculated once per data set, instead of once per density evaluation (which could be a hundred times per MCMC sample, for thousands of MCMC samples).

Wow, this is really nice. I’m a bit concerned about your use of @fastmath . Do you know anything in this case about accuracy? It also makes me wonder if there are tricks we could use, like using @fastmath to somehow build a proposal distribution, and correct for it later. Not sure if anything like this has been done.

I think @fastmath will generally be more accurate than foldl, foldr, or a reduction loop evaluated in serial fashion.
When it comes to special functions like log, I’ll normally explicitly reference Base to avoid using the “fast” versions:

julia> @macroexpand @fastmath log(x)
:(Base.FastMath.log_fast(x))

julia> @macroexpand @fastmath Base.log(x)
:(Base.log(x))

When it comes to simple arithmetic:

julia> @macroexpand @fastmath a + b * c
:(Base.FastMath.add_fast(a, Base.FastMath.mul_fast(b, c)))

The “fast” versions give the compiler license to reorder computations, that is, to unroll and vectorize. The unrolling (and using separate accumulators) should also make it slightly more accurate on average.
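
As a small illustration using only Base: applying @fastmath to the accumulation lets LLVM reassociate the additions, so it can unroll into several accumulators and vectorize, whereas without it the sum must stay strictly sequential. (fastmath_sum is just a name for this sketch.)

function fastmath_sum(x::AbstractVector{T}) where {T}
    s = zero(T)
    for xᵢ in x
        @fastmath s += xᵢ   # add_fast: the compiler may reorder these additions
    end
    s
end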

I’ve seen references to __init__ in other Julia libraries, but in my experience it’s strictly a Python thing. Where does this come up in Julia?

It comes up when you need to run code at the time a module is loaded. I use it to push into data structures defined in other libraries, to Mmap blocks of memory, and to initialize random number generators.

2 Likes

Thanks for coding up the atomic version. But I think what you are seeing is the difference between a straight loop and reduce. Consider this version of threaded_mapreduce:

@@ -1,4 +1,4 @@
-function threaded_mapreduce(f, op, x)
+function threaded_mapreduce2(f, op, x)
     @assert length(x) % nthreads() == 0
     results = zeros(eltype(x), nthreads())
     @threads for tid in 1:nthreads()
@@ -8,7 +8,9 @@
         domain = start:stop

         acc = zero(eltype(x))
-        acc = op(acc, mapreduce(f, op, view(x, domain)))
+        for j in domain
+            @inbounds acc = op(acc, f(x[j]))
+        end
         results[tid] = acc
     end
     foldl(op, results)

When I ran

x = randn(10000000);
f = log∘sqrt∘abs;
b1 = @benchmark threaded_mapreduce(f, +, x)
b2 = @benchmark atomic_mapreduce(f, +, x)
b3 = @benchmark threaded_mapreduce2(f, +, x)

I get no definite improvement of atomic_mapreduce over threaded_mapreduce2

julia> judge(minimum(b2), minimum(b3))
BenchmarkTools.TrialJudgement:
  time:   -0.10% => invariant (5.00% tolerance)
  memory: +0.50% => invariant (1.00% tolerance)

while I see (some?) improvement of atomic_mapreduce over mapreduce-based threaded_mapreduce:

julia> judge(minimum(b2), minimum(b1))
BenchmarkTools.TrialJudgement:
  time:   -4.82% => invariant (5.00% tolerance)
  memory: -6.02% => improvement (1.00% tolerance)

as you observed (just showing the latter to confirm that I can reproduce your result on my laptop). Although BenchmarkTools.judge says it’s still just below the 5% threshold, I guess you could see the difference in a more controlled environment and/or with more samples, as you can measure a clearer difference in single-thread examples (e.g., see @Elrod’s comment).

Full results
julia> b1
BenchmarkTools.Trial:
  memory estimate:  3.38 KiB
  allocs estimate:  37
  --------------
  minimum time:     36.186 ms (0.00% GC)
  median time:      40.172 ms (0.00% GC)
  mean time:        40.063 ms (0.00% GC)
  maximum time:     45.547 ms (0.00% GC)
  --------------
  samples:          125
  evals/sample:     1

julia> b2
BenchmarkTools.Trial:
  memory estimate:  3.17 KiB
  allocs estimate:  33
  --------------
  minimum time:     34.443 ms (0.00% GC)
  median time:      40.789 ms (0.00% GC)
  mean time:        40.718 ms (0.00% GC)
  maximum time:     44.663 ms (0.00% GC)
  --------------
  samples:          123
  evals/sample:     1

julia> b3
BenchmarkTools.Trial:
  memory estimate:  3.16 KiB
  allocs estimate:  32
  --------------
  minimum time:     34.477 ms (0.00% GC)
  median time:      39.306 ms (0.00% GC)
  mean time:        39.248 ms (0.00% GC)
  maximum time:     43.249 ms (0.00% GC)
  --------------
  samples:          128
  evals/sample:     1

My understanding is that locks and atomics are not the first thing you reach for in high-performance code, if there are other ways to do it.

I believe the approach I described is pretty common; for example, Random.jl and LinearAlgebra.jl use it.

It defines a piece of code that is executed when the module is first loaded. (It is very different from what Python calls __init__.) See: Modules · The Julia Language
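
Here’s a minimal sketch of the pattern (my own illustration, not from the docs) applied to the per-thread buffers suggested earlier in the thread: declare an empty const and fill it in __init__, which runs when the module is loaded.

module ThreadScratch

using Base.Threads: nthreads

const ss = Vector{Vector{Float64}}()   # one scratch buffer per thread

function __init__()
    empty!(ss)
    for _ in 1:nthreads()
        push!(ss, zeros(Float64, 1))   # size the buffers as the real use case needs
    end
end

end # module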

I commented on the performance of summissing in the PR. I didn’t want to start a discussion of @simd in that PR, to avoid distraction, as it’s probably controversial. This is also the reason why it’s an opt-in feature behind a keyword argument for foldl in my package Transducers.jl.

What I was wondering was how much accuracy loss is expected in a high-level interface like foldl. For example, sum is “allowed” to return a different result for xs :: Vector and (x for x in xs) :: Generator, as the former uses reduce and the latter uses foldl. So, is it acceptable to return a result from foldl that is only approximately correct? Even if not, would it be a good addition to opt in to such a feature (e.g., with a keyword argument)?

Thanks, that’s a good point. I guess I should default to length(input) / nthreads(), since a small basesize is only required for uneven workloads such as terminating reductions.
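
For concreteness, a rough sketch of a divide-and-conquer reduce with that kind of coarse default basesize (spawn_mapreduce is an illustrative name, not the Transducers.jl implementation, and it assumes x is non-empty):

using Base.Threads: @spawn, nthreads

function spawn_mapreduce(f, op, x; basesize = cld(length(x), nthreads()))
    function rec(lo, hi)
        if hi - lo + 1 <= basesize       # small enough: reduce sequentially
            acc = f(x[lo])
            for j in lo+1:hi
                acc = op(acc, f(x[j]))
            end
            return acc
        end
        mid = (lo + hi) >>> 1
        right = @spawn rec(mid + 1, hi)  # run the right half on another task
        op(rec(lo, mid), fetch(right))
    end
    rec(firstindex(x), lastindex(x))
end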

Hi @cscherrer , I was just wondering if you have any updates on this, or what you decided to go with in the end?

Thanks!