ANN: Transducers.jl 0.3. taking "zeros" seriously, type stability improvements, fusible groupby, OnlineStats, "GPU support", and more

Hi, I’m happy to announce that Transducers 0.3 is released. Let me share some highlights.

If you are new to Transducers, check out the previous discussion: ANN: Transducers.jl, efficient and composable algorithms for map- and reduce-like operations

and maybe my talk from yesterday

and the slides https://tkf.github.io/juliacon2019-transducers/

Fusible group-by

Many Julia packages provide a way to fuse group-by and reduction-per-group: SplitApplyCombine.groupreduce, IndexedTables.groupreduce, OnlineStatsBase.GroupBy, etc. However, I couldn’t find a package that can fuse group-by, reduction, and termination. It turns out this is very easy to implement as a transducer. Here is an example. The following snippet checks whether there is a group with a sum greater than 3 and stops the computation as soon as one is found:

julia> result = transduce(
           GroupBy(
               string,
               Map(last) |> Scan(+) |> ReduceIf(x -> x > 3),
           ),
           right,
           nothing,
           [1, 2, 1, 2, 3],
       );

julia> result isa Reduced
true

julia> unreduced(result)
Dict{String,Int64} with 2 entries:
  "1" => 2
  "2" => 4

The termination can happen inside a group or outside a group. That is to say, it’s possible to construct a transducer that terminates the computation when, say, three groups have been found:

julia> transduce(
           GroupBy(identity, Map(identity)) |> ReduceIf(groups -> length(groups) >= 3),
           right,
           nothing,
           1:9,
       ) |> unreduced
Dict{Int64,Pair{Int64,Int64}} with 3 entries:
  2 => 2=>2
  3 => 3=>3
  1 => 1=>1

Note that the aforementioned group-by functions in other packages are much more optimized than Transducers.GroupBy. If you need speed and don’t need termination, Transducers.GroupBy is probably not a good choice at this point.

See more in the manual: https://tkf.github.io/Transducers.jl/v0.3/manual/#Transducers.GroupBy

Iterator comprehension support

Iterator comprehensions can be translated directly to transducers. You don’t need to know how to use transducers to benefit from the speedup:

julia> itr = (y for x in 1:1000 for y in 1:x);

julia> @btime foldl(+, eduction(itr))
  21.127 μs (1497 allocations: 39.13 KiB)
167167000

julia> @btime sum(itr)
  174.544 μs (1 allocation: 16 bytes)
167167000

See also: https://tkf.github.io/Transducers.jl/v0.3/manual/#Transducers.eduction

OnlineStats support

OnlineStats are transducers in disguise. Now you can write, e.g., Transducer(OnlineStats.Mean()) to use them with transducers.

https://tkf.github.io/Transducers.jl/v0.3/manual/#Transducers.Transducer-Tuple{OnlineStatsBase.OnlineStat}
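For example, something like the following should compute a running mean (a sketch only; `right` keeps the last value emitted by the transducer, and exact call forms may differ slightly across versions):

```julia
using Transducers
using OnlineStats: Mean

# Wrap an OnlineStat as a transducer; each input element updates the stat,
# and the reducing function `right` keeps the final state.
foldl(right, Transducer(Mean()), 1:10)
```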

“GPU support”

Stateless and non-terminating transducers can be used without Transducers’ custom foldl/reduce. This means that Transducers.jl now “supports” CuArrays’ reduce.

using Transducers, CuArrays, CUDAnative
A = CuArray(randn(3, 3))
rf = reducingfunction(Map(CUDAnative.sin) |> Filter(x -> x > 0), +)
reduce(rf, A, init=0.0f0, dims=1)

Taking “zeros” seriously (internal)

To fully embrace the mutate-or-widen approach and get rid of the use of the inference API, I had to “take zeros seriously.” This is because, unlike Base.foldl, Transducers.jl’s foldl needs to handle the case where every element in the container is filtered out (i.e., it must handle “dynamic emptiness”). This rules out the strategy used by Base.foldl, where the first element of the container is used as the initial value (note: that approach is also inappropriate for “asymmetric” reducing functions like push!). For this, I created a small package called InitialValues.jl. It implements a generic “left identity” singleton value Init(op) for an operator op; i.e., op(Init(op), x) === x for any x. This handles the initialization of the reduction state.
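As a sketch, the identity law above can be checked directly:

```julia
using InitialValues

# Init(op) is a generic left identity for op: op(Init(op), x) === x.
@assert +(Init(+), 42) === 42
@assert *(Init(*), "hello") === "hello"
```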

I also created BangBang.jl which provides functions such as push!!, implementing the mutate-or-widen variants of a subset of Base functions.
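For example (a sketch of the mutate-or-widen behavior described above):

```julia
using BangBang

xs = push!!(Int[], 1)   # Int fits in a Vector{Int}: mutates `xs` in place
ys = push!!(xs, 0.5)    # Float64 does not fit: returns a widened Vector{Float64}
```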

The combination of these two packages turned out to be a very powerful way to implement transducers in Julia. At first, I was worried about whether I could keep the performance advantage of transducers, because using the singleton value Init(op) introduces union types. Impressively, the Julia compiler can handle many such cases, although some transducers are still hard to infer; e.g., collect(Take(1), 1:1) cannot be inferred. Fortunately, specifying the initial value explicitly (e.g., foldl(push!!, Take(3), 1:1; init=Int[])) fixes the inference most of the time.

Now that Transducers.jl’s main entry points can be inferred by the compiler (provided the transducers used are not very complex), it is natural to use transducers in performance-sensitive code. (For anyone who has read the old manual: the awkward eduction-based function-barrier workaround is now gone.)


This is fantastic work! I also really enjoyed your talk — it was the clearest explanation of the concept that I’ve seen anywhere.


I’m glad that you enjoyed my talk!

@tkf
Have you looked at reducers? They came before transducers, are very similar, and are in some ways complementary. Most importantly, they were built for parallel computation. They use Java’s fork-join mechanism, so I’m not sure whether that carries over to Julia seamlessly.

I used them all the time when I wrote Clojure code.

https://clojure.org/reference/reducers
https://clojure.org/news/2012/05/08/reducers

Are reducers “eduction with a hole”? I.e., is (fn [reducible] (eduction xf reducible)) a reducer for any transducer xf? If so, can I claim that Transducers.jl already has reducers because it implements eduction?

Transducers.jl has preliminary multi-threading support for the reduce function. (Note: Clojure’s reduce and fold are foldl and reduce in Julia, respectively.) It predates the new task-based multithreading support and just uses the @threads macro. It would be interesting to switch to task-based parallelism. IIUC, it can be beneficial, e.g., when a filter is post-composed with rather compute-intensive transducers. In such a case, static scheduling like @threads is not great because the total computation required for each thread can be very different. It should also help when a transducer or reducing function itself calls reduce.


The difference is the combine function you provide, and that the fold function is neither foldl nor foldr; it depends on how the fold was parallelized.

I’ll just quote: "The combining function (an associative binary fn) must have some ‘identity’ value, a value that, when combined with some X, yields X. 0 is an identity value for +, as is 1 for *. The combining fn must supply an identity value when called with no arguments (as do + and *). It will be called with no arguments to supply a seed for each leaf reduction. There is a fn (called monoid, shh!) to help you build such combining functions.

If no combining fn is supplied, the reducing fn is used. Simple folds look like reduces:

(r/fold + [1 2 3 4])
;=> 10
But by promising less (i.e. not promising stepwise reduction from left or right) fold can do more - run in parallel"

That’s why I said Clojure’s fold is Julia’s reduce. I think I’m doing more or less the same thing as the blog you quote.


I’m traveling right now so I can’t give any actual code examples, but I remember getting a little creative with the monoid combine function. Possibly what you describe as “eduction with a hole” is the same. https://en.wikipedia.org/wiki/Monoid_(category_theory)

I do miss parallel reduce, so I will be using your package in Julia!

My question about the “eduction with a hole” was about reducers, not reducing step functions (the latter is related to monoids). Actually, I think

The following functions (analogous to the sequence versions) create reducers from a reducible or foldable collection: r/map r/mapcat r/filter r/remove r/flatten r/take-while r/take and r/drop.

in https://clojure.org/reference/reducers answers my question. I was thinking (fn [reducible] (r/map inc reducible)) was a reducer, but that was not correct. So I should be asking: is (r/map inc [1 1 1 2]) equivalent to (eduction (map inc) [1 1 1 2])? If (eduction (map inc) [1 1 1 2]) is a reducer (or equivalent to one), I’d say Transducers.jl already has reducers.

It’d be great if you could share your use case. It would be interesting to see whether Transducers.jl can support it.

By the way, it’s interesting that you bring up monoids, as that’s related to the main improvements in v0.3. In Lisps, you mark a binary function as a monoid by defining its nullary form (e.g., (+) is 0). IIUC, that’s the case in Clojure, too. However, the same strategy does not work well in Julia/Transducers.jl because:

  1. I don’t “own” + etc., so defining Base.+() = 0 would be type piracy.
  2. More importantly, acc = step(acc, x) could introduce a Union if I initialize acc with some arbitrary “zero”. The compiler is very good at resolving this, but it’s nicer if there is a way to get rid of this Union at the end of foldl/reduce.
  3. An in-place step function (e.g., append!) does not work well with an identity of a fixed type. Semantically, it is correct to use Any[] as the identity for append!. However, you’d want to avoid boxing the elements if possible. I suppose Clojure does not have this problem because it is immutable by default and has excellent built-in persistent data structures.

To solve points 1 and 2, I wrote InitialValues.jl [1]. Point 3 is solved by BangBang.jl.

[1] Actually, I just realized that the current implementation is not enough for parallel reduce. InitialValues.jl only implements left identities. I need to add right identities to make parallel reduce work smoothly.


Had a chance to look at my reducer code.

For reference: https://clojuredocs.org/clojure.core.reducers

I made extensive use of this pattern.

(:require [clojure.core.reducers :as r])
(r/foldcat (r/map f c))

r/foldcat is the same as (r/fold cat append!), which is a good example of how separating the combining function from the reducing function is beneficial. The combining function uses a parallel/lazy-friendly immutable operation, cat, while the reducing function can mutate: append! grows a vector.

Here’s an example of me being very explicit about specifying the combining function and reducing function. I think this was one of the first uses of reducers I wrote, so it could be replaced with foldcat.

(defn calc-volume [p tet]
  "calculate the volume of a set of tetrahedral elements
  tet should be of form [[i00 i01 i02 i03][i10 i11 i12 i13]...]
  returns an interleaved vector of [volume tet-indices] pairs.
  volume is positive, if the input was a negatively wound tet, 
  it will have a correct +ve sense winding in the output."
  (let [reducef (fn [acc [i0 i1 i2 i3 :as itet]]
                  (let [p0 (p i0)
                        p1 (p i1)
                        p2 (p i2)
                        p3 (p i3)
                        vol (m/calc-volume p0 p1 p2 p3)]
                    (if (< vol 0.0)
                      (conj acc [(* -1.0 vol) [i0 i2 i1 i3]])
                      (conj acc [vol itet]))))
        combinef (r/monoid into (fn [] []))]
    (r/fold combinef reducef tet)))

here’s one with max.

(r/fold (fn ([] 0) ([a b] (max a b))) max
          (r/map f c))

I see how I could use InitialValues.jl for this, but I do really like the conciseness, flexibility, and single-packageness of the multiple-arity lambda. In Julia it might look like:

fold(Monoid(()->0, (acc,el)->max(acc,el)), max, c)

Abuse of the parallel fold machinery! The combining function does nothing and I use mutation in the reducing function, totally ignoring the accumulator. I do something similar in other places for a conjugate-gradient solve.

(def nil-monoid (r/monoid (fn [a b] nil) (fn [] nil)))
(r/fold nil-monoid
            (fn [_ iu] (m/set-row! u iu (m/sub (P iu) (P0 iu))))

one more

 (r/fold (fn ([] {}) ([a b] (merge-with concat a b)))
            (partial merge-with concat)
            (r/map f c))

(you mean (r/fold cat append! coll); sorry for the nitpick)

Thanks, this is an interesting point. If an operation can be optimized when it knows it is invoked sequentially, this makes sense.

But this requires that the combining function be applied sequentially. If the combining function itself is expensive, this may not be optimal. This is why, at the moment, Transducers.jl’s combining functions are invoked in parallel (on the master branch, with Julia >= 1.3). But it would be nice if there were a flexible way to tell Transducers.reduce to invoke certain functions sequentially in the main thread. I think this API can be generalized into a trait-based system to encode algebraic and algorithmic properties of reducing functions and transducers. This could be used to optimize not only the outermost loop (as in r/fold) but also all the inner loops (e.g., the Cat() transducer).

Isn’t it much more concise to write

reduce(max, c; init=0)

?

Also, if the collection c can contain negative numbers, it’s very dangerous to “package” the initial value 0 with max. This is not an accident but a consequence of a mathematical principle: a monoid is not a property of a function alone [1]; it is a property of a function paired with the set on which it is defined. You can’t know the identity element until you know the input type. For example, it is wrong to define *() = 1 because *() = "" also makes sense. Another example is intersect; you likely don’t want to implement intersect() = SET_OF_EVERYTHING. There are other concerns, including the ones I described above.
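The danger is easy to demonstrate in Julia:

```julia
# The "identity" 0 silently wins over every element when all inputs are
# negative; the true identity for max over Int would be typemin(Int).
reduce(max, [-3, -1, -2]; init = 0)  # returns 0, not -1
```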

Having said that, it is easy to implement a Monoid helper type on top of InitialValues.jl. I agree it could be handy in some places. But this works precisely because (I think) InitialValues.jl’s approach is more powerful than Lisp’s nullary-function approach.
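A hypothetical sketch of such a helper (the Monoid name and its interface here are my invention for illustration, not part of any of these packages):

```julia
using InitialValues

# A binary operator whose nullary call supplies an identity, mimicking
# Clojure's r/monoid but backed by InitialValues.jl's generic Init(op).
struct Monoid{F} <: Function
    op::F
end
(m::Monoid)() = Init(m.op)      # nullary call yields the generic identity
(m::Monoid)(a, b) = m.op(a, b)

m = Monoid(+)
@assert m(m(), 1) === 1         # since +(Init(+), 1) === 1
```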

[1] OK, in math, functions are sometimes “aware” of their domain and codomain. I am using “function” in the sense that the function identity can act on both Int and Float64.

By the way, I believe Transducers.jl has a much nicer way to define calc-volume. Given a function calc_volume(p0, p1, p2, p3) -> volume, you can do

xf = Zip(MapSplat(calc_volume), Map(identity))
collect(xf, tet_indices)

or, if you want to do it in parallel, you can do

reduce(append!!, xf |> Map(Tuple), tet_indices; init=Union{}[])

I don’t think it does. The fork-join framework divides the work into a tree-like execution pattern, if I recall correctly. Reducing with append! happens on the smaller collections to create the leaves, and then cat takes advantage of Clojure’s immutable data structures to combine everything up as the tree collapses to the root. You don’t know in what order cat will combine collections, which is why the operation needs to be associative.

The point here is that I know there will be no negative numbers, so I can define my own application-specific (set-specific) monoid. The more concise version you provide carries less information (the specific monoid operating on my set). My whole argument can be reduced to this: I think decomplecting the combining function and the reducing function is more flexible.

Which is why passing your own combining function/Monoid as a parameter that doesn’t rely on dispatch is more flexible.

Thanks, that’s a nice way of doing it. You could do the same in Clojure by using Clojure’s transducers and then reducing with r/fold, I believe.

OK. Thanks for the clarification. I was inferring that was the case from your earlier comment.

But my point is still valid. There may be the case where hinting that a function must run sequentially can be beneficial for performance.

append! is associative provided that you don’t use the input again (and input type matches).

Same applies to init=0. I can specify the concrete identity element because I know the collection does not include negative numbers.

I don’t think so. You only need to know what the identity is. Providing it as a parameter is as informative as calling a nullary function.

I agree with this. I think Transducers.jl can do better because transducers in Transducers.jl are ordinary structs, not opaque closures whose internal properties you can’t query (see the comment about a trait system above).

I don’t think the Monoid-constructor approach is strictly more flexible, because I can implement it on top of InitialValues.jl. In fact, I believe it’s the other way around; see my comment about intersect above.

Sorry, I was starting to argue for the sake of argument. I haven’t really had any experience with Transducers.jl (yet). The breaking apart of the combining function and reducing function is really in service of the fork-join framework, so it’s a little implementation-specific.

How do you implement parallel reduce in Transducers.jl?

No worries! My apologies, as I might have sounded a bit combative. It’s nice to get feedback from a Clojure programmer’s perspective.

You don’t need to hurry if your main interest is parallelism, because the parallel part is not really explored yet. The threading API is still just a few weeks old on Julia’s master branch, so Transducers.jl does not have much ATM.

Well, I did separate the combine function from the reducing function in the parallel reduce protocol too, as some transducers need to do something different. But currently it’s not used much and not documented at all.

The surface API is something like (r/foldl ((comp (map inc) (filter even?)) +) coll) instead of (r/fold + (r/filter even? (r/map inc coll))) (but you can get something like the latter by using eduction).
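In Julia syntax, that surface API looks something like the following (mirroring the Clojure-style pseudo-code above; a sketch using the v0.3 foldl signature):

```julia
using Transducers

# Compose transducers with |> and pass the reducing function separately.
foldl(+, Map(x -> x + 1) |> Filter(iseven), 1:10)  # 2+4+6+8+10 == 30
```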

Internally, I just use a simple divide-and-conquer approach until I hit a big enough base case. The core code is just 8 lines:
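Those eight lines aren’t reproduced here, but the approach can be sketched as follows (a hypothetical illustration, not the actual Transducers.jl source; Threads.@spawn requires Julia >= 1.3):

```julia
using Base.Threads: @spawn

# Divide-and-conquer reduce: split until the chunk is small enough for a
# sequential foldl, then combine the halves with the (associative) operator.
function dacreduce(op, xs; basesize = 4096)
    length(xs) <= basesize && return foldl(op, xs)
    mid = length(xs) ÷ 2
    task = @spawn dacreduce(op, view(xs, 1:mid); basesize = basesize)
    rhs = dacreduce(op, view(xs, mid+1:length(xs)); basesize = basesize)
    return op(fetch(task), rhs)
end

dacreduce(+, 1:1_000_000)  # == sum(1:1_000_000)
```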

IIUC it is efficient because Julia does depth-first scheduling.

By the way, do reducers in Clojure support early termination in r/fold via reduced? In current master of Transducers.jl, using parallel reduce with a reducing function that may return a Reduced is possible but not deterministic. I think I’ve found a way to make it deterministic. I think it’d be useful for implementing parallel search, and I wonder how/if it’s used in Clojure.

Nice. I think this is essentially the same as fork-join.

Good question. I’m not sure. I haven’t mixed transducers and reducers in Clojure before.
If your reducing function is associative and deterministic, your results should be deterministic, no? It seems this is why Clojure requires the reducing function to be associative, which also implies a weak form of immutability (a restriction on mutating shared state between chunks).

Yes, if you sweep all elements in the collection. But Reduced means that it’s OK to (or, rather, that you should) terminate the reduction. That’s beneficial because you can cut execution time. It is easy to implement in (non-parallel) foldl because there is no ambiguity in the result (even without associativity). However, in parallel reduce, you can’t simply terminate at the “first” occurrence of the match (“first” in physical time), because it may not be the first element in the collection. So it is incorrect to terminate all the tasks in parallel reduce just because Reduced is returned in one task; you can only terminate the ones to the “right” of your location in the collection.

I think what I want to demonstrate would be implemented as

(require '[clojure.core.reducers :as r])

(defn reduce-if [f]
  (fn
     ([y x]
        (cond
           (or (reduced? y) (f y)) (ensure-reduced y)
           (or (reduced? x) (f x)) (ensure-reduced x)
           :else y))
     ([] 0)))

in Clojure. But it behaves strangely:

user=> (let [op (reduce-if odd?)] (r/fold op [0 3 0 1 0]))
3
user=> (let [op (reduce-if odd?)] (r/fold op op [0 3 0 1 0]))
3
user=> (let [op (reduce-if odd?)] (r/fold 5 op op [0 3 0 1 0]))
3
user=> (let [op (reduce-if odd?)] (r/fold 4 op op [0 3 0 1 0]))
#object[clojure.lang.Reduced 0x2d23faef {:status :ready, :val 3}]

I think this indicates that r/fold is unaware of reduced, because it (presumably) does not handle a reduced value returned from combinef.

edit: Fixed reduce-if


It looks like reduced renders the operation non-associative in Clojure, which seems reasonable.
If you can manage to handle non-associative operators, that’s an improvement.

The fact that r/fold returns a Reduced has nothing to do with non-associativity. If it has the same semantics as clojure.core/reduce, it should unwrap the Reduced before returning it. I suspect that it unwraps Reduced when n is larger than the collection size because it uses clojure.core/reduce in the base case.

But you are right that the original reduce-if was not associative. I just fixed it. I think it is now equivalent to the First monoid in Haskell https://www.schoolofhaskell.com/user/mgsloan/monoids-tour#first-and-last


Sure, seems like a bug in the Clojure code. Easy enough to fix by wrapping your result with (unreduced r), though. Actually, this was one of my frustrations with Clojure: results of functions were often either wrapped, unwrapped, or in a totally different container than I expected.

Should associativity affect the result? Possibly I don’t understand the subtleties of reduced and the implications of results being returned in order vs. the effects of non-associativity.
Do I understand reduced and associativity correctly? A + B + C is such that (A + B) + C = reduced(A+B) and A + (B + C) = reduced(B+C), where X + reduced(Y) = reduced(Y). I’m not sure what the result of parallel reduce on A+B+C+D… should be.