[ANN] FoldsThreads.jl: A zoo of pluggable thread-based data-parallel execution mechanisms

Data parallelism is a very useful approach to parallelism because we describe what to compute rather than how to compute it. This is especially nice for parallel computing, since specifying how to compute often leads to subtle concurrency bugs. However, each user’s program may have specific scheduling requirements for obtaining decent performance. In particular, it is challenging to create a single scheduler that can manage very imbalanced loads, terminatable reductions, resource requirements, backpressure, and/or preserving the primary thread for latency-oriented tasks.

FoldsThreads.jl tries to solve this problem by providing various thread-based data-parallel execution mechanisms with different performance characteristics. It can be used with any JuliaFolds/*.jl package such as FLoops.jl, Transducers.jl, and Folds.jl (ref [ANN] Folds.jl: threaded, distributed, and GPU-based high-level data-parallel interface for Julia). The aim is to make it possible to specify the execution mechanism after you have described what to compute. This way, you can improve the performance of your parallel code just by swapping the executor passed to the data-parallel API. Quoting the README:

                                  Executors
                           ,-----------------------.
     Algorithms            |    FoldsThreads.jl    |         Data structures
,------------------.       |-----------------------|       ,-----------------.
|  FLoops,         |       |  ThreadedEx*          |       |  Array,         |
|  Folds,          |       |  WorkStealingEx,      |       |  Tables,        |
|  Transducers,    |  ---  |  DepthFirstEx,        |  ---  |  FGenerators,   |
|  OnlineStats,    |       |  TaskPoolEx,          |       |  Dict,          |
|  DataTools, ...  |       |  NondeterministicEx,  |       |  Set, ...       |
`------------------'       |  ...                  |       `-----------------'
                           `-----------------------'

(* ThreadedEx is the default executor provided by Transducers.jl)

  • WorkStealingEx implements work stealing (continuation stealing). Useful for load-balancing.
  • DepthFirstEx implements depth-first scheduling. Useful for findfirst-type computations.
  • TaskPoolEx: Task pool executor. Useful for fine execution control (e.g., back pressure and “background” threads).
  • NondeterministicEx: An executor for parallelizing computations with non-parallelizable iterators.

These executors can be passed as the executor argument of @floop (i.e., @floop executor for ...) and as the last argument of the Folds.jl API (e.g., Folds.map(f, xs, executor)). See the documentation for more information.
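
For example, here is a minimal sketch of both entry points (the inputs are made up for illustration):

using FLoops, Folds, FoldsThreads

xs = 1:1000

# Folds.jl API: the executor is the last positional argument.
Folds.sum(x -> x^2, xs, WorkStealingEx())

# FLoops.jl API: the executor goes right after `@floop`.
@floop DepthFirstEx() for x in xs
    @reduce s += x^2
end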

Benchmarks

I ran a few benchmarks. I hope these results give you some idea of when to use each executor.

Work stealing is useful for load-balancing

I benchmarked

xs = 1:2^13
Folds.sum($f, xs, $Executor(basesize = 1))

where f spins for 100 μs for nworks of the items (i.e., every length(xs) ÷ nworks-th item) in the input collection xs. This models a parallel reduce with a wildly skewed run-time distribution. I compared the default ThreadedEx executor and the WorkStealingEx executor.
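
To make the setup concrete, here is a rough sketch of such a skewed workload (the value of nworks and the spin loop are my guesses at the setup, not the actual benchmark code):

using Folds, FoldsThreads

xs = 1:2^13
nworks = 2^7  # hypothetical number of "heavy" items

# Spin for ~100 μs on every (length(xs) ÷ nworks)-th item; all other items
# are essentially free, giving a wildly skewed run-time distribution.
function f(x)
    if x % (2^13 ÷ nworks) == 0
        t0 = time_ns()
        while time_ns() - t0 < 100_000  # busy-wait for 100 μs
        end
    end
    return 1
end

Folds.sum(f, xs, WorkStealingEx(basesize = 1))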

[benchmark plot: ThreadedEx vs. WorkStealingEx run times]

WorkStealingEx performs better than ThreadedEx if the run-time distribution is unbalanced enough. Furthermore, the run time of WorkStealingEx is much more consistent than that of ThreadedEx.

Depth-first scheduling is useful for low latency in the findfirst “hit case”

I benchmarked findfirst with the needle at different places:

xs = rand(2^23)
xs[needleloc] = 2
@benchmarkable(Folds.findfirst(>(1), xs, $Executor(basesize = $basesize)))

NOTE: the x axis (needleloc) is logarithmic; i.e., it magnifies the performance benefits of DepthFirstEx.

DepthFirstEx is useful when the latency of the “hit case” is important and/or the needle is expected to be somewhere at the beginning of the collection. Otherwise, ThreadedEx (default) is a good option. For consistent latency, WorkStealingEx may be useful.

Also, note that the speedup is much more drastic when comparing the median (left). This means that DepthFirstEx not only performs better but also behaves much more consistently; the variability with the other executors is probably due to the randomized nature of scheduling in Julia’s task runtime.

Random thoughts

  • The implementation of WorkStealingEx is rather crazy and I’m a bit scared of releasing it. Sorry if it breaks your code :laughing:. Please switch back to ThreadedEx, and it’d be great if you could come up with a minimal reproducible example!

  • My impression from playing with different schedulers is that, even though people comment on the overhead of @spawn, it is a very fast (the fastest?) concurrency mechanism compared to other concurrency primitives in Julia. This means that writing a specific scheduler for improving latency is rather hopeless (as expected?). However, I think improving throughput is possible (e.g., WorkStealingEx) by exploiting properties that the default scheduler cannot assume, and there are more interesting things we can do. For example, I made the result of WorkStealingEx deterministic by enforcing that the shape of the task tree is static (although the scheduling itself is dynamic). We could reduce the pressure on the GC, and maybe also improve the throughput, either by imposing exact associativity or by not requiring a deterministic result.

  • Although I’ve been “complaining” that using a custom scheduler in library packages is not good practice since it reduces the composability of parallel programs (e.g., Overhead of `Threads.@threads` - #16 by tkf and Automatically fusing together several for-loops - #18 by tkf), I do understand that approaches like ThreadPools.jl that execute tasks only in “background” threads are sometimes required. That’s why I added TaskPoolEx, which is based on the trick I learned from ThreadPools.jl. I think it’s nice that packages written on top of JuliaFolds/*.jl can be mixed with latency-oriented code just by passing around the executor object (see the sketch after this list).

  • Most of these executors can be extracted as a slightly generalized version of the divide-and-conquer algorithmic skeleton (aka parallel skeleton). That would then let us write, e.g., mergesort and quicksort on top of different executors. I’m thinking of doing this after adding a few more executors, to clarify the API we need.
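
As a sketch of the executor-passing idea mentioned above, a library function can accept the executor as an argument instead of hard-coding a scheduler. The function name below is made up, and the background keyword of TaskPoolEx is an assumption on my part, so check its docstring:

using Folds, FoldsThreads

# A hypothetical library function that takes the executor as an argument
# instead of hard-coding a particular scheduler:
mean_of_squares(xs, executor) = Folds.sum(abs2, xs, executor) / length(xs)

# A latency-sensitive caller can then route the work away from the primary
# thread (the `background` option is an assumption; see the TaskPoolEx
# docstring for the exact keyword):
mean_of_squares(rand(2^20), TaskPoolEx(background = true))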

47 Likes

The approach taken in ThreadingUtilities.jl has much lower latency. I’m working on a more convenient interface that is composable with itself (but not with @spawn), and should have something within the next few weeks. In the meantime, people are free to test/benchmark Octavian.matmul! and LoopVectorization.vmapt! (make sure you’re on a recent version of LoopVectorization; older versions used @spawn or @threads), which use it.

2 Likes

You are right. I should have said the fastest cooperative (i.e., composable/friendly) concurrency mechanism. You can spin and that’d be fast.

I’m not yet convinced that ThreadingUtilities.jl’s approach is composable and adequate for use at the library level. But I think it might be a good idea to wrap ThreadingUtilities.jl, or to use a similar technique, to provide a low-latency executor. Since I usually assume “may-happen” parallelism for data-parallel code, I think I can use ThreadingUtilities.jl without a deadlock (by checking/acquiring the workers in a lock-free manner).

2 Likes

FWIW, it only spins briefly. The workers should call wait() after spinning for a few milliseconds without being assigned work, so it won’t hog tasks/CPU when you’re not using it.
Of course, this also means that if you’re using them less frequently than that, the overhead will be much higher, closer to @spawn.

I’m writing a simple scheduler for it that will be composable with itself, and I will switch Octavian and LoopVectorization.vmapt! to use it.
That way, if you nest multiple levels of ServiceSolicitation.batch (not on GitHub yet) and also call Octavian.matmul and LoopVectorization.vmapt! from within it, the code will work fine and only schedule on unused pools.

Could you describe this/what you have in mind here?
You can check to at least make sure you’re not trying to wake a pool that’s on the same thread as the running task, but what if a different task is running on that thread?
It shouldn’t deadlock, at least, but it could be a while until that thread is run.
But that’s the same sort of problem faced with static schedulers in general.

But that’s why it’s nicer for the scheduler to be aware of all the threads that are active and available.

3 Likes

That sounds awesome! I’m looking forward to learning about the solution you come up with.

I’m thinking of executing code like this

@sync begin
    @spawn f()
    g()
end

as

started = Threads.Atomic{Bool}(false)
function wrapper()
    # Run f only if the parent has not already claimed it below.
    if !Threads.atomic_xchg!(started, true)
        f()
    end
end
token = maybe_schedule(wrapper)
g()
if Threads.atomic_xchg!(started, true)
    # The worker won the race (it is running or has already run f); wait for it.
    wait(token)
else
    # We just "stole back" the child task f. So executing it here:
    f()
end

maybe_schedule would be something that I’d write by, e.g., wrapping ThreadingUtilities.jl and using it in a (somehow) lock-free manner. Importantly, it is OK for maybe_schedule(wrapper) to fail, since we make sure that f gets executed either way.

Side note: This is not doable for arbitrary f and g because generic tasks in Julia are allowed to communicate. That is to say,

ch = Channel(0)
f() = put!(ch, 1)
g() = take!(ch)

will deadlock unless I make sure f is scheduled via Julia’s task scheduler. This is what I mean by may-happen parallelism.

Is it possible to check the availability concurrently without it being the bottleneck when processing many tasks?

2 Likes

Awesome stuff, @tkf !

1 Like

The idea of being able to easily switch between different Executors sounds awesome! I have a question: how do the different Executors play with @spawn? Can the Executors execute functions that use @spawn? Can Executors be called from within a @spawn? Do all the Executors use the depth-first approach of partr, or does only DepthFirstEx do that?

Yeah, using @spawn inside the functions passed to the threaded higher-order functions should be pretty safe. If you are doing something tricky, it can increase the danger of introducing communications between iterations that result in a deadlock, though. Calling the executor from @spawn should be fine, too. Most of the schedulers use @spawn anyway (and then try to be a bit clever inside the task). From the point of view of the Julia runtime, it’s just a plain task (except for TaskPoolEx).
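
For example, something like this kind of nesting should work (a made-up sketch, not from the package docs):

using Folds, FoldsThreads

# Each iteration spawns (and waits on) its own plain task; the executor
# only sees an ordinary function call.
Folds.map(1:8, WorkStealingEx()) do i
    t = Threads.@spawn i^2
    fetch(t) + 1
end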

IIUC, partr is not yet depth-first, and the plan was to implement approximate depth-first scheduling. The DepthFirstEx I implemented takes a very simple-minded approach that serializes the scheduling (but slightly cleverly, so that base-case computations start as soon as possible). It is, in a way, “exact” depth-first.

1 Like

Thank you for the clarification!

I was not aware of that. Is this documented or discussed somewhere?

I’ve heard this from @vchuravy.

1 Like