# ANN: Transducers.jl 0.3. taking "zeros" seriously, type stability improvements, fusible groupby, OnlineStats, "GPU support", and more

Hi, I’m happy to announce that Transducers 0.3 is released. Let me share some highlights.

If you are new to Transducers, check out the previous discussion: ANN: Transducers.jl, efficient and composable algorithms for map- and reduce-like operations

and maybe my talk from yesterday

and the slides https://tkf.github.io/juliacon2019-transducers/

### Fusible group-by

There are many Julia packages that provide a way to fuse group-by and per-group reduction: `SplitApplyCombine.groupreduce`, `IndexedTables.groupreduce`, `OnlineStatsBase.GroupBy`, etc. However, I couldn’t find a package that can fuse group-by, reduction, *and* termination. It turned out this is very easy to implement as a transducer. Here is an example. The following snippet tries to find whether there is a group with a sum greater than 3 and stops the computation as soon as one is found:

```julia
julia> result = transduce(
           GroupBy(
               string,
               Map(last) |> Scan(+) |> ReduceIf(x -> x > 3),
           ),
           right,
           nothing,
           [1, 2, 1, 2, 3],
       );

julia> result isa Reduced
true

julia> unreduced(result)
Dict{String,Int64} with 2 entries:
  "1" => 2
  "2" => 4
```

The termination can happen inside a group or outside a group. That is to say, it’s possible to construct a transducer that terminates the computation when, say, three groups are found.

```julia
julia> transduce(
           GroupBy(identity, Map(identity)) |> ReduceIf(groups -> length(groups) >= 3),
           right,
           nothing,
           1:9,
       ) |> unreduced
Dict{Int64,Pair{Int64,Int64}} with 3 entries:
  2 => 2=>2
  3 => 3=>3
  1 => 1=>1
```

Note that the aforementioned group-by functions in other packages are much more optimized than `Transducers.GroupBy`. If you need speed and don’t need termination, `Transducers.GroupBy` is probably not a good choice at this point.

See more in the manual: https://tkf.github.io/Transducers.jl/v0.3/manual/#Transducers.GroupBy

### Iterator comprehension support

Iterator comprehensions can be translated directly to transducers. You don’t need to know how to use transducers to benefit from some of the speedups:

```julia
julia> itr = (y for x in 1:1000 for y in 1:x);

julia> @btime foldl(+, eduction(itr))
  21.127 μs (1497 allocations: 39.13 KiB)
167167000

julia> @btime sum(itr)
  174.544 μs (1 allocation: 16 bytes)
167167000
```

### OnlineStats support

OnlineStats are transducers in disguise. Now you can do, e.g., `Transducer(OnlineStats.Mean())` to use them with transducers.

https://tkf.github.io/Transducers.jl/v0.3/manual/#Transducers.Transducer-Tuple{OnlineStatsBase.OnlineStat}
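
The observation that an online statistic is a reducing step function with a mutable accumulator can be made concrete with a toy example. This is an illustrative sketch only; `ToyMean`, `fit!`, and `value` here are hypothetical stand-ins, not OnlineStats.jl’s implementation:

```julia
# Toy "online mean" illustrating why an OnlineStat is a reducing step
# function in disguise (hypothetical sketch, not OnlineStats.jl):
mutable struct ToyMean
    sum::Float64
    n::Int
end
ToyMean() = ToyMean(0.0, 0)

# `fit!` plays the role of the reducing step `acc = step(acc, x)`:
fit!(m::ToyMean, x) = (m.sum += x; m.n += 1; m)
value(m::ToyMean) = m.sum / m.n

# Feeding a collection through `foldl` updates the statistic one
# element at a time:
value(foldl(fit!, [1, 2, 3, 4]; init = ToyMean()))  # 2.5
```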

### “GPU support”

Stateless and non-terminating transducers can be used without Transducers’ custom `foldl`/`reduce`. This means that Transducers.jl now “supports” CuArrays’ `reduce`.

```julia
using Transducers, CuArrays, CUDAnative
A = CuArray(randn(3, 3))
rf = reducingfunction(Map(CUDAnative.sin) |> Filter(x -> x > 0), +)
reduce(rf, A, init=0.0f0, dims=1)
```

### Taking “zeros” seriously (internal)

To fully embrace the mutate-or-widen approach and get rid of the use of the inference API, I had to “take zeros seriously.” This is because, unlike `Base.foldl`, Transducers.jl’s `foldl` needs to handle the case where all the elements in the container are filtered out (i.e., it must handle “dynamic emptiness”). This excludes the strategy used by `Base.foldl`, where the first element from the container is used as the initial value (note: this approach is also not appropriate for “asymmetric reducing functions” like `push!`). For this, I created a small package called InitialValues.jl. It implements a generic “left identity” singleton value `Init(op)` for operator `op`; i.e., `op(Init(op), x) === x` for any `x`. This handles the initialization of the state of the reduction.
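
The left-identity idea can be sketched in a few lines. This is a hypothetical mini-version for illustration; the real InitialValues.jl is more general and avoids the method-per-operator boilerplate shown here:

```julia
# Hypothetical mini-version of the InitialValues.jl idea: a singleton
# `Init(op)` that acts as a left identity for `op`.
struct InitOf{F} end
Init(op) = InitOf{typeof(op)}()

# op(Init(op), x) === x; here defined for `+` only (InitOf is our own
# type, so this is not type piracy):
Base.:+(::InitOf{typeof(+)}, x) = x

# `foldl` can now start from the generic identity; the first step
# replaces the singleton with the first surviving element:
foldl(+, [1, 2, 3]; init = Init(+))  # 6

# If every element is filtered out, the singleton itself comes back,
# signaling "dynamic emptiness":
foldl(+, Int[]; init = Init(+))      # Init(+)
```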

I also created BangBang.jl which provides functions such as `push!!`, implementing the mutate-or-widen variants of a subset of `Base` functions.
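
The mutate-or-widen idea can be sketched as follows. This toy `pushbb` is illustrative only, not BangBang.jl’s actual `push!!`: mutate when the element fits the container’s element type, otherwise copy into a widened container.

```julia
# Toy mutate-or-widen push (illustrative sketch; BangBang.jl's
# `push!!` is more general):
function pushbb(xs::Vector{T}, x) where {T}
    if x isa T
        return push!(xs, x)  # fits: mutate in place
    else
        U = promote_type(T, typeof(x))
        return push!(convert(Vector{U}, xs), x)  # widen: copy, then push
    end
end

pushbb([1, 2], 3)                # mutates: Vector{Int}
pushbb(pushbb([1, 2], 3), 0.5)   # widens to Vector{Float64}
```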

Combining these two approaches turned out to be a very powerful way to implement transducers in Julia. At first, I was worried about whether I could keep the performance advantage of transducers, because using the singleton value `Init(op)` introduces union types. Impressively, the Julia compiler can handle many cases, although some transducers are still hard to infer; e.g., `collect(Take(1), 1:1)` cannot be inferred. Fortunately, specifying the initial value explicitly (e.g., `foldl(push!!, Take(3), 1:1; init=Int[])`) fixes the inference most of the time.

Now that Transducers.jl’s main entry points can be inferred by the compiler (provided the transducers used are not very complex), it is natural to use transducers in performance-sensitive code. (For anyone who has read the old manual, this means the awkward `eduction`-based workaround for the function barrier is now gone.)

39 Likes

This is fantastic work! I also really enjoyed your talk — it was the clearest explanation of the concept that I’ve seen anywhere.

5 Likes

I’m glad that you enjoyed my talk!

@tkf
Have you looked at reducers? They came before transducers and are very similar and in some ways complementary. Most importantly, they were built for parallel computation. They use Java’s fork-join mechanism, so I’m not sure whether that carries over to Julia seamlessly or not.

I used them all the time when I wrote Clojure code.

Are reducers “eduction with a hole”? I.e., is `(fn [reducible] (eduction xf reducible))` a reducer for any transducer `xf`? If so, can I claim that Transducers.jl already has reducers because it implements `eduction`?

Transducers.jl has preliminary multi-threading support for the `reduce` function. (Note: Clojure’s `reduce` and `fold` are `foldl` and `reduce` in Julia, respectively.) It predates the new task-based multithreading support and just uses the `@threads` macro. It would be interesting to switch to task-based parallelism. IIUC, it can be beneficial, e.g., when filtering is post-composed with rather compute-intensive transducers. In such cases, static scheduling like `@threads` is not great because the total computation required for each thread can be very different. It should also help when a transducer or reducing function itself calls `reduce`.

1 Like

The difference is the combine function you provide, and that the fold function is neither `foldl` nor `foldr`; it depends on how the work was parallelized.

I’ll just quote:

> The combining function (an associative binary fn) must have some ‘identity’ value, a value that, when combined with some X, yields X. 0 is an identity value for +, as is 1 for *. The combining fn must supply an identity value when called with no arguments (as do + and *). It will be called with no arguments to supply a seed for each leaf reduction. There is a fn (called monoid, shh!) to help you build such combining functions.
>
> If no combining fn is supplied, the reducing fn is used. Simple folds look like reduces:
>
> ```clojure
> (r/fold + [1 2 3 4])
> ;=> 10
> ```
>
> But by promising less (i.e. not promising stepwise reduction from left or right) fold can do more - run in parallel.

That’s why I said Clojure’s `fold` is Julia’s `reduce`. I think I’m doing more or less the same thing as the blog you quote.

1 Like

I’m traveling right now, so I can’t give any actual code examples. I remember getting a little creative with the monoid combine function. Possibly what you describe as “eduction with a hole” is the same. https://en.wikipedia.org/wiki/Monoid_(category_theory)

I do miss parallel reduce, so I will be using your package in Julia!

My question about the “eduction with a hole” was about reducers, not reducing step functions (the latter is related to monoids). Actually, I think

> The following functions (analogous to the sequence versions) create reducers from a reducible or foldable collection: r/map r/mapcat r/filter r/remove r/flatten r/take-while r/take and r/drop.

in https://clojure.org/reference/reducers answers my question. I was thinking `(fn [reducible] (r/map inc reducible))` was a reducer; but it was not correct. So, I should be asking: Is `(r/map inc [1 1 1 2])` equivalent to `(eduction (map inc) [1 1 1 2])`? If `(eduction (map inc) [1 1 1 2])` is a reducer (or equivalent to it), I’d say Transducers.jl already has reducers.

It’d be great if you can share your usecase. It’s interesting to see if Transducers.jl can support it.

By the way, it’s interesting that you bring up monoids, as that’s related to the main improvements in v0.3. In Lisps, you mark a binary function as a monoid by defining its nullary case (e.g., `(+)` evaluates to `0`). IIUC, that’s the case in Clojure, too. However, the same strategy does not work well in Julia/Transducers.jl because:

1. I don’t “own” `+` etc., so defining `Base.+() = 0` would be type piracy.
2. More importantly, `acc = step(acc, x)` could introduce a `Union` if I initialize `acc` with some arbitrary “zero”. The compiler is very good at resolving this but it’s nicer if there is a way to get rid of this `Union` at the end of `foldl`/`reduce`.
3. In-place `step` functions (e.g., `append!`) do not work well with an identity of a fixed type. Semantically, it is correct to use `Any[]` as the identity for `append!`. However, you’d want to avoid boxing elements if possible. I suppose Clojure does not have this problem because it’s immutable by default and has excellent builtin persistent data structures.

To solve points 1 and 2, I wrote InitialValues.jl [1]. Point 3 is solved by BangBang.jl.

[1] Actually, I just realized that the current implementation is not enough for parallel `reduce`. InitialValues.jl only implements left identities. I need to add right identities to make parallel `reduce` work smoothly.

1 Like

Had a chance to look at my reducer code.

For reference: https://clojuredocs.org/clojure.core.reducers

I made extensive use of this pattern.

```clojure
(:require [clojure.core.reducers :as r])
(r/foldcat (r/map f c))
```

`r/foldcat` is the same as `(r/fold cat append!)`, which is a good example of how separating the combining function from the reducing function is beneficial. The combining function uses a parallel/lazy-friendly immutable operation, `cat`, while the reducing function can mutate: `append!` (grows a vector).

Here’s an example of me being very explicit about specifying the combining function and the reducing function. I think this was one of the first uses of reducers I wrote, so it could be replaced with `foldcat`.

```clojure
(defn calc-volume [p tet]
  "calculate the volume of a set of tetrahedral elements
  tet should be of form [[i00 i01 i02 i03][i10 i11 i12 i13]...]
  returns an interleaved vector of [volume tet-indices] pairs.
  volume is positive; if the input was a negatively wound tet,
  it will have a correct +ve sense winding in the output."
  (let [reducef (fn [acc [i0 i1 i2 i3 :as itet]]
                  (let [p0 (p i0)
                        p1 (p i1)
                        p2 (p i2)
                        p3 (p i3)
                        vol (m/calc-volume p0 p1 p2 p3)]
                    (if (< vol 0.0)
                      (conj acc [(* -1.0 vol) [i0 i2 i1 i3]])
                      (conj acc [vol itet]))))
        combinef (r/monoid into (fn [] []))]
    (r/fold combinef reducef tet)))
```

Here’s one with `max`.

```clojure
(r/fold (fn ([] 0) ([a b] (max a b))) max
        (r/map f c))
```

I see how I could use InitialValues.jl for this, but I do really like the conciseness, flexibility, and single-package-ness of the multiple-arity lambda. In Julia it might look like:

```julia
fold(Monoid(() -> 0, (acc, el) -> max(acc, el)), max, c)
```

Abuse of the parallel fold machinery! The combining function does nothing and I use mutation in the reducing function, totally ignoring the accumulator. I do something similar in other places for a conjugate gradient solve.

```clojure
(def nil-monoid (r/monoid (fn [a b] nil) (fn [] nil)))
(r/fold nil-monoid
        (fn [_ iu] (m/set-row! u iu (m/sub (P iu) (P0 iu))))
```

One more:

```clojure
(r/fold (fn ([] {}) ([a b] (merge-with concat a b)))
        (partial merge-with concat)
        (r/map f c))
```
1 Like

(you mean `(r/fold cat append! coll)`; sorry for the nitpick)

Thanks, this is an interesting point. If an operation can be optimized if it can know that it is invoked sequentially, it makes sense.

But this imposes that the combining function is applied sequentially. If the combining function itself is expensive, this may not be optimal. This is why, at the moment, Transducers.jl’s combining functions are invoked in parallel (in the `master` branch, with Julia >= 1.3). But it would be nice if there were a flexible way to tell `Transducers.reduce` to invoke certain functions sequentially in the main thread. I think this API can be generalized into a trait-based system encoding algebraic and algorithmic properties of reducing functions and transducers. This could be used to optimize not only the outermost loop (as in `r/fold`) but also all the inner loops (e.g., the `Cat()` transducer).

Isn’t it much more concise to write

```julia
reduce(max, c; init=0)
```

?

Also, if the collection `c` can contain negative numbers, it’s very dangerous to “package” the initial value 0 with `max`. This is not an accident but based on a mathematical principle: a monoid is not a property of a function [1]; it is a property of a function paired with the set on which it is defined. You can’t know the identity element until you know the input type. For example, it is wrong to define `*() = 1` because `*() = ""` also makes sense. Another example is `intersect`; you likely don’t want to implement `intersect() = SET_OF_EVERYTHING`. There are other concerns, including the ones I described above.
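
In Julia this shows up directly in the `init` keyword of `Base.reduce`; the same operator needs a different identity depending on the element type:

```julia
# The identity element depends on the set, not just the operator:
reduce(*, Int[]; init = 1)      # 1  is the identity of * on numbers
reduce(*, String[]; init = "")  # "" is the identity of * on strings
```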

Having said that, it is easy to implement `Monoid` helper type on top of InitialValues.jl. I agree it could be handy in some places. But this can be done because (I think) InitialValues.jl’s approach is more powerful than lisp’s nullary function approach.

[1] OK. In math, sometimes functions are “aware” of domain and codomain. I am using function in the sense that the function `identity` can be a function on `Int` and `Float64`.

By the way, I believe Transducers.jl has a much nicer way to define `calc-volume`. Given a function `calc_volume(p0, p1, p2, p3) -> volume`, you can do

```julia
xf = Zip(MapSplat(calc_volume), Map(identity))
collect(xf, tet_indices)
```

or, if you want to do it in parallel, you can do

```julia
reduce(append!!, xf |> Map(Tuple), tet_indices; init=Union{}[])
```

I don’t think it does. The fork-join framework divides into a tree-like execution pattern, if I recall correctly. Reducing with `append!` happens with smaller collections to create the leaves, and then `cat` takes advantage of Clojure’s immutable data structures to combine everything up as the tree collapses to the root. You don’t know what order `cat` will combine collections in, which is why the operation needs to be associative.

The point here is that I know there will be no negative numbers, so I can define my own application-specific (set-specific) monoid. The more concise version you provide carries less information (the specific monoid operating on my set). My whole argument can be reduced to this: I think decomplecting the combining function and the reducing function is more flexible.

Which is why passing your own combining function/Monoid as a parameter that doesn’t rely on dispatch is more flexible.

Thanks, that’s a nice way of doing it. You could do the same in Clojure by using Clojure’s transducers and then reducing with `r/fold`, I believe.

OK. Thanks for the clarification. I was inferring that was the case from your earlier comment.

But my point is still valid. There may be cases where hinting that a function must run sequentially can be beneficial for performance.

`append!` is associative provided that you don’t use the input again (and the input types match).

Same applies to `init=0`. I can specify the concrete identity element because I know the collection does not include negative numbers.

I don’t think so. You only need to know what the identity is. Providing it as a parameter is as informative as calling a nullary function.

I agree with this. I think Transducers.jl can do better because transducers are ordinary structs in Transducers.jl; not opaque closures where you can’t query the internal properties (see the comment referring to a trait system above).

I don’t think the `Monoid` constructor approach is strictly more flexible because I can implement it on top of InitialValues.jl. In fact, I believe it’s the other way around. See my comment about `intersect` above.

Sorry I was starting to argue my point for the sake of argument. I haven’t really had any experience with Transducers.jl (yet). The breaking apart of combining function and reducing function is really in service to the fork-join framework, so it’s a little implementation specific.

How do you implement parallel reduce in Transducers.jl?

No worries! My apologies, as I might sound a bit combative. It’s nice to get feedback from a Clojure programmer’s perspective.

You don’t need to hurry if your main interest is parallelism, because the parallel part is not really explored yet. The threading API in Julia’s `master` branch is still just a few weeks old, so Transducers.jl does not have much ATM.

Well, I did separate the combine function from the reducing function in the parallel reduce protocol, too, as some transducers need to do something different. But currently it’s not used much and not documented at all.

The surface API is something like `(r/foldl ((comp (map inc) (filter even?)) +) coll)` instead of `(r/fold + (r/filter even? (r/map inc coll)))` (but you can get something like the latter by using `eduction`).

Internally, I just use a simple divide-and-conquer approach until I hit a big enough base case. The core code is just 8 lines.
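
As a rough illustration of that divide-and-conquer shape (a hypothetical sketch, not the actual Transducers.jl source; `basesize` is an illustrative parameter name):

```julia
# Divide-and-conquer reduce sketch: recurse until the chunk is small
# enough, then fall back to a sequential foldl. The two recursive
# calls are where tasks would be spawned (e.g. Threads.@spawn on
# Julia >= 1.3); here they run serially for simplicity.
function dacreduce(op, xs; basesize = 256)
    if length(xs) <= basesize
        return foldl(op, xs)  # sequential base case
    end
    mid = length(xs) ÷ 2
    left  = dacreduce(op, view(xs, 1:mid); basesize = basesize)
    right = dacreduce(op, view(xs, mid+1:length(xs)); basesize = basesize)
    return op(left, right)  # combine the two halves
end

dacreduce(+, 1:10_000)  # == sum(1:10_000) == 50_005_000
```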

IIUC it is efficient because Julia does depth-first scheduling.

By the way, do reducers in Clojure support early termination in `r/fold` via `reduced`? In the current `master` of Transducers.jl, using parallel `reduce` with a reducing function that may return a `Reduced` is possible but not deterministic. I think I’ve found a way to make it deterministic. I think it’d be useful for implementing parallel search, and I wonder how/if it’s used in Clojure.

Nice. I think this is essentially the same as fork-join.

Good question. I’m not sure. I haven’t mixed transducers and reducers in Clojure before.
If your reducing function is associative and deterministic, your results should be deterministic, no? It seems like this is why Clojure requires the reducing function to be associative, which also implies a weak form of immutability (a restriction on mutating shared state between chunks).

Yes, if you sweep all elements in the collection. But `Reduced` means that it’s OK to (or, more precisely, that you should) terminate the reduction. That’s beneficial because you can cut execution time. It is easy to implement in (non-parallel) `foldl` because there is no ambiguity in the result (even without associativity). However, in parallel `reduce`, you can’t simply terminate at the “first” occurrence of the match (“first” in physical time), because it may not be the first element in the collection. So it is incorrect to terminate all tasks in parallel `reduce` just because `Reduced` is returned in one task; you can only terminate the ones to the “right” of your location in the collection.

I think what I want to demonstrate would be implemented as

```clojure
(require '[clojure.core.reducers :as r])

(defn reduce-if [f]
  (fn
    ([y x]
     (cond
       (or (reduced? y) (f y)) (ensure-reduced y)
       (or (reduced? x) (f x)) (ensure-reduced x)
       :else y))
    ([] 0)))
```

in Clojure. But it behaves strangely:

```clojure
user=> (let [op (reduce-if odd?)] (r/fold op [0 3 0 1 0]))
3
user=> (let [op (reduce-if odd?)] (r/fold op op [0 3 0 1 0]))
3
user=> (let [op (reduce-if odd?)] (r/fold 5 op op [0 3 0 1 0]))
3
user=> (let [op (reduce-if odd?)] (r/fold 4 op op [0 3 0 1 0]))
#object[clojure.lang.Reduced 0x2d23faef {:status :ready, :val 3}]
```

I think this indicates that `r/fold` is unaware of `reduced`, because it (presumably) does not handle `reduced` returned from `combinef`.

edit: Fixed `reduce-if`

1 Like

It looks like `reduced` renders the operation non-associative in Clojure, which seems reasonable.
If you can manage to handle non-associative operators, that’s an improvement.

The fact that `r/fold` returns a `Reduced` has nothing to do with non-associativity. If it has the same semantics as `clojure.core/reduce`, it should unwrap the `Reduced` before returning it. I suspect that it unwraps `Reduced` when `n` is larger than the collection size because it then uses `clojure.core/reduce` in the base case.

But you are right that the original `reduce-if` was not associative. I just fixed it. I think it is now equivalent to the `First` monoid in Haskell https://www.schoolofhaskell.com/user/mgsloan/monoids-tour#first-and-last

1 Like

Sure, seems like a bug in the Clojure code. Easy enough to fix by wrapping your result with `(unreduced r)`, though. Actually, this was one of my frustrations with Clojure: results of functions were often either wrapped, unwrapped, or in a totally different container than I expected.

Should associativity affect the result? Possibly I don’t understand the subtleties of `reduced` and the implications of results being returned in order vs. the effects of non-associativity.
Do I understand `reduced` and associativity correctly? A + B + C is such that (A + B) + C = reduced(A+B) and A + (B + C) = reduced(B+C), where X + reduced(Y) = reduced(Y). I’m not sure what the result of parallel reduce on A+B+C+D… should be.

1 Like