Overhead of `Threads.@threads`

LoopVectorization 0.10 uses these threadpools for vmapt!:

julia> using LoopVectorization

julia> src = randn(10_000);

julia> dest = similar(src);

julia> function foo_threaded2!(dest, src)
                  p = collect(Iterators.partition(eachindex(src), length(src) ÷ Threads.nthreads()))
                  Threads.@threads for tid in 1:Threads.nthreads()
                      for i in p[tid]
                          @inbounds dest[i] = sin(cos(src[i]))
                      end
                  end
                  return nothing
              end
foo_threaded2! (generic function with 1 method)

julia> @benchmark foo_threaded2!($dest, $src)
BenchmarkTools.Trial:
  memory estimate:  16.78 KiB
  allocs estimate:  182
  --------------
  minimum time:     104.429 μs (0.00% GC)
  median time:      129.044 μs (0.00% GC)
  mean time:        133.739 μs (0.92% GC)
  maximum time:     5.737 ms (95.97% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark vmap!(sin ∘ cos, $dest, $src) # single threaded
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     16.836 μs (0.00% GC)
  median time:      17.051 μs (0.00% GC)
  mean time:        17.050 μs (0.00% GC)
  maximum time:     48.747 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark vmapt!(sin ∘ cos, $dest, $src) # multi threaded
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     2.235 μs (0.00% GC)
  median time:      2.370 μs (0.00% GC)
  mean time:        2.400 μs (0.00% GC)
  maximum time:     14.836 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     9

julia> Threads.nthreads()
36

EDIT:
A couple caveats:

  1. Of course, the code has to be vmap!-able for this to work. Maybe I should make a non-v version that just uses the threads.
  2. Throwing errors isn’t supported in these threads. Try not to do that.
4 Likes

Our actual code is not really vmap!-able or easily expressed in this form, it’s often more like

for element in some_range
  # do stuff accessing `u[some_set_of_some indices..., element]` and
  # `some_cache[some_indices_again..., element]` 
  # store some result in `du[some_other_indices..., element]`
end

Do you refer to GitHub - JuliaSIMD/ThreadingUtilities.jl (Utilities for low overhead threading in Julia)?

See also here for a discussion on thread latency and some benchmarks from last year:

https://github.com/JuliaLang/julia/issues/32701#issuecomment-640942249

I’m curious about nested schedulers in general. So, I wonder how spin-based task pool management interacts with Julia’s cooperative multitasking. Did you look at scenarios like

@sync for ...
    @spawn vmap!(...)
end

?

Since Julia’s parallel runtime randomizes scheduling, I think the more interesting statistic here is the median (“typical case”) rather than the minimum (“lucky path”).

1 Like

The API needs work; currently vmap! (no t) is single threaded, and vmapt! (t for threads) is threaded. There’s an issue suggesting a better API.

I looked at them in the sense that I added:

    if !((nt > 1) && iszero(ccall(:jl_in_threaded_region, Cint, ())))
        vmap_singlethread!(f, ptry, Zero(), N, Val{NonTemporal}(), ptrargs)
        return
    end

i.e., if it’s nested in other threaded code, it will run single threaded. Octavian.jl also checks to avoid nested threading.

Functions like maps aren’t likely to utilize multiple cores as well as more granular threading does.
In my earlier example, I only got about an 8x speedup with vmapt! on a system with 18 cores/36 threads.
I do think I can optimize the overhead of the dispatches more, so it does still get better with larger arrays:

julia> src = randn(40_000); dest = similar(src);

julia> @benchmark vmapt!(sin ∘ cos, $dest, $src) # multi threaded
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     5.075 μs (0.00% GC)
  median time:      5.237 μs (0.00% GC)
  mean time:        5.290 μs (0.00% GC)
  maximum time:     35.734 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     6

julia> @benchmark vmap!(sin ∘ cos, $dest, $src) # single threaded
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     67.932 μs (0.00% GC)
  median time:      68.133 μs (0.00% GC)
  mean time:        68.188 μs (0.00% GC)
  maximum time:     131.029 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

But I think simd-map!s in general are very overhead sensitive; by the time the problem is big enough to overcome scheduling overhead, you’re running into memory bandwidth problems.
FWIW, on this computer:

julia> @benchmark wait(Threads.@spawn nothing)
BenchmarkTools.Trial:
  memory estimate:  448 bytes
  allocs estimate:  5
  --------------
  minimum time:     61.246 μs (0.00% GC)
  median time:      72.259 μs (0.00% GC)
  mean time:        71.841 μs (0.00% GC)
  maximum time:     101.282 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

The @spawn overhead can be very large on some computers. On a different computer of mine it’s more like 7 microseconds median, which still means a single @spawn is slower than sin and cos on 40_000 Float64.

My point here is that there’s an advantage to simple (non-existent?) schedulers: they make many operations profitable to thread that were not previously.

Consistency is another advantage of static scheduling. If there is only one path, they’re all lucky :wink: .

2 Likes

I don’t think @spawn sets jl_in_threaded_region. Also, consider

t = @spawn f()
g()
wait(t)

Arguably, g is “threaded” in some sense. However, it runs on the “root” task in which this code is executed. It’s indistinguishable from any other “single-thread” code. So, I don’t think there is a valid way to detect “threaded code.”

Also, using single threaded code for the inner computation is the opposite of depth-first scheduling which is how Julia’s parallel runtime was designed; i.e., it’s better for the innermost computation to take available cores since they can share the cache.

This depends on what function is passed to the map. Each invocation can take an arbitrarily long time (and can use very small cache/memory). I feel it’d be limiting to assume that maps do not utilize multiple cores.

I think a better approach is to set the size of the base case so that you don’t @spawn anything when it’d slow things down. This yields much more composable parallel code, since code that you don’t know can run in parallel can still take the usable cores: Julia’s runtime is free to schedule whatever tasks are available. Since, IIUC, LoopVectorization.jl has a cost model, can’t it infer the base size? I think it’d be really awesome if LoopVectorization.jl could auto-magically determine the appropriate base size.

(Or does it do so already? I didn’t check.)
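As a rough illustration of that base-case idea (not LoopVectorization’s actual logic; threaded_map! and the basesize default here are made up):

using Base.Threads: @spawn

# Only spawn when each chunk would exceed a base size; otherwise stay single threaded.
function threaded_map!(f, dest, src; basesize = 16_384)
    n = length(src)
    nchunks = min(Threads.nthreads(), max(1, n ÷ basesize))
    if nchunks == 1
        map!(f, dest, src)   # problem too small: spawning would only add overhead
        return dest
    end
    chunks = Iterators.partition(eachindex(dest, src), cld(n, nchunks))
    @sync for idxs in chunks
        @spawn for i in idxs
            @inbounds dest[i] = f(src[i])
        end
    end
    return dest
end

Choosing basesize from a cost model rather than a constant is the part a package like LoopVectorization could, in principle, automate.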

1 Like

FWIW, Tullio has an extremely crude “cost model” which tries to do exactly this. (It counts 1 for any combination of *, +, abs etc, and 10 per log or other unrecognised function.) Or you can provide a size at which you claim it is worth dividing the work among two threads.
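A toy version of that kind of counting, purely illustrative (Tullio’s actual rules and weights differ):

# Count 1 for "cheap" calls and 10 for anything unrecognised, recursing through the expression.
const CHEAP_OPS = (:+, :-, :*, :/, :abs, :muladd)

crude_cost(ex) = 0
function crude_cost(ex::Expr)
    c = ex.head === :call ? (ex.args[1] in CHEAP_OPS ? 1 : 10) : 0
    return c + sum(crude_cost, ex.args; init = 0)
end

crude_cost(:(a * b + abs(c)))  # 3
crude_cost(:(log(a) + b))      # 11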

This uses @spawn, and when applied to fairly simple operations (including matrix multiplication) there is a regime where the overhead of this is significant.

If @spawn is the overhead, does that mean the base case takes around tens to hundreds of microseconds? I guess it’s imaginable to parallelize even such problem sizes. But my point is that Tullio could be used in contexts where parallelizing at an outer level is much more beneficial than squeezing out the performance at this level. Of course, I wouldn’t be making this noise if such optimizations could be done without a significant overhead (compared to parallelization at the outer level) and without loss of composability. But if it is conceivable that manual management of a task pool could be harmful when embedded in a large application, I think it’d be better to avoid making it the default behavior. I see no problem in providing scheduling options (e.g., managed pools) if it’s up to the end-user to specify this. In fact, I’m playing with a similar idea, too.

1 Like

Even lower, MKL benefits from multiple threads on 50x50 matrix multiplication, which takes about 5μs. For Tullio this is below the (default) threshold to turn on any threading.

On larger problems, it just calls @spawn at present, so ought to be composable as usual. I guess there’s still a regime where, if it’s only just beneficial to spawn threads when working alone, it would be significantly better not to when working within some outer-level parallel loop. I don’t know if anything can be done about that (automatically) apart from being fairly conservative about deciding when to turn on threading. On very large problems of course these costs wash out entirely.

I haven’t properly investigated the Octavian / ThreadingUtilities story. As you say perhaps it ought to be opt-in.

2 Likes

That is problematic (EDIT: I just checked. Yes, only @threads sets jl_enter_threaded_region.)
I’ll have to test, but I do not expect nesting these threads to work well at all.
What will probably happen if one of the threads it wants to run work on is busy is that it’ll either wait or deadlock, depending on whether that busy thread is itself one of the threads it wants to run on.
I’ll have to add some strongly worded warnings.

The problem is that these pools run on regular Julia threads. If one is busy with a different task, that will cause problems – potentially deadlocks.
If I could run them on isolated, protected, threads, I would.

I’ll look more at the threading code that exists to see how to make it work better when possibly nested. E.g., is there a way to ask what another thread is doing? I don’t want to tell the ThreadingUtilities-managed task on thread 3 to do work if thread 3 is currently busy with a different task.

FWIW, @spawning from within these threads should be fine, but running them within threaded code would need figuring out.

I don’t like the design/implementation combination, which is why I’m working on an alternative.
Spawning a single do-nothing task takes 70 microseconds on that computer. (results for 3 other computers: 40, 7, and 3.5 microseconds)

On recent x64 CPUs, the L3 cache is shared*, while the L1 and L2 cache are local.
Sometimes, e.g. on multi-socket systems or many Ryzen CPUs, the L3 is only shared between groups of cores, meaning we have clusters of cores that do not share cache at all. There should be no depth-first scheduling transferring work between them.
Is Julia’s scheduler NUMA-aware?

Also, on many CPUs, e.g. Skylake-X, the L3 cache is non-inclusive of the L1 or L2 cache, meaning it doesn’t readily contain copies of the same data to send to other cores. Importantly, it is also not that large: 1.375 MiB per core, vs the 1 MiB of private L2 cache per core.
They have a lot of cores, so the overall shared L3 size is still large, but being non-inclusive adds to the latency of accessing data from another core. I guess it doesn’t if you need to synchronize anyway (i.e., if the data was mutated).

These Skylake-X chips can fit 135_168 Float64 in their private L1 and L2 caches.
My Tiger Lake laptop’s L1 and L2 caches are even larger (Hwloc.jl provides the actual information):

julia> (VectorizationBase.cache_size(Val(1)) + VectorizationBase.cache_size(Val(2))) ÷ 8
169984

The local L1 and L2 caches can hold almost 170_000 Float64. For data to make its way to another core, it would have to migrate through the L3 cache.
These are fairly large problem sizes that cores can work on on their own, without needing to dip into L3.
This means that a granular, breadth-first, threading approach will often maximize cache friendliness:

Let the inner, potentially-threaded, code ask: are other cores busy?
If they aren’t, get some speedup by using them.
If they are, minimize overhead and maximize cache locality by running single threaded.
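As a sketch of that decision, using the loop shape from earlier in the thread (other_cores_busy() is a stand-in for the query being asked for; nothing like it exists in Base today):

# Stand-in for the hypothetical "are other cores busy?" query discussed here.
other_cores_busy() = false

function inner_kernel!(du, u, some_cache, some_range)
    if other_cores_busy()
        for element in some_range              # minimize overhead, keep data in this core's L1/L2
            # ... single threaded work on u, some_cache, du ...
        end
    else
        Threads.@threads for element in some_range   # idle cores available: take the speedup
            # ... the same work, split across the idle cores ...
        end
    end
    return du
end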

I’ve generally found that such granular/breadth-first threading works better.
If you’re fitting a model with Markov Chain Monte Carlo (MCMC), focus on fitting chains in parallel first and foremost. (Although, if the overhead were low enough, this would be the ideal case for composable threading.) I’d err on the side of preferring breadth-first over depth-first: people often only want 4 or so chains, so divide up the available threads among those chains, giving each an allotment of 4 or so threads to run on.

If running a Monte Carlo simulation, where you’re generating datasets and fitting them with MCMC, then I prefer to have the MCMC entirely single threaded and parallelize at the more granular level of datasets. This gets great locality – each dataset gets to sit in a core’s private cache for as long as it is needed.

Re sharing the L3 cache, Octavian.jl will use the shared L3 cache (or the L2 on ARM chips with only 2 cache levels, where the L2 is shared and the L1 private – but this needs testing), but this involves a lot of synchronization, meaning synchronization overhead has to be minimal.

Yes, I should have said LoopVectorization.vmap instead of map.
Referencing the earlier example, ThreadsX.map is a convenient way to run MCMC chains in parallel, or to do the Monte Carlo simulations of MCMC fits; it’d be dangerous to assume that none of those functions thread, and of course they can potentially take a long time.

LoopVectorization.vmap is dumb, and doesn’t actually use any of the LoopVectorization infrastructure.
I should probably make a breaking release where I move it elsewhere.
It heuristically assumes a base case of 32W, where W is the SIMD vector width.
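For concreteness, with W = 8 (Float64 on an AVX-512 machine, an assumption here), that heuristic works out roughly as:

W = 8               # assumed SIMD vector width for Float64 with AVX-512
basecase = 32 * W   # 256 elements per chunk, reading the base case as a per-chunk length
max(1, min(Threads.nthreads(), 10_000 ÷ basecase))   # about 39 chunks for the earlier 10_000-element example, capped at nthreads()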

But yes, I do plan on adding support for these threads to LoopVectorization, where its cost modeling would decide the base case.
But I’m planning on doing that at the same time as a long-promised rewrite of much of the internals. I’ve had a few starts on that, but more demanding things keep coming up. But I hope to return fairly soon.
Or maybe set aside a 4-hour or so block of time each day to work on it and disallow outside distractions.

Yes, Octavian does even better than MKL for these sizes.
But that does make me wonder about the API for implementing breadth-first threading across the Julia ecosystem.
These, and the vmapt! example from earlier, are the sorts of things it’d be nice to parallelize on your local allotment of 4 cores while running 4 chains of MCMC on a 16+ core system.

For now, I’ll implement an AvailableTasks type that can be explicitly passed around to indicate which threads are actually available to spawn on.

Sample benchmarks at 48x48:

julia> using BLASBenchmarksCPU, Octavian
[ Info: Precompiling BLASBenchmarksCPU [5fdc822c-4560-4d20-af7e-e5ee461714d5]

julia> M = K = N = 48;

julia> A = rand(M,K); B = rand(K,N); C2 = @time(A*B); C1 = similar(C2);
  0.000059 seconds (2 allocations: 18.078 KiB)

julia> @benchmark matmul!($C1,$A,$B) # Octavian.jl, Julia implementation
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     1.226 μs (0.00% GC)
  median time:      1.293 μs (0.00% GC)
  mean time:        1.300 μs (0.00% GC)
  maximum time:     6.446 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     10

julia> 2e-9M*K*N / 1.226e-6 # calculate GFLOPS
180.41109298531813

julia> @benchmark gemmmkl!($C1,$A,$B) # MKL
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     2.115 μs (0.00% GC)
  median time:      2.134 μs (0.00% GC)
  mean time:        2.139 μs (0.00% GC)
  maximum time:     8.383 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     9

julia> @benchmark gemmopenblas!($C1,$A,$B) # OpenBLAS
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     3.099 μs (0.00% GC)
  median time:      3.113 μs (0.00% GC)
  mean time:        3.120 μs (0.00% GC)
  maximum time:     8.860 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     8

julia> @benchmark gemmblis!($C1,$A,$B) # BLIS
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     39.319 μs (0.00% GC)
  median time:      44.409 μs (0.00% GC)
  mean time:        44.520 μs (0.00% GC)
  maximum time:     103.083 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark wait(Threads.@spawn nothing) # just spawning a thread and waiting on it
BenchmarkTools.Trial:
  memory estimate:  442 bytes
  allocs estimate:  4
  --------------
  minimum time:     1.829 μs (0.00% GC)
  median time:      7.932 μs (0.00% GC)
  mean time:        9.982 μs (0.00% GC)
  maximum time:     56.803 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     6

180 GFLOPS of Float64 is pretty good for such small sizes. On that computer (AVX512) MKL didn’t seem to start threading yet. It will with AVX2, but Octavian achieves better FLOPS at those sizes.

I’m curious about your ideas.

3 Likes

@mcabbott Would something like the following work?

julia> module Bar

       macro threaded(expr)
           esc(quote
               if Threads.nthreads() == 1
                   $(expr)
               else
                   Threads.@threads $(expr)
               end
           end)
       end

       end
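If the expansion works as intended, the call site would look something like this (illustrative only):

julia> x = zeros(100);

julia> Bar.@threaded for i in eachindex(x)
           x[i] = sqrt(i)   # some per-element work
       end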

Using nested macros seems to be complicated in Julia, cf. Macro hygiene is hard to use correctly in nested macro expansion · Issue #37691 · JuliaLang/julia · GitHub. I’m not an expert in writing macros, so every input is appreciated.

I think I’m with you for wanting a breadth-first approach. In particular, it’d be great to have work stealing (especially continuation stealing) scheduler. I think work stealing would be closer to the scheduling you are describing. The best part is that there is no need for API for asking “are other cores busy?”; the scheduler would handle it automatically.

I was mentioning the “design philosophy” only because it’s counter-productive to try too hard to bend the way threading works at package level in a way incompatible with how the core runtime works. (Also, I intentionally said “designed” and not “implemented”. IIUC, the depth-first story is not done yet.) Of course, experimenting things in packages is significantly faster/easier and that’s a huge benefit. Octavian looks to me that it is a great demonstration of what can be done in Julia.

Note that even if the L1 and L2 caches are not shared between CPU cores, they can potentially be shared between tasks if the tasks are scheduled close together in time. Chunking tasks into more pieces than the number of CPUs might not make sense in settings like dense linear algebra. But this is beneficial for load-balancing in heterogeneous computation. A simple example may be sum(f(x) for x in xs if p(x)) where p is rare/irregular and xs has some non-trivial memory access pattern (e.g., a sliding window).

Yes, that’s exactly the problem I was worried about when writing the first comment. But can’t you just make the task scheduling in ThreadingUtilities non-blocking and failable? You could then serially execute the tasks that failed to start on the desired worker. That would make LoopVectorization/Octavian at least produce a correct result. If you want to make it more robust in terms of performance too, I guess you could create a queue for each worker? Of course, this is starting to sound like a full scheduler, and maybe that’s where the benefit of a specialized scheduler becomes less appealing (if your aim is to improve latency; for improving throughput, I think some special schedulers make sense).

3 Likes

If I understand correctly, the normal paradigm would be to submit X jobs, and then different tasks would “steal” these from a primary queue if they don’t have any other work to do.
I would like the ability to ask “how many cores aren’t busy?” though, to divide fine grained work up into that many chunks.
If you’re processing dense arrays and aren’t using cache-level blocking, there aren’t discrete chunks of work.
Note that even for things like matrix multiplication, you won’t always perform cache-level blocking; the matrices have to be fairly large before that’s profitable (i.e., cache blocks provide an upper limit on block size; if the number-of-threads to array-size ratio is high enough, your preferred block sizes will be below this limit).

With roughly continuous chunks of work rather than discrete ones, the optimum is whatever minimizes overhead. If the work is going to run in parallel on 7 threads, dividing it up into 7 pieces yields the best throughput (minimum overhead), and definitely yields the best latency. E.g., if you have 70 time-units of work and there is 0 overhead to threading, dividing it into 7 chunks takes 10 time-units, while dividing it into 10 chunks takes 14 time-units.
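The arithmetic behind that example, as a quick check (assuming zero overhead and equal-cost chunks):

work = 70                                                    # total time-units of work
finish_time(nchunks, nthreads) = cld(nchunks, nthreads) * (work ÷ nchunks)
finish_time(7, 7)    # 10: each of the 7 threads runs one 10-unit chunk
finish_time(10, 7)   # 14: the second round of 7-unit chunks only partially fills the threads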

I think we can do a lot in Julia, although Stefan argues that not fracturing the ecosystem is important.

True. Lots of workloads benefit from hyperthreading (but dense linear algebra also does not).

That’s a good idea.
Executing a task is non-blocking, but __wait (which seems to be a part of the API, so I should remove those leading underscores) is of course blocking.
So the simplest fix would be to have __wait check if a task has been started. If it hasn’t, it should “steal” the work (and execute serially).
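Roughly, that fix in sketch form (the helpers here are stand-ins, not ThreadingUtilities’ real API, and a real version would need an atomic compare-and-swap to claim the job without racing the worker):

task_started(tid)    = rand(Bool)   # stand-in: has worker `tid` picked the job up yet?
wait_for_worker(tid) = nothing      # stand-in for ThreadingUtilities.__wait(tid)

function wait_or_steal!(tid, job)
    if task_started(tid)
        wait_for_worker(tid)   # normal path: the worker took the job, block until it finishes
    else
        job()                  # the worker never started it: run the work serially ourselves
    end
    return nothing
end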

I think it’d be great for the scheduler to support different kinds of behavior. For example, maybe f in sum(f(x) for x in xs if p(x)) does some dense linear algebra that it’d like to thread. If the matrices are moderately small, it may want to run them on e.g. 4 threads (on a computer with potentially 32, 64, or 128+ threads iterating over xs).
The thing I’d like there (for the smaller statically scheduled code) is the ability to request “up to X” threads as described earlier. These “up to X” should be scheduled together in a group; aside from lowering overhead of starting them, this would be beneficial in allowing for tasks that want low-overhead fine grained synchronization across the threads they do run (i.e., if a set of tasks synchronizes via spin locks, you want to start them at the same time).

I think we’d need a (runnable) motivating example to pick the best behaviors at each level, but that’d be an interesting example with respect to composability.

2 Likes

Having single queue is a source of contention. A typical approach in high-performance concurrent programming is to reduce contention as much as possible. In the work stealing approach, this is done by having a worker local queue; if it’s empty, steal it from other worker’s queue.

This would require a central counter or something to track the available workers and so induces contention. It could work if you make it lock-free and only approximately true, though.

But, I think the most compelling reason to not use this approach by default is that the computation can become non-deterministic. It makes debugging numerical programs very hard and makes reproducing computations impossible.

Kinda repeating myself, but my hunch is that this sort of API increases communications across the workers and behaves not so well in a large application where your optimized routine runs side by side with other routines. Maybe it can work if the grouping requirement is “soft” in some sense and using lock-free API as much as possible.

Wouldn’t spin lock slow down everything (and not just your routine) or even cause a deadlock, if you couldn’t get exactly X workers?

1 Like

I’ll have to think about and test this more. CheapThreads currently uses one central counter.

That sounds reasonable. Perhaps by default, using the “reserved thread” approach should only use the reserved threads (and not look to the larger pool for additional work).
This also decentralizes the queues, at the cost of requiring each thread to indicate their maximum allotment of threads up front.

One of the goals is that receiving just a single thread is common. I’d be more inclined/willing to make possible-threading the default behavior than making “definitely spawn X tasks”.

If code gets a 2x speedup by using 4 cores, is it worth threading?
Probably only if the cores aren’t doing anything else, so I would say “no” by default.
But perhaps forcing users to choose between threaded and non-threaded functions everywhere isn’t a bad thing. It may however be annoying when they add another level of nesting, e.g. running a monte carlo simulation of whatever analysis they were doing earlier.

Not if the workers are aware of how many workers there are.

1 Like

By the way, just for possible lurkers around here who are happened to get bored and looking for a nice YouTube video, I remember enjoying this CppCon talk by Pablo Halpern on work stealing (even though I didn’t know anything about it back then):

This is exactly the question I’ve had in the back of my head for a long time! Actually, I think an even harder question than the scheduling is the choice of algorithm. For example, even a well-known clever algorithm like parallel prefix sum does not provide a good speedup if there aren’t many workers: the per-worker speedup $(T_1/T_N)/N$ approaches 0.5 for $1 \ll N = \mathtt{nthreads()} \ll \mathtt{length(data)}$, since the work-efficient parallel scan performs roughly $2n$ operations versus $n$ for the sequential loop. So, even if we have an infinitely fast scheduler, it’s not wise to use parallel prefix sum if we can parallelize outside (e.g., when the prefix sum is used inside a simple parallel map). Since this is a choice of algorithm, it’s not solvable by a clever scheduling strategy. The only way I can think of to automate this is to query available workers, as you are suggesting, or to make the parallel program analyzable by the compiler (to get a static task structure).

Doesn’t it need some communication between the workers? Maybe you can only guarantee that the reply from the scheduler is the lower bound of the number of scheduled works, to avoid making the communication the bottleneck. This is (and also many other comments of mine are) just my speculation, though.

4 Likes

Thanks, that was great. Fascinating stuff. A reason ThreadingUtilities and CheapThreads don’t have a real API or documentation is because I haven’t looked into real approaches for how I could actually provide any sort of flexibility well.
I knew what I wanted for libraries like Octavian and LoopVectorization (i.e., low overhead static scheduling), so I came up with a way to do that.

When watching the video, I was excited to start trying to implement work stealing in CheapThreads until he started talking about continuation stealing, frame pointers, and the cactus stack.
LLVM’s code generator intrinsics provide a lot of what’s needed, but I think (like Pablo Halpern said) that compiler support would be necessary, i.e. can’t do it at the package level. In particular, making cactus stacks work.

I could implement child stealing. However, the low overhead on forks and on stealing back your own work are two of the things that make work stealing seem so appealing (vs my experience with @spawn); i.e. they could let me fairly confidently actually use Julia’s parallelism without worrying that I’m instead crippling performance of the program.
Not stalling on joins is also a major bonus.

You’ve actually been working on Julia’s compiler and threading (e.g. with TAPIR) – what’re your thoughts on actually implementing continuation stealing in Julia?

Continuation stealing just seems better – if it’s feasible in Julia, I’d prioritize other work (like finally doing the triangular loops or AD for LoopVectorization, both of which are things I’ve been promising for about a year now).
Or if there’s a way or things we can implement to let a package do it, I’d prioritize that effort over child stealing as well – I’d very much prefer to have things done in a performance-optimal way, especially when their purpose is to improve performance. (SIMD.jl would be useless if doing anything with the library allocated memory.)

But if not, it’d be fun to take a stab at stealing children.

Even an infinitely fast static scheduler (i.e., statically chunking a loop) would be better with fewer spawns because of cache locality, although (a) this won’t really matter if the same core executes each chunk because the others are busy, and (b) a work-stealing scheduler doesn’t have that problem.

But great example.
Out of curiosity, how would being analyzable by the compiler help in that situation? Wouldn’t the code author still have to provide the single threaded prefix sum as the alternative to execute if there’s only one core?
I wouldn’t expect the compiler to be able to optimize parallel prefix sum into a prefix sum on its own.

CheapThreads.jl is currently entirely focused on batches of operations. Note that it expects you to request 1 less thread than you wish to run on, i.e. if you want to statically schedule 6 chunks of work, request 5 threads (running the 6th on the thread that made the request):

julia> using CheapThreads

julia> t1, r1 = CheapThreads.request_threads(Threads.threadid(), 5); t1
Thread (5) Iterator: UInt32[1, 2, 3, 4, 5]

So if you want to run on 6 threads, request 5, and iterate over the iterable it returns.

julia> dump(t1)
CheapThreads.UnsignedIteratorEarlyStop{UInt32}
  u: UInt32 0x0000001f
  i: UInt32 0x00000005

Work is then launched directly on those tasks using ThreadingUtilities.jl.
I haven’t switched Octavian to use CheapThreads yet, but the change will be natural: it’ll decide it wants to use nthread threads based on the size of the matrices, request nthread-1, and then tell them how many there are when it runs the tasks if they need to know (i.e., if the matrices are large enough to require blocking and syncing across the contracted axis).
So while it is starting nthread-1 “jobs”, it only required a single query with the central manager.
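In outline, that might look like the sketch below; the size heuristic and the function name are made up, and only request_threads / free_threads! are CheapThreads calls actually shown in this thread:

using CheapThreads

function matmul_sketch!(C, A, B)
    nthread = clamp(length(C) ÷ 10_000, 1, Threads.nthreads())   # made-up size heuristic
    threads, token = CheapThreads.request_threads(Threads.threadid(), nthread - 1)
    nworkers = length(threads) % Int                              # may receive fewer than requested
    # launch `nworkers` jobs on those threads via ThreadingUtilities, run the last chunk here,
    # tell each job about `nworkers + 1` if it needs that for syncing, then wait for them all
    CheapThreads.free_threads!(token)
    return C
end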

Other jobs can proceed to request more threads:

julia> t2, r2 = CheapThreads.request_threads(Threads.threadid(), 3); t2
Thread (3) Iterator: UInt32[6, 7, 8]

The r* are tokens we can use to free threads:

julia> CheapThreads.free_threads!(r1)

julia> t3, r3 = CheapThreads.request_threads(Threads.threadid(), 8); t3
Thread (8) Iterator: UInt32[1, 2, 3, 4, 5, 9, 10, 11]

It will only give us up to min(Sys.CPU_THREADS, Threads.nthreads())-1 threads.

julia> Threads.nthreads()
20

julia> CheapThreads.free_threads!(r2)

julia> t4, r4 = CheapThreads.request_threads(Threads.threadid(), 20); t4
Thread (11) Iterator: UInt32[6, 7, 8, 12, 13, 14, 15, 16, 17, 18, 19]

So to statically schedule a set of work items, query the central manager once. This is reasonably fast

julia> @benchmark CheapThreads.free_threads!(last(CheapThreads.request_threads(Threads.threadid(), 8)))
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     15.459 ns (0.00% GC)
  median time:      15.516 ns (0.00% GC)
  mean time:        15.654 ns (0.00% GC)
  maximum time:     28.082 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     998

and because it’s done for groups at a time, it’d hopefully not be a major source of contention.

Once a set of threads has been allocated via request_threads, it’s then the program’s job to run work items on them using ThreadingUtilities. Whilst doing so, the program knows exactly how many threads it received

julia> length(t1) % Int
5

and can pass this and any required information to the workers.
In Octavian, the workers just need to know their own id among the workers (so, for (rid,tid) in enumerate(t1) for relative and thread ids) and how many there are. This tells them the relative position of their chunk, as well as how many other workers there are for syncing purposes.
Syncing works at the moment by incrementing an atomic counter, and then spin-checking the counter until all the workers have incremented it. Meaning all they need to know is how many there are.
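A self-contained sketch of that kind of barrier, using Base’s Threads.Atomic rather than ThreadingUtilities’ internals:

# Each of the `nworkers` workers calls this once; everyone leaves only after everyone has arrived.
function counter_barrier!(counter::Threads.Atomic{Int}, nworkers::Int)
    Threads.atomic_add!(counter, 1)
    while counter[] < nworkers
        # spin; a real implementation would insert a CPU pause hint here
    end
    return nothing
end

The counter would need to be reset (or a fresh counter used) before the same group can sync again.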

The author could provide whatever information they want/need to the workers.

Perhaps, to lower the risk of a communication bottleneck, it may be better to start jobs in a forking manner?
E.g., instead of the mainthread looping over the 5 jobs (or 128 on recent AMD and ARM systems, and potentially much more in the future), have a way to also parallelize the spawning?

I could try making LoopVectorization do this half-way when threading two loops, by making the outer loop start the workers, which then run the inner loop.
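The forking idea with plain tasks, just to show the shape (ThreadingUtilities would do the launching differently): each launcher hands half of the remaining jobs to a new task before starting on its own half.

function launch_tree!(jobs, lo = 1, hi = length(jobs))
    lo > hi && return nothing
    if lo == hi
        jobs[lo]()                                           # leaf: run this job
        return nothing
    end
    mid = (lo + hi) >>> 1
    t = Threads.@spawn launch_tree!(jobs, mid + 1, hi)       # fork: the upper half is launched in parallel
    launch_tree!(jobs, lo, mid)                              # keep launching the lower half ourselves
    wait(t)
    return nothing
end

launch_tree!([() -> println("job ", i) for i in 1:8])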

On current overhead of starting jobs:

Code
using ThreadingUtilities, StaticArrays, Test

struct MulStaticArray{P} end
function (::MulStaticArray{P})(p::Ptr{UInt}) where {P}
    _, (ptry,ptrx) = ThreadingUtilities.load(p, P, 2*sizeof(UInt))
    unsafe_store!(ptry, unsafe_load(ptrx) * 2.7)
    nothing
end
@generated function mul_staticarray_ptr(::A, ::B) where {A,B}
    c = MulStaticArray{Tuple{A,B}}()
    :(@cfunction($c, Cvoid, (Ptr{UInt},)))
end

@inline function setup_mul_svector!(p, y::Base.RefValue{SVector{N,T}}, x::Base.RefValue{SVector{N,T}}) where {N,T}
    py = Base.unsafe_convert(Ptr{SVector{N,T}}, y)
    px = Base.unsafe_convert(Ptr{SVector{N,T}}, x)
    fptr = mul_staticarray_ptr(py, px)
    offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt))
    ThreadingUtilities.store!(p, (py,px), offset)
end

@inline function launch_thread_mul_svector!(tid, y, x)
    p = ThreadingUtilities.taskpointer(tid)
    setup_mul_svector!(p, y, x)
    if ThreadingUtilities._atomic_cas_cmp!(p, ThreadingUtilities.SPIN, ThreadingUtilities.TASK)
        nothing
    else
        ThreadingUtilities._atomic_cas_cmp!(p, ThreadingUtilities.WAIT, ThreadingUtilities.LOCK)
        ThreadingUtilities.wake_thread!(tid % UInt)
    end
end

function mul_svector_threads(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
    ra = Ref(a)
    rb = Ref(b)
    rc = Ref(c)
    rx = Ref{SVector{N,T}}()
    ry = Ref{SVector{N,T}}()
    rz = Ref{SVector{N,T}}()
    GC.@preserve ra rb rc rx ry rz begin
        launch_thread_mul_svector!(1, rx, ra)
        launch_thread_mul_svector!(2, ry, rb)
        launch_thread_mul_svector!(3, rz, rc)
        w = muladd.(2.7, a, b)
        ThreadingUtilities.__wait(1)
        ThreadingUtilities.__wait(2)
        ThreadingUtilities.__wait(3)
    end
    rx[],ry[],rz[],w
end

a = @SVector rand(16);
b = @SVector rand(16);
c = @SVector rand(16);
w1,x1,y1,z1 = mul_svector_threads(a, b, c)
@test w1 == a*2.7
@test x1 == b*2.7
@test y1 == c*2.7
@test z1 ≈ muladd(2.7, a, b)

@benchmark mul_svector_threads($(Ref(a))[], $(Ref(b))[], $(Ref(c))[])

I get:

julia> @benchmark mul_svector_threads($(Ref(a))[],$(Ref(b))[],$(Ref(c))[])
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     435.063 ns (0.00% GC)
  median time:      470.329 ns (0.00% GC)
  mean time:        472.062 ns (0.00% GC)
  maximum time:     680.133 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     158

versus similar Threads.@spawn and Distributed.@spawnat programs:

Code
julia> function mul_svector_spawn(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
           fa = Threads.@spawn a * 2.7
           fb = Threads.@spawn b * 2.7
           fc = Threads.@spawn c * 2.7

           w = muladd.(2.7, a, b)
           fetch(fa),fetch(fb),fetch(fc), w
       end
mul_svector_spawn (generic function with 1 method)

julia> using Distributed

julia> addprocs();

julia> @everywhere using StaticArrays

julia> function mul_svector_distributed(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
           fa = @spawnat 1 a * 2.7
           fb = @spawnat 2 b * 2.7
           fc = @spawnat 3 c * 2.7

           w = muladd.(2.7, a, b)
           fetch(fa),fetch(fb),fetch(fc), w
       end
mul_svector_distributed (generic function with 1 method)

Benchmarks:

julia> @benchmark mul_svector_spawn($(Ref(a))[],$(Ref(b))[],$(Ref(c))[])
BenchmarkTools.Trial:
  memory estimate:  2.80 KiB
  allocs estimate:  22
  --------------
  minimum time:     6.966 μs (0.00% GC)
  median time:      139.406 μs (0.00% GC)
  mean time:        139.138 μs (0.00% GC)
  maximum time:     186.426 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark mul_svector_distributed($(Ref(a))[], $(Ref(b))[], $(Ref(c))[])
BenchmarkTools.Trial:
  memory estimate:  19.14 KiB
  allocs estimate:  418
  --------------
  minimum time:     166.320 μs (0.00% GC)
  median time:      176.419 μs (0.00% GC)
  mean time:        193.205 μs (1.54% GC)
  maximum time:     10.103 ms (72.60% GC)
  --------------
  samples:          10000
  evals/sample:     1

Instrumenting the threaded version:

julia> const COUNTER = zeros(UInt, 9);

julia> function mul_svector_instrumented(a::SVector{N,T}, b::SVector{N,T}, c::SVector{N,T}) where {N,T}
           t0 = time_ns()
           ra = Ref(a)
           rb = Ref(b)
           rc = Ref(c)
           rx = Ref{SVector{N,T}}()
           ry = Ref{SVector{N,T}}()
           rz = Ref{SVector{N,T}}()
           t1 = time_ns()
           GC.@preserve ra rb rc rx ry rz begin
               launch_thread_mul_svector!(1, rx, ra)
               t2 = time_ns()
               launch_thread_mul_svector!(2, ry, rb)
               t3 = time_ns()
               launch_thread_mul_svector!(3, rz, rc)
               t4 = time_ns()
               w = muladd.(2.7, a, b)
               t5 = time_ns()
               ThreadingUtilities.__wait(1)
               t6 = time_ns()
               ThreadingUtilities.__wait(2)
               t7 = time_ns()
               ThreadingUtilities.__wait(3)
               t8 = time_ns()
           end
           COUNTER[1] += 1
           COUNTER[2] += t1 - t0
           COUNTER[3] += t2 - t1
           COUNTER[4] += t3 - t2
           COUNTER[5] += t4 - t3
           COUNTER[6] += t5 - t4
           COUNTER[7] += t6 - t5
           COUNTER[8] += t7 - t6
           COUNTER[9] += t8 - t7
           rx[],ry[],rz[],w
       end
mul_svector_instrumented (generic function with 1 method)

julia> @benchmark mul_svector_instrumented($(Ref(a))[], $(Ref(b))[], $(Ref(c))[]) setup=(fill!($COUNTER,0);)
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     593.026 ns (0.00% GC)
  median time:      619.301 ns (0.00% GC)
  mean time:        623.299 ns (0.00% GC)
  maximum time:     866.733 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     191

julia> (COUNTER ./ first(COUNTER))'
1×9 adjoint(::Vector{Float64}) with eltype Float64:
 1.0  21.9738  72.0942  67.8377  140.22  20.2827  67.4555  63.1152  67.9738

Using CheapThreads.jl would tack on an extra 15ns or so for the request/free threads.
The individual job-starts and waits each take 70-140ns on this computer.
Also, 0 allocations because the GC and threads don’t like each other.

4 Likes

:thinking:

11 Likes

Great to hear that you like the continuation and child stealing approaches! Mission accomplished :slight_smile:

Oh, I didn’t remember he was discussing it. Maybe I should rewatch the talk. But I’d say implementing continuation stealing for limited type of computation is actually possible, since I’ve done this in FoldsThreads.jl. The key observation for me was that, if I write my code in continuation-passing style (CPS), continuation stealing is possible, without touching the compiler. What I call chain::Cons{Function} in the package is actually a cactus stack. But making CPS work well in Julia was a bit tricky (trampoline + type stabilization trick).

I’m not working on this myself but there is another research project for integrating OpenCilk scheduler to Julia (but it’s still in somewhat exploration phase so no concrete strategy for how to land it on mainline Julia ATM). Meanwhile, I’m also trying to sneak in a key ingredient for continuation stealing which is task migration: Refetch PTLS via Task struct (preparation for task migration) by tkf · Pull Request #39220 · JuliaLang/julia · GitHub. Task migration is a prerequisite since, if a Julia’s worker thread wants to steal continuation, the task of the continuation has to be migrated to a different OS thread. Not sure there will be continuation stealing but it’s a nice-to-have feature anyway to make any kind of scheduler efficient.

Gotcha, makes sense. Actually, I don’t know if you implemented it already, but if you want it to return the work items that were not scheduled (as in the “up to X” strategy you were mentioning), I think you can set the success/failure flags atomically and then check them after the “requester” thread has run its own chunk of work. It’s very likely these flags are already set by then, so the task scheduling would be much more reliable even though you made it failable/lock-free. I think it could make the parallel scheduler much more robust in challenging situations (many different kinds of tasks flying around).

BTW, you get the sub-microsecond results in the benchmark because all the workers are “hot” and essentially just spin a little between the benchmark measurements, right? That is to say, during the benchmark, your function is very unlikely to yield to the scheduler (which, of course, could be useful if the outer loop in a “real” application does a similar thing). I’m just trying to understand what’s happening here.

2 Likes

Forgot to answer this, but yeah, you are right, the author has to provide the alternative sequential algorithm. The more complicated part is that there has to be a compiler-user interface for this.

One thing I’ve been wondering is whether we can come up with an interface to tell the compiler “this variable can be any value, so I’ll let you pick it.” If I can mark the basesize of divide-and-conquer (DAC) type algorithms (including prefix sum) as such a parameter that the compiler can choose, it can pick infinity (typemax) when it wants to. Since the base case of DAC is typically the sequential algorithm, we can get the sequential prefix sum in the inner loop this way. This sounds theoretically doable since the compiler has the cost model of the thing it is compiling. But I’m not really sure at all how doable it actually is in practical Julia programs.
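For a sense of what that knob looks like in user code, here is a divide-and-conquer sum where basesize is exactly such a parameter: setting it to typemax recovers the sequential algorithm, which is what the compiler could pick when the call is already inside enough outer parallelism (a sketch, not an existing API):

function dac_sum(f, xs; basesize = 4096)
    function go(lo, hi)
        if hi - lo + 1 <= basesize                    # base case: plain sequential loop
            s = zero(f(xs[lo]))
            for i in lo:hi
                s += f(xs[i])
            end
            return s
        end
        mid = (lo + hi) >>> 1
        right = Threads.@spawn go(mid + 1, hi)        # recurse in parallel above the base size
        return go(lo, mid) + fetch(right)
    end
    return go(firstindex(xs), lastindex(xs))
end

dac_sum(abs2, randn(10_000))                            # parallel divide and conquer
dac_sum(abs2, randn(10_000); basesize = typemax(Int))   # degenerates to the sequential sum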

Yeah, that sounds like a good idea. DAC is the default lowering of cilk_for in OpenCilk.

2 Likes