ANN: Parallel `for` loops in FLoops.jl with composable and extensible fold-based API

A while ago I posted FLoops.jl RFC that demonstrates alternative for loop framework on top of Transducers.jl. FLoops.jl lets users write fast for loops over complex data structures. Since it received quite positive reactions [*1], it made me think that it might be nice to provide a nice syntax sugar that can give people imperative style feeling on top of functional style composable framework. In particular, I’ve been trying to expose the parallel computing part of Transducers.jl via FLoops.jl. I think I finally found a nice syntax and included in the latest version of FLoops.jl. You can install it via using Pkg; Pkg.add("FLoops").

So, here is the syntax I came up. I’m actually interested in if it is readable without looking at the docs. What do you think?

@floop for (i, v) in pairs([0, 1, 3, 2]), (j, w) in pairs([3, 1, 5])
    d = abs(v - w)
    @reduce() do (dmax = -1; d), (imax = 0; i), (jmax = 0; j)
        if isless(dmax, d)
            dmax = d
            imax = i
            jmax = j
        end
    end
end
@show (dmax, imax, jmax)

This is one of the most “complex” examples from the README. I’m kind of hoping that it is actually readable without explaining what the API of the macros are.

For simpler cases, there are OpenMP-like shorthands:

@floop for (x, y) in zip(1:3, 1:2:6)
    a = x + y
    b = x - y
    @reduce(s += a, t += b)
    # @reduce(s = 0 + a, t = 0 + b)  # with explicit initializer
end
@show (s, t)

For more information, have a look at the documentation.

Compared to Threads.@threads, there are a few advantages of @floop:

  • It supports a large set of data collection types with SplittablesBase.jl interface, as listed in SplittablesBase.jl README. This includes lazy iterators like Iterators.filter and Iterators.product. Iterator comprehensions work too.
  • Flexible reduction support with @reduce syntax.
  • Loop execution options such as basesize, simd, and nestlevel for tweaking the performance (see foldxt docstring).
  • Automatic detection of a possible data-race pattern.
  • Control flow supports; i.e., continue, break, return, and @goto work inside the loop body.
  • Pluggable executors (schedulers). @floop currently supports sequential (default when no @reduce), threaded (default with @reduce), and distributed executor backends.
  • Interop with the rest of Transducers.jl-related packages.
  • Adhere to nested and composable parallelism of Julia.

Since @floop is built on top of the extensible fold API of Transducers.jl, it is possible to add, for example, another execution backend. It was quite easy to hack up Dagger.jl-based fold that can be used via Transducers.jl API including @floop: GitHub - JuliaFolds/FoldsDagger.jl. For sequential loops, I’m experimenting easier-to-use interfaces for some casual performance tweaks using SIMD (via SIMD.jl), prefetching, unrolling, etc. in LoopRecipes.jl that can be composed with @floop.

[*1] This is an aside, but, to be honest, the reactions to FLoops.jl were a bit surprising. It was just a bunch of macros to call Transducers.jl API. It does not let you write anything new. But I guess that’s what Julia designers mean by language design is applied psychology. Sometimes syntax sugars do make a rather big difference.

30 Likes

It is indeed easy to understand and very intuitive (I read the code, tried to guess what it does, then the docs and verifyed my guess).

Thanks for this very nice interface! You may be right that it is only a thin layer on Transducers.jl, but the building blocks may be more familiar to most users.

4 Likes

Wow this looks great! I second the need to have a for-like syntax: having a familiar abstraction is a huge help when dealing with complicated applications or for users who are not professional developers (try convincing someone who uses Fortran “because it’s simple” to use transducers…)

I have to say the syntax doesn’t feel very intuitive to me (the @reduce feels a bit too magical so that it’s not clear what it does exactly), but I don’t really have an alternative… Maybe it would help to show in the docs an equivalent with explicit @threads for a simple reduction? (no transducer, just plain old loops and @threads)

Is there any way to support custom mutating reducers? Eg mul!(accum, x, y, true, true)

4 Likes

Is @reduce the only thing required to switch from a serial loop to a multi-threaded loop?

1 Like

@Tamas_Papp @antoine-levitt Thanks for giving a shot at the guessing game!

I think I’m relying too much on the intuition of the reader in the current documentation. That’s a good point that I should add some equivalent code. @threads does not support reduction so I can’t really translate to it. So, I just added some tips for mentally converting it to the plain for loop. What do you think?

When reading code with @reduce() do, a quick way to understand it is to mentally comment out the line with @reduce() do and the corresponding end. To get a full picture, move the initialization parts (in the above example, dmax = -1, imax = 0, and jmax = 0) to outside for loop:

julia> dmax = -1  # -+
       imax = 0   #  | initializers
       jmax = 0   # -+
       for (i, v) in pairs([0, 1, 3, 2]), (j, w) in pairs([3, 1, 5])
           d = abs(v - w)
           if isless(dmax, d)  # -+
               dmax = d        #  | `do` block body
               imax = i        #  |
               jmax = j        #  |
           end                 # -+
       end
       (dmax, imax, jmax)
(5, 1, 3)

This exact transformation is used for defining the sequential basecase. Consecutive basecases are combined using the code in the do block body.

I also added a bit more explanations in the reference FLoops.jl · FLoops

Yes. Alternatively, if you are not doing reduction (like how you’d use @threads), you can specify the “executor” as in

@floop ThreadedEx() for i in eachindex(xs, ys)
    ys[i] = xs[i]
end
3 Likes

I meant how you would implement it using @threads - which is the canonical way to do things without floops, and something not trivial for beginners. Eg for a naive sum with Float64, something like

x_loc = zeros(Threads.nthreads())
@threads for xx in x
    x_loc[Threads.threadid()] += xx
end
s = sum(x_loc)

In particular this highlights that you need nthreads() temporaries, which can be important for large arrays.

1 Like

There are some problems with the approach using nthreads() length buffer [*1]. To be honest it’s hard for me to see why it’d help understanding how @reduce works. I think it’s fair to say it’s not an intuitive solution unless you’ve seen it many times. OTOH, we all understand how sequential for loop works. I thought I can rely on the sequential for loop as a solid reference point.

[*1] The element type has to be specified manually before executing the loop. It limits what you can do. If @threads starts supporting dynamic scheduling, load-balancing, I/O, and/or thread migration, this approach does not have defined behavior. Load/store for each iteration (though maybe the compiler can LICM it especially when there is only one buffer vector?).

In-place reduction is already possible. There is an example using append!!(EmptyVector(), ys) in README. Creating basecase-local buffer is a bit tricky, though (I’m thinking to add some syntax for this). For now it’d require some hacky solution like

@floop for x in [randn(3, 3) for _ in 1:10]
    dummy = nothing
    @reduce() do (a = Matrix{Float64}(I, 3, 3); x), (b = zeros(3, 3); dummy)
        mul!(b, a, x)
        (a, b) = (b, a)
    end
end

Julia v1.5.0 with 4 threads:

julia> using BenchmarkTools, FLoops, Base.Threads

julia> function f(v)
           @floop for x in v
               a = x ^ 2
               @reduce(s += a)
           end
           s
       end
f (generic function with 1 method)

julia> function g(v)
           acc = zeros(Threads.nthreads())
           @threads for x in v
               a = x ^ 2
               acc[Threads.threadid()] += a
           end
           sum(acc)
       end
g (generic function with 1 method)

julia> v = randn(100_000);

julia> f(v) == g(v)
true

julia> @btime f(x) setup = (x = randn(1000_000));
  253.011 μs (131 allocations: 7.16 KiB)

julia> @btime g(x) setup = (x = randn(1000_000));
  689.686 μs (22 allocations: 3.09 KiB)

Am I doing something wrong? f and g give the same result, so I’d assume the answer is “no”. Why is @floop so much faster? It doesn’t create temporary arrays under the hood?

Edit: I don’t think the allocation of the acc vector alone explains the difference, since it’s so small:

julia> function g!(acc, v)
           @threads for x in v
               a = x ^ 2
               acc[Threads.threadid()] += a
           end
           sum(acc)
       end
g! (generic function with 1 method)

julia> acc = zeros(Threads.nthreads());

julia> g!(acc, v) == f(v)
true

julia> @btime g!(acc, x) setup = (acc = zeros(Threads.nthreads()); x = randn(1000_000));
  683.165 μs (21 allocations: 2.98 KiB)

With @inbounds acc[Threads.threadid()] += a, these are comparable for me.

As is @btime sum(z->z^2, x) setup = (x = randn(1000_000));, since I suppose the operation is memory-limited.

1 Like

Dang, right!

1 Like

But don’t you always to have such a buffer? Doesn’t a @floop with a @reduce allocate nthreads() accumulators?

I guess the answer is kinda yes and no? Kinda yes, since obviously I need to store the intermediate result somewhere (especially because inter-task communication in Julia has to go through the heap). But kinda no, as the accumulators are not pre-allocated as a contiguous vector. Rather, a small object is allocated and transferred via fetch(task). So the implementation strategy of @floop’s (or rather Transducers.jl’s) is quite different from pre-Julia 1.3 common approach. This approach lets us avoid the issues I mentioned.

I should say Transducers.jl approach currently has one disadvantage because fetch cannot be inferred at least until we have https://github.com/JuliaLang/julia/issues/35690. But I’m guessing it’s probably possible already to make fetch inferable thanks to @c42f’s _apply_in_world builtin https://github.com/JuliaLang/julia/pull/35844.

Sorry to be dense but just to be precise on this because it matters in some applications : if the reduced object is of size N, does @floop allocate N*nthreads() ram, less or more?

1 Like

No worries, questions are very welcome! :slight_smile:

A short answer is, by default, yes. Here is an N = 3 toy example:

@floop for x in 1:10
    xs = [x, 2x, 3x]
    @reduce() do (ys = zeros(3); xs)
        ys .+= xs
    end
end
@show ys

This allocates nthreads() different ys arrays. So, in total, N*nthreads() items. But it doesn’t have to live in a contiguous region of the RAM.

A longer answer is:

  • You may not be able know the correct element type to be used. In this case, you’d need to widen the accumulator storage type as you go. It’d require you to allocate length N objects more than nthreads() times.

  • @floop also lets you create arbitrary number of accumulators because:

    • It’s useful to create more than nthreads() accumulators. For example, you may want to have load balancing when the computation time for each iteration fluctuates wildly. It also helps when interleaving I/O.

    • It’s also sometimes useful to create less than nthreads() accumulators (so limit the parallelism). For example, if you know there are other tasks that runs in parallel and you know the computation time is somewhat close to the task creation overhead, it makes sense to limit the number of tasks to be used.

I think part of what @tkf is trying to say is that the actual algorithm employed in your example is just different from what Transducers.jl does.

I believe a better model for how this reduction is done (though still very approximate) can be seen by writing a multi-threaded fibbonacci function with Threads.@spawn.

_fib(n) = n < 2 ? n : _fib(n-1) + _fib(n-2)

function fib(n::Int; basesize=23)
    if n > basesize
       t = Threads.@spawn fib(n - 2)
       fib(n - 1) + fetch(t)
    else
       _fib(n)
    end
end

This is like a multi-threaded reduction that keeps splitting into new tasks until it gets to an appropriate base size and then finishes the reduction in serial.

2 Likes

Great work! It look very good.
The API is nice, but I feel that the reduce syntax may be not clear.

@reduce() do (dmax = -1; d)
    ....
    dmax = d
end

it seems strange. First, the -1 (I know it is the initial value, but you only now after read the documentation, it is not intuitive). Second, the repeated use of “d” both in the assignation and in the “header” of reduce. It seems strange and redundant, but I am sure you have your reasons.

However, the syntax:

@reduce(dmax = -1 + d)
# or
@reduce(dmax += d)

is very intuitive in my point of view.

2 Likes

I appreciate your feedback! These are very good points that I should be explaining in the documentation! Yes, I agree it’s strange from the first look and it may not be obvious why the syntax is the way it is. After all, it took me a few months to came up with the syntax that supports advanced use cases that I actually I want to use. (Which could just mean that my imagination is lacking :joy:. But I’m happy with the end result anyway!)

These two seemingly strange syntax choices actually originate from the same requirement.

When you are writing efficient data parallel reduction, you’d have to write the code for sequential base case (that runs for each “thread”) and the code for combining these base cases. Furthermore, you’d usually use the parts of sequential base case code for combining base cases [*1]. So that’s why @reduce() do is in the middle of the loop body. It is used to annotate the region of the code that has to be used for combining the base cases. So, naturally, you can have code outside @reduce (which you call “header”).

Importantly, when extracting out the body of @reduce() do as a function, we need to know what variables are the arguments to this combining function. That’s why you need to list the variables as the arguments to @reduce() do instead of, e.g., a plain begin-end block like

@reduce begin
    if isless(dmax, d)
        dmax = d
        imax = i
        jmax = j
    end
end

For simple case, maybe it is possible to analyze the code and automatically determine the “argument variables.” However, it’d be very tricky (if not impossible) to handle cases like

@floop for x in xs, y in ys
    @reduce begin
        sum += x * y
        prod *= x + y
    end
end

“Repeated variables” syntax for @reduce() do is there to guide users to separate out the combine function definition and coming up with parallelizable algorithm. Once you understand a few concepts, I’m hoping that this would give you a very “thin wrapper” impression and the feeling that you have the full control of the parallel loop (even though the actual AST transformation is rather tedious).

Having said that, it’d be possible to shave of repeated d from the example by allowing something like

@reduce() do (dmax = -1; d = abs(v - w)), (imax = 0; i), (jmax = 0; j)
    if isless(dmax, d)
        dmax = d
        imax = i
        jmax = j
    end
end

The initialization parts like dmax = -1 are in the argument position of @reduce instead of before the for loop because of this reason (repeat less). I’m also hoping that it reminds you that initialization happens multiple times (it happens for each “thread” ≈ base case).

[*1] This is because, equivalently, data parallel reduction should be able to be decomposed into stateless transformations of the iterator and binary operation that is associative (i.e., how the operations are grouped/parenthesized does not patter — this is the most important ingredient of data parallelism). This is how Transducers.jl usually works; i.e., given the iterator transformation and the binary operation, it composes the sequential loop body. FLoops.jl works the other way around. It let users write already-composed loop body with a syntax @reduce() do to annotate which part of the loop body should be extracted out as the associative binary operation.

1 Like

I originally thought we could have an API where the world counter and a function were wrapped up in a type and eventually fed to _apply_in_world which would be understood by inference. But @jameson seemed to think this would be unsound in some way. So for now I kept things simple and didn’t add any inference for _apply_in_world. I’d still like to do it though, or better understand why it’s not possible. There’s a small amount of discussion of this on the PR as well.

3 Likes