ThreadsX mapreduce performance

Hi,

I wonder why ThreadsX.mapreduce is slower than a straightforward @threads loop for a simple reduction like ThreadsX.mapreduce(abs, max, x), which computes the maximal absolute value of an array x.

julia> main()
Threads.nthreads() = 4
t_absmax_sequential = 0.000512871 ## loop based seq function
t_absmax_threaded = 0.000134623  ## Threads.@threads version
t_absmax_mapreduce = 0.000275336  ## mapreduce(abs,max,x)
t_absmax_mapreduce_tx = 0.000299806 ## ThreadsX.mapreduce(abs,max,x)

I am a bit annoyed by this because I wanted to show (in a lecture) that higher-order functions are the way to go for safer multithreaded programming. It works like a charm on the GPU (via CUDA.jl), though.

MWE
using ThreadsX
using BenchmarkTools
using Random

function absmax(a)
    result=zero(eltype(a))
    @inbounds for i in 1:length(a)
        abs(a[i])>result && (result=abs(a[i]))
    end
    result
end

function absmax_range(a,b,e)
    result=zero(eltype(a))
    @inbounds for i in b:e
        abs(a[i])>result && (result=abs(a[i]))
    end
    result
end

function splitrange(l,n)
    chunksize=div(l+n-1,n)
    bes=Array{Tuple{Int,Int}}(undef,n)
    b=1
    for p=1:n
        e=min(b+chunksize-1,l)
        bes[p]=(b,e)
        b=e+1
    end
    bes
end

function absmax_threaded(a)
    n=Threads.nthreads()
    presults=zeros(eltype(a),n)
    bes=splitrange(length(a),n)
    Threads.@threads for p in 1:n
        @inbounds presults[p]=absmax_range(a,bes[p]...)
    end
    maximum(presults)
end

absmax_mapreduce(x) = mapreduce(abs,max,x)
absmax_mapreduce_tx(x) = ThreadsX.mapreduce(abs,max,x)

function main()
    @show Threads.nthreads()
    n=2^19
    Random.seed!(1234)
    x = rand(n) .- 0.5
    @assert absmax(x)==absmax_threaded(x)
    @assert absmax(x)==absmax_mapreduce(x)
    @assert absmax(x)==absmax_mapreduce_tx(x)

    t_absmax_sequential=@belapsed absmax($x)
    t_absmax_threaded=@belapsed absmax_threaded($x)
    t_absmax_mapreduce=@belapsed absmax_mapreduce($x)
    t_absmax_mapreduce_tx=@belapsed absmax_mapreduce_tx($x)
    @show t_absmax_sequential
    @show t_absmax_threaded
    @show t_absmax_mapreduce
    @show t_absmax_mapreduce_tx
end

The modified MWE

function main()
    @show Threads.nthreads()
    n=2^19
    Random.seed!(1234)
    x = rand(n) .- 0.5
    @assert absmax(x)==absmax_threaded(x)
    @assert absmax(x)==absmax_mapreduce(x)
    @assert absmax(x)==absmax_mapreduce_tx(x)

    @btime absmax($x)
    @btime absmax_threaded($x)
    @btime absmax_mapreduce($x)
    @btime absmax_mapreduce_tx($x)
end

shows on my system (Julia 1.6.3)

447.200 μs (0 allocations: 0 bytes)
  90.600 μs (33 allocations: 3.44 KiB)
  236.800 μs (0 allocations: 0 bytes)
  218.900 μs (506 allocations: 37.45 KiB)

which looks OK to me. Increasing the problem size to 2^30 (careful: 8 GB of memory), I observe

Threads.nthreads() = 6
  1.152 s (0 allocations: 0 bytes)
  299.382 ms (33 allocations: 3.44 KiB)
  749.125 ms (0 allocations: 0 bytes)
  452.920 ms (510 allocations: 37.58 KiB)

which seems to indicate overhead for small problem sizes. BTW: I’m not sure if SIMD should kick in here.

Your results look consistent with mine: I am still wondering why my basic threaded version is more than twice as fast as the ThreadsX.mapreduce function. @tkf may have a hint on this.

I did not post the SIMD version, but yes: SIMD composes very well with threads in this case and brings another 3x speedup.
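For reference, here is a minimal sketch of what such a SIMD kernel could look like (absmax_range_simd is an illustrative name, not the code from the lecture): rewriting the conditional assignment as a branchless max gives @simd a reduction pattern it can vectorize.

```julia
# Illustrative SIMD variant of absmax_range: the branchless `max` form
# (instead of `cond && (result = ...)`) lets @simd vectorize the reduction.
function absmax_range_simd(a, b, e)
    result = zero(eltype(a))
    @inbounds @simd for i in b:e
        result = max(result, abs(a[i]))
    end
    result
end
```

Plugging this in for absmax_range inside absmax_threaded is how SIMD composes with the threaded version.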


For the bigger problem size the factor is already down to around 1.5. But of course I don’t know whether ThreadsX will converge to the basic threading performance as the problem size grows.


I should add a curve with ThreadsX.mapreduce. When combined with SIMD, the difference for large arrays will probably become smaller because the problem becomes memory-bound.

You can also try LoopVectorization 0.12.87 (with older versions, you’ll need to modify the loop to say result = max(result, abs(a[i])) instead, at least for the @tturbo version; @turbo has been working with the && version for a while).

Code
using ThreadsX, LoopVectorization, BenchmarkTools, Random
function absmax_turbo(a) # LV serial
  result=zero(eltype(a))
  @turbo for i in 1:length(a)
    abs(a[i])>result && (result=abs(a[i]))
  end
  result
end
function absmax_tturbo(a) # LV threaded
  result=zero(eltype(a))
  @tturbo for i in 1:length(a)
    abs(a[i])>result && (result=abs(a[i]))
  end
  result
end
function absmax(a)
    result=zero(eltype(a))
    @inbounds for i in 1:length(a)
        abs(a[i])>result && (result=abs(a[i]))
    end
    result
end
function absmax_range(a,b,e)
    result=zero(eltype(a))
    @inbounds for i in b:e
        abs(a[i])>result && (result=abs(a[i]))
    end
    result
end
function splitrange(l,n)
    chunksize=div(l+n-1,n)
    bes=Array{Tuple{Int,Int}}(undef,n)
    b=1
    for p=1:n
        e=min(b+chunksize-1,l)
        bes[p]=(b,e)
        b=e+1
    end
    bes
end
function absmax_threaded(a)
    n=Threads.nthreads()
    presults=zeros(eltype(a),n)
    bes=splitrange(length(a),n)
    Threads.@threads for p in 1:n
        @inbounds presults[p]=absmax_range(a,bes[p]...)
    end
    maximum(presults)
end
absmax_mapreduce(x) = mapreduce(abs,max,x)
absmax_mapreduce_tx(x) = ThreadsX.mapreduce(abs,max,x)
function main()
  @show Threads.nthreads()
  n=2^19
  Random.seed!(1234)
  x = rand(n) .- 0.5
  absmaxx = absmax(x)
  @assert absmaxx==absmax_threaded(x)
  @assert absmaxx==absmax_mapreduce(x)
  @assert absmaxx==absmax_mapreduce_tx(x)
  @assert absmaxx==absmax_turbo(x)
  @assert absmaxx==absmax_tturbo(x)

  @btime absmax($x)
  @btime absmax_threaded($x)
  @btime absmax_mapreduce($x)
  @btime absmax_mapreduce_tx($x)
  @btime absmax_turbo($x)
  @btime absmax_tturbo($x)
end

Results:

julia> main()
Threads.nthreads() = 36
  475.570 μs (0 allocations: 0 bytes) # absmax
  83.320 μs (183 allocations: 18.23 KiB) # absmax_threaded
  249.834 μs (0 allocations: 0 bytes) # absmax_mapreduce
  231.736 μs (4406 allocations: 297.36 KiB) # absmax_mapreduce_tx
  148.614 μs (0 allocations: 0 bytes) # absmax_turbo
  4.116 μs (0 allocations: 0 bytes) # absmax_tturbo
0.4999992351615653

There seem to be a couple of independent factors contributing to the performance difference.

Factor 1: < vs isless

First of all, abs(a[i])>result && (result=abs(a[i])) used in absmax of the OP is faster than max. This is because < does not define an ordering with NaN and max/isless do extra work to do the right thing:

julia> isless(NaN, 0.0)
false

julia> isless(0.0, NaN)
true

julia> NaN < 0.0
false

julia> 0.0 < NaN
false

A fair comparison would be to use

fmax(a, b) = ifelse(b > a, b, a)

both in absmax and the mapreduce examples. So, absmax would look like

function absmax(a)
    result = typemin(eltype(a))
    @inbounds for i in 1:length(a)
        result = fmax(result, abs(a[i]))
    end
    result
end

(Nitpick: I fixed the initial value of result to typemin(eltype(a)). The original definition using zero is not correct unless a is known to contain non-negative numbers. Alternatively, result = -0.5 is a good initial value since it is the known lower bound of a.)
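To make the semantic difference concrete (and to see why fmax is allowed to be cheaper): Base's max propagates NaN, while the ifelse-based fmax handles NaN inconsistently depending on argument order.

```julia
fmax(a, b) = ifelse(b > a, b, a)  # same definition as above

max(0.0, NaN)    # NaN: Base.max does extra work to propagate NaN
fmax(0.0, NaN)   # 0.0: `NaN > 0.0` is false, so `a` is kept
fmax(NaN, 0.0)   # NaN: `0.0 > NaN` is also false, so `a` is kept
```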

To use fmax with ThreadsX.jl, I added the following implementations:

absmax_mapreduce_tx(x) = ThreadsX.mapreduce(abs,fmax,x; init = typemin(eltype(x)))

Unfortunately, ThreadsX.mapreduce(abs,fmax,x) cannot be used ATM since the initial value of fmax is not known. Transducers.jl already has the facility to support this, but I have to think about the impact of it.

With these changes, running the script (see below) with julia -t4 script.jl prints

Threads.nthreads() = 4
n = 524288
4-element Vector{Pair{Symbol, BenchmarkTools.Trial}}:
   :sequential => 313.067 μs
     :threaded => 46.089 μs
    :mapreduce => 163.239 μs
 :mapreduce_tx => 85.089 μs

Better than 3x over sequential, but ThreadsX.jl still does not perform as well as the hand-written threaded version.

Factor 2: task scheduler

A bit more trial and error revealed that the main difference may be due to how @threads and @spawn (used in ThreadsX.jl) interact with Julia’s and the OS’s task schedulers. In general, @spawn is more dynamic and has more freedom in scheduling. This is great for composing “nested” parallel programs, but for microbenchmarks of “shallow” parallel programs like the ones tested here, the overhead can manifest as in the result above. We can at least constrain the dynamism on the OS side by starting the script with JULIA_EXCLUSIVE=1 julia -t4 script.jl:

Threads.nthreads() = 4
n = 524288
4-element Vector{Pair{Symbol, BenchmarkTools.Trial}}:
   :sequential => 156.608 μs
     :threaded => 41.730 μs
    :mapreduce => 163.019 μs
 :mapreduce_tx => 51.740 μs

mapreduce_tx is now pretty close to threaded.
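To illustrate the two scheduling styles being compared, here is a hedged sketch (absmax_spawn and its cutoff constant are made up for illustration, not ThreadsX's actual strategy): @threads statically splits one loop across threads, whereas a @spawn-based reduction builds a tree of tasks that the scheduler is free to place dynamically.

```julia
# Divide-and-conquer reduction with Threads.@spawn, the dynamic style
# used by ThreadsX-like libraries. Each half becomes a task the Julia
# scheduler may run on any thread; `fetch` joins the results.
function absmax_spawn(a, b=firstindex(a), e=lastindex(a))
    if e - b < 100_000                  # sequential base case (cutoff is arbitrary)
        result = zero(eltype(a))
        @inbounds for i in b:e
            result = max(result, abs(a[i]))
        end
        return result
    end
    m = (b + e) >>> 1
    task = Threads.@spawn absmax_spawn(a, b, m)   # left half in a new task
    right = absmax_spawn(a, m + 1, e)             # right half in this task
    return max(fetch(task), right)
end
```

The extra freedom in placement is exactly where the scheduling overhead discussed above comes from.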

(Note: Above explanation is a bit hand-wavy since I mixed the dynamism in Julia’s and OS’s task schedulers.)

Factor 3: constant overhead in Julia task

Currently, the scheduler incurs a few μs up to ten μs of overhead per task, so a computation that takes about 50 μs will see a non-negligible effect from the scheduler. Bumping the problem size to n = 2^22 and running JULIA_EXCLUSIVE=1 julia -t4 script.jl shows me

Threads.nthreads() = 4
n = 4194304
4-element Vector{Pair{Symbol, BenchmarkTools.Trial}}:
   :sequential => 2.533 ms
     :threaded => 885.732 μs
    :mapreduce => 2.544 ms
 :mapreduce_tx => 821.713 μs
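A rough way to see this constant per-task cost on your own machine (stdlib only; the exact number will vary):

```julia
# Spawning and fetching even a trivial task costs on the order of a few
# microseconds, which is why ~50 μs kernels feel the scheduler.
fetch(Threads.@spawn 1 + 1)                 # warm up (compile the task body)
t = @elapsed fetch(Threads.@spawn 1 + 1)    # typically a few microseconds
println(t)
```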

tl;dr

So, it looks like the performance difference was coming from the algorithmic difference of the base case (Factor 1) and the task system (Factor 2 & 3).

Here’s the script I used:

script
using ThreadsX
using BenchmarkTools
using Random

fmax(a, b) = ifelse(b > a, b, a)

function absmax(a)
    result = typemin(eltype(a))
    @inbounds for i in 1:length(a)
        result = fmax(result, abs(a[i]))
    end
    result
end

function absmax_range(a,b,e)
    result = typemin(eltype(a))
    @inbounds for i in b:e
        result = fmax(result, abs(a[i]))
    end
    result
end

function splitrange(l,n)
    chunksize=div(l+n-1,n)
    bes=Array{Tuple{Int,Int}}(undef,n)
    b=1
    for p=1:n
        e=min(b+chunksize-1,l)
        bes[p]=(b,e)
        b=e+1
    end
    bes
end

function absmax_threaded(a)
    n=Threads.nthreads()
    presults=zeros(eltype(a),n)
    bes=splitrange(length(a),n)
    Threads.@threads for p in 1:n
        @inbounds presults[p]=absmax_range(a,bes[p]...)
    end
    maximum(presults)
end

absmax_mapreduce(x) = mapreduce(abs,fmax,x)
absmax_mapreduce_tx(x) = ThreadsX.mapreduce(abs,fmax,x; init = typemin(eltype(x)))
absmax_mapreduce_tx_simd(x) =
    ThreadsX.mapreduce(abs,fmax,x; init = typemin(eltype(x)), simd = Val(true))

function main()
    @show Threads.nthreads()
    n=2^19
    # n=2^22
    @show n
    Random.seed!(1234)
    x = rand(n) .- 0.5
    @assert absmax(x)==absmax_threaded(x)
    @assert absmax(x)==absmax_mapreduce(x)
    @assert absmax(x)==absmax_mapreduce_tx(x)
    # @assert absmax(x)==absmax_mapreduce_tx_simd(x)

    results = [
        :sequential => @benchmark absmax($x)
        :threaded => @benchmark absmax_threaded($x)
        :mapreduce => @benchmark absmax_mapreduce($x)
        :mapreduce_tx => @benchmark absmax_mapreduce_tx($x)
        # :mapreduce_tx_simd => @benchmark absmax_mapreduce_tx_simd($x)
    ]
    display(results)
    return results
end

results = main()

Hopefully, we can minimize the latter factors by improving Julia; for example, via the “light-weight”, more structured task system we are proposing here: RFC: Experimental API for may-happen in parallel parallelism by tkf · Pull Request #39773 · JuliaLang/julia · GitHub.


Thanks Chris. Actually we do show the turbo version in the Julia lecture :smiley:

But does the tturbo version avoid data races?

Ideally, I would like the ThreadsX mapreduce version to perform better, because I think it is the best way to express this computation.


Thank you very much for this very informative explanation! I was wondering if there is a kind of trade-off between performance and versatility in the ThreadsX design. It is a bit late in France now; I will study your post more carefully tomorrow.


Yes. It uses separate accumulators per thread.
The single threaded version already needs this capability to break up dependency chains.
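The dependency-chain point can be sketched by hand (a hand-rolled illustration, not LoopVectorization's actual code generation): keeping several independent partial maxima means an iteration no longer waits on the previous accumulator, which is the same trick the vectorized and threaded versions rely on.

```julia
# Four independent accumulators break the serial dependency chain of the
# reduction; LV's generated code uses the same idea with vector registers.
function absmax_4acc(a)
    r1 = r2 = r3 = r4 = zero(eltype(a))
    n = length(a)
    i = 1
    @inbounds while i + 3 <= n          # main loop: 4 elements per step
        r1 = max(r1, abs(a[i]))
        r2 = max(r2, abs(a[i + 1]))
        r3 = max(r3, abs(a[i + 2]))
        r4 = max(r4, abs(a[i + 3]))
        i += 4
    end
    @inbounds while i <= n              # remainder loop
        r1 = max(r1, abs(a[i]))
        i += 1
    end
    max(max(r1, r2), max(r3, r4))       # combine the partial maxima
end
```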

There is also LoopVectorization.vmapreduce.
I (or someone else :wink: ) could add a threaded version.

Here’s a quick-and-dirty version of a tturbomapreduce:

julia> using LoopVectorization, ThreadsX

julia> @generated function tturbomapreduce(::F, ::Op, v::Vector) where {F, Op}
           f  = F.instance
           op = Op.instance
           quote
               @assert length(v) > 1
               out = @inbounds $op($f(v[begin]), $f(v[begin+1]))
               @tturbo for i in (firstindex(v)+1):lastindex(v)
                   out = $op(out, $f(v[i]))
               end
               out
           end
       end;

julia> let n=2^19, v = rand(n) .- 0.5
           @btime mapreduce(abs, max, $v)
           @btime vmapreduce(abs, max, $v)
           @btime ThreadsX.mapreduce(abs, max, $v)
           @btime tturbomapreduce(abs, max, $v)
       end;
  134.369 μs (0 allocations: 0 bytes)
  61.930 μs (0 allocations: 0 bytes)
  155.609 μs (540 allocations: 36.55 KiB)
  22.080 μs (0 allocations: 0 bytes)

It won’t work with closures or other functions for which typeof(f).instance doesn’t work, but LoopVectorization already fails on those so I think it’s alright.
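The limitation can be checked directly (make_adder is just an illustrative helper): a plain generic function is a singleton whose value is recoverable from its type, whereas a closure type carries captured state and has no .instance.

```julia
g(x) = x + 1
typeof(g).instance === g            # true: generic functions are singletons

make_adder(y) = x -> x + y          # returns a closure capturing `y`
h = make_adder(10)
isdefined(typeof(h), :instance)     # false: the closure's type is not a singleton
```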


Does it? That’s not intentional.

The naive non-@generated version had a problem since LV already wants to know what the functions are at macro-expansion time.
So another fix to support the non-@generated version is to lift that limitation.

But it will have to know what the op is regardless to know how to correctly re-associate. Is that the problem you were hitting?

julia> function map_tturbo!(f::F, y, x) where {F}
           @tturbo for i in eachindex(y, x)
               y[i] = f(x[i])
           end
       end
map_tturbo! (generic function with 1 method)

julia> x = rand(10_000); y = similar(x);

julia> map_tturbo!(x -> log1p(x)/3, y, x);

julia> y ≈ log1p.(x) ./ 3
true

Not knowing what the function does means it won’t optimize as well. I should also perhaps make sure the function inlines.

In that case, the .instance should only be required for op. f should be free to be relatively arbitrary.

Oh, my mistake, the thing I quickly tested on just had an error in it.