Looking for advice on threading

Are there any (fairly simple) examples of how to work with Threads.@threads?

Background;: so far I have tried splitting up my computations in threaded chunks and eventually recombine the results from the chunks, similar in spirit to the “partial sums” below. It reduces my computation time from 10 minutes to 1.5 minutes on the office desktop, but it’s not so pretty. Is there a better strategy? (Don’t take the partial sums example too literally.)

function fnPartialSum(x,t1,t2)
  y = 0.0
  for i = t1:t2
    y = y + x[i]
  end
  return y
end

function fnTotalSum(x,nChunks)
  T    = length(x)
  m    = cld(T,nChunks)                   #no. elements in each chunk
  xSum = zeros(nChunks)                   #pre-allocate space for partial sums
  Threads.@threads for i = 1:nChunks      #do nChunks partial sums
    xSum[i] = fnPartialSum(x,1+(i-1)*m,min(i*m,T))
  end
  Sum = sum(xSum)
  return Sum
end

x = rand(999)
r1 = fnTotalSum(x,3)                      #use 3 chunks (threaded)
r2 = fnPartialSum(x,1,length(x))          #compare with no-thread
println("$r1 $r2")

Search for “map reduce” design pattern.

Anyway 6x on desktop seems to be very good for me.

4 Likes

In addition to @Jakub_Wronowski’s very good answer, you may want to know that Threads.@threads can directly be used to parallelize the summation loop (which spares you the pain of computing the correct partition of the indices range).

Something like this yields approximately the same performances as your threaded version, but is perhaps more readable:

function fnSum(x)
    # Array holding one partial sum per thread
    partialSum = zeros(eltype(x), Threads.nthreads())

    # Threads.@threads directly parallelizes the loop over elements in x
    Threads.@threads for e in x
        @inbounds partialSum[Threads.threadid()] += e
    end

    # Final reduction
    sum(partialSum)
end

Some benchmarking (on my machine with JULIA_NUM_THREADS=4, matching the number of cores, and with a large enough vector):

julia> using BenchmarkTools

julia> let
           x = rand(1_000_000);
           r1 = @btime fnTotalSum($x,Threads.nthreads()) # threaded
           r2 = @btime fnPartialSum($x,1,$(length(x)))   # not threaded
           r3 = @btime fnSum($x)                         # threaded
       
           println("$r1\n$r2\n$r3")
       end

# Benchmark
  405.389 μs (31 allocations: 3.16 KiB) # fnTotalSum (threaded)
  1.444 ms (0 allocations: 0 bytes)     # fnPartialSum (not threaded)
  409.454 μs (31 allocations: 3.13 KiB) # fnSum (threaded)

# Results
499823.13580023544 # fnTotalSum (threaded)
499823.13580021414 # fnPartialSum (not threaded)
499823.13580023544 # fnSum (threaded)



PS: I sprinkled a few @inbounds here and there to try and benchmark comparable implementations. Here is the complete script I used:

Script
function fnPartialSum(x,t1,t2)
  y = zero(eltype(x))
  @inbounds for i = t1:t2
    y = y + x[i]
  end
  return y
end

function fnTotalSum(x,nChunks)
  T    = length(x)
  m    = cld(T,nChunks)                   #no. elements in each chunk
  xSum = zeros(nChunks)                   #pre-allocate space for partial sums
  Threads.@threads for i = 1:nChunks      #do nChunks partial sums
    @inbounds xSum[i] = fnPartialSum(x,1+(i-1)*m,min(i*m,T))
  end
  Sum = sum(xSum)
  return Sum
end

function fnSum(x)
    partialSum = zeros(eltype(x), Threads.nthreads())

    Threads.@threads for e in x
        @inbounds partialSum[Threads.threadid()] += e
    end

    sum(partialSum)
end

using BenchmarkTools

let
    x = rand(1_000_000);
    r1 = @btime fnTotalSum($x,Threads.nthreads())     #use 3 chunks (threaded)
    r2 = @btime fnPartialSum($x,1,$(length(x)))       #compare with no-thread
    r3 = @btime fnSum($x)

    println("$r1\n$r2\n$r3")
end

1 Like

FYI, with Transducers.jl (see Thread- and process-based parallelisms in Transducers.jl (+ some news)), it’s reduce(+, Map(identity), x; basesize=length(x) ÷ nChunks).

I find this kind of approaches limiting as it’s impossible to write a parallel version of sum(f, xs) this way without relying on compiler internal (aka Core.Compiler.return_type).

I think this pattern would invoke false sharing and could be bad for performance.

4 Likes

Thanks. I’ve not worked with Transducers.jl, so just a quick question: can Transducers.jl handle more complicated functions (instead of summing, suppose it’s lots of linear algebra)?

They are generic, you can pass any function.
Remember, that to use map reduce pattern your function has to have reduction property f(a, f(b, c)) === f(f(a,b),c). I am not expert here, maybe there are workarounds for that I am no aware.

2 Likes

As Jakub_Wronowski said, reduce(op, xf, input) should work as long as the function op is associative and the transducer xf is stateless. Maybe easier form to use is reduce(op, eduction(iterator_comprehension)) where iterator_comprehension is, for example, something like (f(x) for x in input if p(x)). More explanations in the tutorial: https://tkf.github.io/Transducers.jl/dev/examples/tutorial_parallel/

2 Likes