**tl;dr** How about adding optimizable task parallel API to Julia?
## Introdu…ction
### How to teach parallelism to the Julia compiler
(If you've already seen @vchuravy's PR #31086, maybe this part is redundant.)
How we currently implement the task parallel API in Julia introduces a couple of obstacles for supporting high-performance parallel programs. In particular, the compiler cannot analyze and optimize the child tasks in the context of the surrounding code. This limits the benefit parallel programs can obtain from existing analysis and optimizations like type inference and constant propagations. Furthermore, the notion of tasks in Julia supports complex concurrent communication mechanisms that imposes a hard limitation for the scheduler to implement an efficient scheduling strategy.
*Tapir* ([Schardl et al., 2019](https://doi.org/10.1145/3365655)) is a parallel IR that can be added to existing IR in the SSA form. They demonstrated that Tapir can be added to LLVM (aka _Tapir/LLVM_; a part of [OpenCilk](https://cilk.mit.edu/)) relatively "easily" and existing programs written in Cilk can benefit from *pre-existing* optimizations such as loop invariant code motion (LICM), common sub-expression elimination (CSE), and others that were *already written for serial programs*. In principle, a similar strategy should work for any existing compiler with SSA IR developed for serial programs, including the Julia compiler. That is to say, with Tapir in the Julia IR, we should be able to unlock the optimizations in Julia for parallel programs.
Tapir enables the parallelism in the compiler by limiting its focus to parallel programs with the *serial-projection property* (@vchuravy and I call this type of parallelism the _may-happen in parallel parallelism_ for clarity). Although this type of parallel programs cannot use unconstrained concurrency communication primitives (e.g., `take!(channel)`), it can be used for a vast majority of the parallel programs that are compute-intensive; i.e., the type of programs for which Julia is already optimized/targeting. Furthermore, having a natural way to express this type of computation can be beneficial not only for the compiler but also for the scheduler.
### A strategy for optimizable task parallel API
This PR implements Tapir in the Julia compiler, in Julia. I've been working with @vchuravy and TB Schardl (@neboat) to extend and complete @vchuravy's PR #31086 that uses OpenCilk (which includes a fork of LLVM and clang) for supporting Tapir at LLVM level (Tapir/LLVM). This project still has to solve many obstacles since extracting out Julia tasks at a late stage of LLVM pass is very hard (you can see my **VERY** work-in-progress fork at https://github.com/cesmix-mit/julia). We observed that many benefits of Tapir can actually be realized at the level of Julia's compilation passes (see below). For example, type inference and constant propagation implemented in the Julia compiler can penetrate through `@spawn` code blocks with Tapir. So, I implemented Tapir in pure Julia and made it work without the dependency on OpenCilk. Since this can be done without taking on a dependency on a non-standard extension to the LLVM compilation pipeline, we think this is a good sweet spot for start adding parallelism support to Julia in a way fully integrated into the compiler.
Although there are more works to be done to turn this into a mergeable/releasable (but experimental) state, I think it'd be beneficial to open a PR at this stage because to start discussing:
* if we want parallelism support integrated into the compiler
* what kind of frontend API we want
* implementation strategy
I'm very interested in receiving feedback!
## Proposed API
Here is an example usage of the Tapir API implemented in this PR for the moment. Following the tradition, it computes the Fibonacci number in parallel:
```julia
using Base.Experimental: Tapir
function fib(N)
if N <= 1
return N
end
local x1, x2 # task output
Tapir.@sync begin # -.
Tapir.@spawn x1 = fib(N - 1) # child task # | syncregion
x2 = fib(N - 2) # |
end # -'
return x1 + x2
end
```
* `Tapir.@sync begin ... end` denotes the _syncregion_ in which Tapir tasks can be spawned.
* `Tapir.@spawn ...` denotes the code block that _may_ run in parallel with respect to other tasks (including the parent task; e.g., `fib(N - 2)` in the example above).
* The _task output_ variables, i.e., the variables that are accessed after the _syncregion_ must be declared with `local`. In the example above, `x1` and `x2` are the task output. Declaring with `local` is required because, just like the standard `@sync`, `Tapir.@sync` creates a scope (i.e., it is a `let` block). Task output variables can also be initialized before `Tapir.@sync`.
* The code written with `Tapir` is expected to have the *serial projection property* (see below for the motivation). In particular, the code must be valid after removing `Tapir`-related syntax (i.e., replacing `Tapir.@sync` and `Tapir.@spawn` with `let` blocks).
* It is undefined behavior (for now [^1]) to assign local variables in one task and read them in another task, since such behavior leads to data races.
* `@goto` into or out of the tasks is not allowed
One of the important consideration is to keep this API very minimal to let us add more optimizations like OpenCilk-based lowering at a later stage of LLVM. I've designed this API first with my experimental branch of OpenCilk-based Tapir support. My OpenCilk-based implementation was not perfect but I think this is reasonably constrained to fully implement it.
Loop-based API: Although OpenCilk and @vchuravy's original PR support parallel `for` loop, I propose to not include it for the initial support in `Base`. On one hand, it is straight forward to implement a simple parallel loop framework from just this API. On the other hand, there are a lot of consideration to be made in the design space if we want to have extensible data parallelism and I feel it's beyond the scope of this PR.
---
[^1]: I think we can make it an error by analyzing this in the front end, in principle.
### Questions
#### More explicit task output declaration?
Current handling of task output variables may be "too automatic" and makes reasoning about the code harder for Julia programmers. For example, consider following code that has no race for now:
```julia
function ...
# ... hundreds of lines (no code mentioning x) ...
Tapir.@sync begin
Tapir.@spawn begin
x = f()
h(x)
end
x = g()
h(x)
end
...
end
```
After sometimes, it may be re-written as
```julia
function ...
if ...
x = ...
return x
end
# ... hundreds of lines (no code mentioning x) ...
Tapir.@sync begin
Tapir.@spawn begin
x = f()
h(x)
end
x = g()
h(x)
end
...
end
```
This code now has a race since `x` is updated by the child and parent tasks. Although this particular example can be rejected by the compiler, it is impossible to do so in general. So, it may be worth considering adding a declaration of task outputs. Maybe something like
```julia
Tapir.@output a b c
Tapir.@sync begin
Tapir.@spawn ...
...
end
```
or
```julia
Tapir.@sync (a, b, c) begin
Tapir.@spawn ...
...
end
```
The compiler can then compare and verify that its understanding of task output variables and the variables specified by the programmer.
It would be even better if we can support a unified syntax like this for all types of tasks (`@async` and `Threads.@spawn`).
#### Bikeshedding the name
On one hand, the name of the module `Base.Experimental.Tapir` is not entirely appropriate since Tapir is the concept for IR and not the user-facing API. On the other hand, we cannot come up with a more appropriate name for this. It is mainly because there is no crisp terminology for this type of parallelism (even though Cilk has been showing the importance of this approach with multiple aspects). Another name could be `Base.Experimental.Parallel`. But "parallel" can also mean distributed or GPU-based parallelism. Providing `@psync` and `@pspawn` macros directly from `Experimental` is another approach.
## Performance improvements
### Demo 1: type inference
The return type of the example `fib` above and the simple threaded mapreduce example `mapfold` implemented in `test/tapir.jl` can be inferred even though they contain `Tapir.@spawn`:
```julia
julia> include("test/tapir.jl");
julia> @code_typed fib(3)
CodeInfo(
...
) => Int64
julia> @code_typed mapfold(x -> isodd(x) ? missing : x, +, 1:3)
CodeInfo(
...
) => Union{Missing, Int64}
```
As we all know, the improvement in type inference drastically change the performance when there is a tight loop following the parallel code (without a function boundary).
### Demo 2: constant propagation
Here is another very minimal example for demonstrating performance benefit we observed. It (naively) computes the average on the sliding window in parallel:
```julia
using Base.Experimental: Tapir
@inline function avgfilter!(ys, xs, N)
@assert axes(ys) == axes(xs)
for offset in firstindex(xs)-1:lastindex(xs)-N
y = zero(eltype(xs))
for k in 1:N
y += @inbounds xs[offset + k]
end
@inbounds ys[offset + 1] = y / N
end
return ys
end
function demo!(ys1, ys2, xs1, xs2)
N = 32
Tapir.@sync begin
Tapir.@spawn avgfilter!(ys1, xs1, N)
avgfilter!(ys2, xs2, N)
end
return ys1, ys2
end
```
For comparison, here are the same code using the current task system (`Threads.@spawn`) and the sequential version.:
```julia
function demo_current!(ys1, ys2, xs1, xs2)
N = 32
@sync begin
Threads.@spawn avgfilter!(ys1, xs1, N)
avgfilter!(ys2, xs2, N)
end
return ys1, ys2
end
function demo_seq!(ys1, ys2, xs1, xs2)
N = 32
avgfilter!(ys1, xs1, N)
avgfilter!(ys2, xs2, N)
return ys1, ys2
end
```
We can then run the benchmarks with
```julia
using BenchmarkTools
suite = BenchmarkGroup()
xs1 = randn(2^20)
ys1 = zero(xs1)
xs2 = randn(length(xs1))
ys2 = zero(xs2)
@assert demo!(zero(ys1), zero(ys2), xs1, xs2) == demo_current!(ys1, ys2, xs1, xs2)
@assert demo!(zero(ys1), zero(ys2), xs1, xs2) == demo_seq!(ys1, ys2, xs1, xs2)
suite["tapir"] = @benchmarkable demo!($ys1, $ys2, $xs1, $xs2)
suite["current"] = @benchmarkable demo_current!($ys1, $ys2, $xs1, $xs2)
suite["seq"] = @benchmarkable demo_seq!($ys1, $ys2, $xs1, $xs2)
results = run(suite, verbose = true)
```
With `julia` started with a single thread [^2], we get
```
3-element BenchmarkTools.BenchmarkGroup:
tags: []
"tapir" => Trial(5.614 ms)
"seq" => Trial(5.588 ms)
"current" => Trial(23.537 ms)
```
i.e., Tapir and sequential programs have identical performance while the performance of the code written with `Threads.@spawn` (current) is much worse. This is because Julia can propagate the constant across task boundaries with Tapir. Subsequently, since LLVM can see that the innermost loop has a fixed loop count, the loop can be unrolled and vectorized.
It can be observed by introspecting the generated code:
```
julia> @code_typed demo!(ys1, ys2, xs1, xs2)
CodeInfo(
1 ── %1 = $(Expr(:syncregion))
│ %2 = (Base.Tapir.taskgroup)()::Channel{Any}
│ %3 = $(Expr(:new_opaque_closure, Tuple{}, false, Union{}, Any, opaque closure @0x00007fec1d9ce5e0 in Main, Core.Argument(2), Core.Ar
gument(4), :(%1)))::Any
...
) => Nothing
julia> m = Base.unsafe_pointer_to_objref(Base.reinterpret(Ptr{Cvoid}, 0x00007fec1d9ce5e0))
opaque closure @0x00007fec1d9ce5e0 in Main
julia> Base.uncompressed_ir(m)
CodeInfo(
...
│ ││┌ @ range.jl:740 within `iterate'
│ │││┌ @ promotion.jl:409 within `=='
│ ││││ %56 = (%51 === 32)::Bool
│ │││└
└
...
```
i.e., `N = 32` is successfully propagated. On the other hand, in the current task system:
```
julia> @code_typed demo_current!(ys1, ys2, xs1, xs2)
CodeInfo(
...
│ %13 = π (32, Core.Const(32))
│ %14 = %new(var"#14#15"{Vector{Float64}, Vector{Float64}, Int64}, ys1, xs1, %13)::Core.PartialStruct(var"#14#15"{Vector{Float64}, Vec
tor{Float64}, Int64}, Any[Vector{Float64}, Vector{Float64}, Core.Const(32)])
```
i.e., `N = 32` (`%32`) is captured as an `Int64`.
Indeed, the performance of the sequential parts of the code is crucial for observing the speedup. With `julia -t2`, we see:
```
3-element BenchmarkTools.BenchmarkGroup:
tags: []
"tapir" => Trial(2.799 ms)
"seq" => Trial(5.583 ms)
"current" => Trial(20.767 ms)
```
I think an important aspect of this example is that even a "little bit" of compiler optimizations enabled on the Julia side can be enough for triggering optimizations on the LLVM side yielding a substantial effect.
---
[^2]: Since this PR is mainly about compiler optimization and not about the scheduler, single-thread performance compared with sequential program is more informative than multi-thread performance.
### Demo 3: dead code elimination
The optimizations that can be done with forward analysis such as type inference and constant propagation are probably implementable for the current threading system in Julia with reasonable amount of effort. However, optimizations such as dead code elimination (DCE) that require backward analysis may be significantly more challenging given unconstrained concurrency of Julia's `Task`. In contrast, enabling Tapir at Julia IR level automatically triggers Julia's DCE (which, in turn, can trigger LLVM's DCE):
```julia
using Base.Experimental: Tapir
@inline function eliminatable_computation(xs)
a = typemax(UInt)
b = 0
for x in xs
a = (typemax(a) + a) ÷ ifelse(x == 0, 1, x) # arbitrary computation that LLVM can eliminate
b += x
end
return (a, b)
end
function demo_dce()
local b1, b2
Tapir.@sync begin
Tapir.@spawn begin
a1, b1 = eliminatable_computation(UInt(1):UInt(33554432))
end
a2, b2 = eliminatable_computation(UInt(3):UInt(33554432))
end
return b1 + b2 # a1 and a2 not used
end
```
In single thread `julia`, this takes 30 ms while an equivalent code with current threading system (i.e., replace `Tapir.` with `Threads.`) takes 250 ms.
Note that Julia only has to eliminate `b1` and not the code inside `eliminatable_computation`. The rest of DCE can happen inside LLVM (which does not have to understand Julia's parallelism).
## Motivation behind the restricted task semantics
As explained in the proposed API, the serial projection property restrict the set of programs expressible with `Tapir`. Although we have some degree of interoperability (see below), reasoning and guaranteeing forward progress are easy only when the programmer stick with the existing patterns. In particular, it means that we will not be able to drop `Threads.@spawn` or `@async` for expressing programs with unconstrained concurrency. However, this restricted semantics is still enough for expressing compute-oriented programs and allows more optimizations in the compiler and the scheduler.
As shown above, enabling Tapir at Julia level already unlocks some appealing set of optimizations for parallel programs. The serial projection property is useful for supporting optimizations that require backward analysis such as DCE in a straightforward manner. Once we manage to implement a proper OpenCilk integration, we expect to see the improvements from much richer set of existing optimizations at the LLVM level. This would have multiplicative effects when more LLVM-side optimizations are enabled (e.g., #36832). Furthermore, there is ongoing research on more cleverly using the parallel IR for going beyond unlocking pre-existing optimizations. For example, fusing arbitrary multiple `Task`s to be executed in a single `Task` can introduce deadlock. However, this is not the case in Tapir tasks thanks to the serial projection property. This, in turn, let us implement more optimizations inside the compiler such as a task coalescing pass that is aware of a downstream vecotrizer pass. In addition to optimizing user programs, this type of task improves productivity tools such as race detector (ref [productivity tools provided by OpenCilk](https://cilk.mit.edu/tools/)).
In addition to the performance improvements on the side of the compiler, may-happen in parallel parallelism can help the parallel task runtime to handle the task scheduling cleverly. For example, having parallel IR makes it easy to implement continuation-stealing as used in Cilk (although it may not be compatible with the depth-first approach in Julia). Another possibility is to use a version of `schedule` that _may fail_ to reduce contention when there are a lot of small tasks ([as I discussed here](https://discourse.julialang.org/t/ann-foldsthreads-jl-a-zoo-of-pluggable-thread-based-data-parallel-execution-mechanisms/54662/5)). This is possible because we don't allow concurrency primitives in Tapir tasks and the task is not guaranteed to be executed in a dedicated `Task`. Since Tapir makes the call/task tree analyzable by the compiler, it may also help improving the depth-first scheduler.
## Composability with existing task system
If we are going to have two notions of parallel tasks, it is important that the two systems can work together seamlessly. Of course, since the Tapir tasks cannot use arbitrary concurrency APIs, we can't support some task APIs inside `Tapir.@spawn` (e.g., `take!(channel)`). However, Tapir only requires the forward progress guarantee to be independent of other tasks within the same syncregion but not with respect to the code outside. Simplify put, we can invoke concurrency API as long as it does not communicate with other Tapir tasks in the same syncregion. For example, following code is valid:
```julia
@sync begin
Threads.@spawn begin
Tapir.@sync begin
Tapir.@spawn begin
put!(bounded_channel, 0) # Thunk 1
end
f() # Thunk 2
end
end
Threads.@spawn take!(bounded_channel) # Thunk 3
end
```
as long as `f()` does not use the `bounded_channel`. That is to say, the author of this code guarantees the forward progress of Thunk 1 and Thunk 2 independent of each other but _not_ with respect to Thunk 3. Therefore, the example above is a valid program.
Another class of useful composable idiom is the use of concurrency API that unconditionally guarantees forward progress. For example, `put!(unbounded_channel, item)` can make forward progress independent of other tasks (but this is not true for `take!`). This is also true for `schedule(::Task)`. Thus, invoking `Threads.@spawn` inside `Tapir.@sync` is valid if (bot not only if [^3]) we do not invoke `wait(task)` in the `Tapir.@sync`. For example, the following code is valid
```julia
unbuffered_channel = Channel(0)
@sync begin
Tapir.@sync begin
Tapir.@spawn begin
t1 = Threads.@spawn put!(unbuffered_channel, 0)
# wait(t1)
end
t2 = Threads.@spawn take!(unbuffered_channel)
# wait(t2)
end
end
```
However, it is invalid if `wait(t1)` and `wait(t2)` are uncommented.
---
[^3]: Note that invoking `wait` in a Tapir task can be valid in some cases. For example, if it is known that the set of the tasks spawned by `Threads.@spawn` eventually terminate independent of any forward progress in other Tapir task in the same syncregion, it is valid to `wait` on these tasks. For example, it is valid:
```julia
unbuffered_channel = Channel(0)
Tapir.@sync begin
Tapir.@spawn begin
@sync begin
Threads.@spawn put!(unbuffered_channel, 0)
take!(unbuffered_channel)
end
end
do_something_else() # does not touch `unbuffered_channel`
end
```
However, this is not an example of the idiom of using API that unconditionally guarantees forward progress.
## Implementation strategy
### Outlining
At the end of Julia's optimization passes (in `run_passes`) the child tasks are outlined into *opaque closures* and wrapped into a `Task` (see `lower_tapir!(ir::IRCode)` function). (Aside: @Keno's opaque closure was *very* useful for implementing outlining at a late stage! Actually, @vchuravy has been suggesting opaque closure would be the enabler for this type of transformation since the beginning of the project. But it's nice to see how things fit together in a real code.) The outlined tasks are spawned and synced using the helper functions defined in `Base.Tapir`.
### Questions
* We need to create a new closure at the end of Julia's optimization phase. I'm using `jl_new_code_info_uninit` and `jl_make_opaque_closure_method` for this. Are they allowed to be invoked in this part of the compiler?
* In this PR, task outputs are handled by manual ad-hoc reg2mem-like passes (which is not very ideal). Currently (Julia 1.7-DEV), it looks like [left over slots are rejected by the verifier](https://github.com/JuliaLang/julia/blob/3129a5bef56bb7216024ae606c02b413b00990e3/base/compiler/ssair/verify.jl#L47-L48). Would it make sense to allow slots in the SSA IR? Can code other than Tapir make use of it? For example, maybe slots can work as an alternative to `Core.Box` when combined with opaque closure?
## Acknowledgment
Many thanks to @vchuravy for pre-reviewing the PR!
## TODOs
To reviewers: please feel free to add new ones or move the things in wishlist to here
- [ ] Decide the API
- [x] Pass tests of existing packages ([TapirBenchmarks.jl](https://github.com/cesmix-mit/TapirBenchmarks.jl) and [FoldsTapir.jl](https://github.com/JuliaFolds/FoldsTapir.jl))
- [ ] Resolve all TODO/ASK comments in the code
## Wishlist
This is a list of nice-to-have things that are maybe not strictly required.
- IR verifier for checking the invariance of Tapir (e.g., disallow `@goto` into a task)
- Proper data flow analysis integration (do we need to use slot in the optimizer?)
- Support nested syncregion (or at least error out in the frontend?)
- Refine exception handling (esp. in the continuation)
- More tests
- Documentation (docstrings and `manual/multi-threading.md`)
- Easy sub-CFG-to-opaque-closure interface? (useful for outlining of exceptions for GPUs)