Overhead of `Threads.@threads`

Thanks, that was great. Fascinating stuff. A reason ThreadingUtilities and CheapThreads don’t have a real API or documentation is that I haven’t looked into approaches for how I could actually provide any sort of flexibility well.
I knew what I wanted for libraries like Octavian and LoopVectorization (i.e., low-overhead static scheduling), so I came up with a way to do that.

When watching the video, I was excited to start trying to implement work stealing in CheapThreads until he started talking about continuation stealing, frame pointers, and the cactus stack.
LLVM’s code generator intrinsics provide a lot of what’s needed, but I think (like Pablo Halpern said) that compiler support would be necessary, i.e. we can’t do it at the package level; in particular, making cactus stacks work.

I could implement child stealing. However, the low overhead on forks and on stealing back your own work are two of the things that make work stealing seem so appealing (vs my experience with @spawn); i.e. they could let me fairly confidently actually use Julia’s parallelism without worrying that I’m instead crippling performance of the program.
Not stalling on joins is also a major bonus.

You’ve actually been working on Julia’s compiler and threading (e.g. with TAPIR) – what’re your thoughts on actually implementing continuation stealing in Julia?

Continuation stealing just seems better – if it’s feasible in Julia, I’d prioritize other work (like finally doing the triangular loops or AD for LoopVectorization, both of which are things I’ve been promising for about a year now).
Or if there’s a way, or things we can implement, to let a package do it, I’d prioritize that effort over child stealing as well – I’d very much prefer to have things done in a performance-optimal way, especially if their purpose is to improve performance. SIMD.jl, for example, would be useless if every operation with the library allocated memory.

But if not, it’d be fun to take a stab at stealing children.

Even an infinitely fast static scheduler (i.e., statically chunking a loop) would still do better with fewer spawns because of cache locality, although (a) this won’t really matter if the same core executes each chunk because the others are busy, and (b) a work-stealing scheduler doesn’t have that problem.

But great example.
Out of curiosity, how would being analyzable by the compiler help in that situation? Wouldn’t the code author still have to provide the single-threaded prefix sum as the alternative to execute if there’s only one core?
I wouldn’t expect the compiler to be able to optimize parallel prefix sum into a prefix sum on its own.
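
For concreteness, here’s roughly what I mean (my own sketch, not something I’d expect a compiler to derive): the chunked two-pass scan is a genuinely different algorithm from the one-pass serial scan, so I’d expect the author to have to write both versions.

using Base.Threads

# Serial scan: a single pass over the data.
function prefix_sum!(y, x)
    acc = zero(eltype(x))
    @inbounds for i in eachindex(x, y)
        acc += x[i]
        y[i] = acc
    end
    return y
end

# Chunked parallel scan: local scans per chunk, a serial scan over the
# chunk totals, then a second parallel pass to add each chunk's offset.
function prefix_sum_threads!(y, x)
    n = length(x)
    nt = max(min(nthreads(), n), 1)
    bounds = round.(Int, range(0, n; length = nt + 1))
    partials = zeros(eltype(x), nt)
    @threads for t in 1:nt
        acc = zero(eltype(x))
        @inbounds for i in bounds[t]+1:bounds[t+1]
            acc += x[i]
            y[i] = acc
        end
        partials[t] = acc
    end
    offsets = cumsum(partials)
    @threads for t in 2:nt
        @inbounds for i in bounds[t]+1:bounds[t+1]
            y[i] += offsets[t-1]
        end
    end
    return y
end

x = rand(10_000); y = similar(x);
prefix_sum_threads!(y, x) ≈ cumsum(x)  # true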

CheapThreads.jl is currently entirely focused on batches of operations. Note that it expects you to request one fewer thread than you wish to run on, i.e. if you want to statically schedule 6 chunks of work, request 5 threads (running the 6th chunk on the thread that made the request):

julia> using CheapThreads

julia> t1, r1 = CheapThreads.request_threads(Threads.threadid(), 5); t1
Thread (5) Iterator: UInt32[1, 2, 3, 4, 5]

So if you want to run on 6 threads, request 5, and iterate over the iterable it returns.

julia> dump(t1)
CheapThreads.UnsignedIteratorEarlyStop{UInt32}
  u: UInt32 0x0000001f
  i: UInt32 0x00000005

You then iterate over these thread IDs, launching work directly on those tasks using ThreadingUtilities.jl.
I haven’t switched Octavian to use CheapThreads yet, but the change will be natural: it’ll decide it wants to use nthread threads based on the size of the matrices, request nthread-1, and then tell them how many there are when it runs the tasks if they need to know (i.e., if the matrices are large enough to require blocking and syncing across the contracted axis).
So while it is starting nthread-1 “jobs”, it only requires a single query to the central manager.
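
As a rough sketch of the pattern (launch_chunk! and run_chunk! below are hypothetical placeholders for a ThreadingUtilities-based launcher like the one shown further down, not real CheapThreads or Octavian functions):

# Hypothetical sketch; `launch_chunk!` and `run_chunk!` are placeholders.
function run_batch!(run_chunk!, launch_chunk!, nchunks)
    threads, token = CheapThreads.request_threads(Threads.threadid(), nchunks - 1)
    nworkers = (length(threads) % Int) + 1   # we run the last chunk ourselves
    for (rid, tid) in enumerate(threads)
        launch_chunk!(tid, rid, nworkers)    # worker `rid` of `nworkers` on thread `tid`
    end
    run_chunk!(nworkers, nworkers)           # final chunk on the requesting thread
    # (a real version would wait on the workers before freeing the threads)
    CheapThreads.free_threads!(token)
end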

Other jobs can proceed to request more threads:

julia> t2, r2 = CheapThreads.request_threads(Threads.threadid(), 3); t2
Thread (3) Iterator: UInt32[6, 7, 8]

The r* are tokens we can use to free threads:

julia> CheapThreads.free_threads!(r1)

julia> t3, r3 = CheapThreads.request_threads(Threads.threadid(), 8); t3
Thread (8) Iterator: UInt32[1, 2, 3, 4, 5, 9, 10, 11]

It will only give us up to min(Sys.CPU_THREADS, Threads.nthreads())-1 threads.

julia> Threads.nthreads()
20

julia> CheapThreads.free_threads!(r2)

julia> t4, r4 = CheapThreads.request_threads(Threads.threadid(), 20); t4
Thread (11) Iterator: UInt32[6, 7, 8, 12, 13, 14, 15, 16, 17, 18, 19]

So to statically schedule a set of work items, query the central manager once. This is reasonably fast:

julia> @benchmark CheapThreads.free_threads!(last(CheapThreads.request_threads(Threads.threadid(), 8)))
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     15.459 ns (0.00% GC)
  median time:      15.516 ns (0.00% GC)
  mean time:        15.654 ns (0.00% GC)
  maximum time:     28.082 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     998

and because it’s done for a group of threads at a time, it’d hopefully not be a major source of contention.

Once allocated a set of threads via request_threads, it’s then the program’s job to run work items on them using ThreadingUtilities. Whilst doing so, the program knows exactly how many threads it received:

julia> length(t1) % Int
5

and can pass this and any required information to the workers.
In Octavian, the workers just need to know their own id among the workers (so, for (rid, tid) in enumerate(t1), for relative and thread ids) and how many there are. This tells them the relative position of their chunk, as well as how many other workers there are for syncing purposes.
Syncing currently works by incrementing an atomic counter and then spin-checking it until all the workers have incremented it, meaning all they need to know is how many there are.
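
In essence (a simplified sketch of the idea, not Octavian’s actual implementation):

using Base.Threads: Atomic, atomic_add!

# Simplified counter barrier: each worker bumps the shared counter, then
# spins until all `nworkers` have checked in. (A reusable barrier would
# also need to reset or cycle the counter between sync points.)
function sync!(counter::Atomic{Int}, nworkers::Int)
    atomic_add!(counter, 1)
    while counter[] < nworkers
        GC.safepoint()   # keep the spin loop GC-friendly while waiting
    end
end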

The author could provide whatever information they want/need to the workers.

Perhaps an extra risk of a communication bottleneck means it may be better to start jobs in a forking manner?
E.g., instead of the main thread looping over the 5 jobs (or 128 on recent AMD and ARM systems, and potentially many more in the future), have a way to also parallelize the spawning, as in the sketch below?
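
Something along these lines, as a toy sketch using plain Threads.@spawn rather than ThreadingUtilities, just to show the shape of a forking launch: each task forks off half of the remaining jobs before running its own, so launch latency grows roughly with log2(njobs) rather than njobs.

# Toy sketch of a forking launch: split the job range in half, spawn a task
# for the upper half, recurse on the lower half, then wait.
function tree_launch(f, lo::Int, hi::Int)
    if lo == hi
        f(lo)
    else
        mid = (lo + hi) >>> 1
        upper = Threads.@spawn tree_launch(f, mid + 1, hi)
        tree_launch(f, lo, mid)
        wait(upper)
    end
    return nothing
end

tree_launch(i -> println("job $i on thread $(Threads.threadid())"), 1, 8)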

I could try making LoopVectorization do this half-way when threading two loops, by making the outer loop start workers, which then run the inner loop.

On current overhead of starting jobs:

Code
using ThreadingUtilities, StaticArrays, Test, BenchmarkTools

# The function each worker runs: load the destination/source pointers from
# the task buffer, then store x * 2.7 into y.
struct MulStaticArray{P} end
function (::MulStaticArray{P})(p::Ptr{UInt}) where {P}
    _, (ptry,ptrx) = ThreadingUtilities.load(p, P, 2*sizeof(UInt))
    unsafe_store!(ptry, unsafe_load(ptrx) * 2.7)
    nothing
end
# Build a C-callable function pointer specialized on the argument pointer types.
@generated function mul_staticarray_ptr(::A, ::B) where {A,B}
    c = MulStaticArray{Tuple{A,B}}()
    :(@cfunction($c, Cvoid, (Ptr{UInt},)))
end

# Store the function pointer, then the argument pointers, into the task buffer at `p`.
@inline function setup_mul_svector!(p, y::Base.RefValue{SVector{N,T}}, x::Base.RefValue{SVector{N,T}}) where {N,T}
    py = Base.unsafe_convert(Ptr{SVector{N,T}}, y)
    px = Base.unsafe_convert(Ptr{SVector{N,T}}, x)
    fptr = mul_staticarray_ptr(py, px)
    offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt))
    ThreadingUtilities.store!(p, (py,px), offset)
end

@inline function launch_thread_mul_svector!(tid, y, x)
    p = ThreadingUtilities.taskpointer(tid)
    setup_mul_svector!(p, y, x)
    # If the worker is still spinning, handing it the task (SPIN -> TASK) suffices;
    # otherwise mark it (WAIT -> LOCK) and wake the sleeping thread.
    if ThreadingUtilities._atomic_cas_cmp!(p, ThreadingUtilities.SPIN, ThreadingUtilities.TASK)
        nothing
    else
        ThreadingUtilities._atomic_cas_cmp!(p, ThreadingUtilities.WAIT, ThreadingUtilities.LOCK)
        ThreadingUtilities.wake_thread!(tid % UInt)
    end
end

function mul_svector_threads(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
    ra = Ref(a)
    rb = Ref(b)
    rc = Ref(c)
    rx = Ref{SVector{N,T}}()
    ry = Ref{SVector{N,T}}()
    rz = Ref{SVector{N,T}}()
    GC.@preserve ra rb rc rx ry rz begin
        launch_thread_mul_svector!(1, rx, ra)  # rx[] = a * 2.7 on thread 1
        launch_thread_mul_svector!(2, ry, rb)  # ry[] = b * 2.7 on thread 2
        launch_thread_mul_svector!(3, rz, rc)  # rz[] = c * 2.7 on thread 3
        w = muladd.(2.7, a, b)                 # meanwhile, do some work on this thread
        ThreadingUtilities.__wait(1)           # block until each worker finishes
        ThreadingUtilities.__wait(2)
        ThreadingUtilities.__wait(3)
    end
    rx[],ry[],rz[],w
end

a = @SVector rand(16);
b = @SVector rand(16);
c = @SVector rand(16);
w1,x1,y1,z1 = mul_svector_threads(a, b, c)
@test w1 == a*2.7
@test x1 == b*2.7
@test y1 == c*2.7
@test z1 ≈ muladd(2.7, a, b)

@benchmark mul_svector_threads($(Ref(a))[], $(Ref(b))[], $(Ref(c))[])

I get:

julia> @benchmark mul_svector_threads($(Ref(a))[],$(Ref(b))[],$(Ref(c))[])
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     435.063 ns (0.00% GC)
  median time:      470.329 ns (0.00% GC)
  mean time:        472.062 ns (0.00% GC)
  maximum time:     680.133 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     158

versus similar Threads.@spawn and Distributed.@spawnat programs:

Code
julia> function mul_svector_spawn(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
           fa = Threads.@spawn a * 2.7
           fb = Threads.@spawn b * 2.7
           fc = Threads.@spawn c * 2.7

           w = muladd.(2.7, a, b)
           fetch(fa),fetch(fb),fetch(fc), w
       end
mul_svector_spawn (generic function with 1 method)

julia> using Distributed

julia> addprocs();

julia> @everywhere using StaticArrays

julia> function mul_svector_distributed(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
           fa = @spawnat 1 a * 2.7
           fb = @spawnat 2 b * 2.7
           fc = @spawnat 3 c * 2.7

           w = muladd.(2.7, a, b)
           fetch(fa),fetch(fb),fetch(fc), w
       end
mul_svector_distributed (generic function with 1 method)

Benchmarks:

julia> @benchmark mul_svector_spawn($(Ref(a))[],$(Ref(b))[],$(Ref(c))[])
BenchmarkTools.Trial:
  memory estimate:  2.80 KiB
  allocs estimate:  22
  --------------
  minimum time:     6.966 μs (0.00% GC)
  median time:      139.406 μs (0.00% GC)
  mean time:        139.138 μs (0.00% GC)
  maximum time:     186.426 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark mul_svector_distributed($(Ref(a))[], $(Ref(b))[], $(Ref(c))[])
BenchmarkTools.Trial:
  memory estimate:  19.14 KiB
  allocs estimate:  418
  --------------
  minimum time:     166.320 μs (0.00% GC)
  median time:      176.419 μs (0.00% GC)
  mean time:        193.205 μs (1.54% GC)
  maximum time:     10.103 ms (72.60% GC)
  --------------
  samples:          10000
  evals/sample:     1

Instrumenting the threaded version:

julia> const COUNTER = zeros(UInt, 9);

julia> function mul_svector_instrumented(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
           t0 = time_ns()
           ra = Ref(a)
           rb = Ref(b)
           rc = Ref(c)
           rx = Ref{SVector{N,T}}()
           ry = Ref{SVector{N,T}}()
           rz = Ref{SVector{N,T}}()
           t1 = time_ns()
           GC.@preserve ra rb rc rx ry rz begin
               launch_thread_mul_svector!(1, rx, ra)
               t2 = time_ns()
               launch_thread_mul_svector!(2, ry, rb)
               t3 = time_ns()
               launch_thread_mul_svector!(3, rz, rc)
               t4 = time_ns()
               w = muladd.(2.7, a, b)
               t5 = time_ns()
               ThreadingUtilities.__wait(1)
               t6 = time_ns()
               ThreadingUtilities.__wait(2)
               t7 = time_ns()
               ThreadingUtilities.__wait(3)
               t8 = time_ns()
           end
           COUNTER[1] += 1
           COUNTER[2] += t1 - t0
           COUNTER[3] += t2 - t1
           COUNTER[4] += t3 - t2
           COUNTER[5] += t4 - t3
           COUNTER[6] += t5 - t4
           COUNTER[7] += t6 - t5
           COUNTER[8] += t7 - t6
           COUNTER[9] += t8 - t7
           rx[],ry[],rz[],w
       end
mul_svector_instrumented (generic function with 1 method)

julia> @benchmark mul_svector_instrumented($(Ref(a))[], $(Ref(b))[], $(Ref(c))[]) setup=(fill!($COUNTER,0);)
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     593.026 ns (0.00% GC)
  median time:      619.301 ns (0.00% GC)
  mean time:        623.299 ns (0.00% GC)
  maximum time:     866.733 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     191

julia> (COUNTER ./ first(COUNTER))'
1×9 adjoint(::Vector{Float64}) with eltype Float64:
 1.0  21.9738  72.0942  67.8377  140.22  20.2827  67.4555  63.1152  67.9738

Using CheapThreads.jl would tack on an extra 15ns or so for requesting and freeing the threads.
The individual job-starts and waits each take 70-140ns on this computer (columns 3-5 above are the three launches, columns 7-9 the three waits).
Also, 0 allocations, because the GC and threads don’t like each other.
