Parallel mutable reduction / mapreduce

Hi,
I wanted to ask about recommendations for packages that offer multithreaded mapreduce-style computation. That is unfortunately not part of stdlib, so I was wondering which packages are good for that.

The way I like to think about my problems is in terms of the (imo very good!) Java Stream collect API:

I have an iterator of T items, a makeCollector()::Collector, an accept(collector::Collector, item::T)::Collector, and a merge(left::Collector, right::Collector)::Collector.

On a list e.g. [item1, item2, item3] this should compute

foldcollectthing(makeCollector, [item1, item2, item3]) = accept(accept(accept(makeCollector(), item1), item2), item3)

somewhat foldl style. Parallelism is enabled by associativity:

accept(left, item) == merge(left, accept(makeCollector(), item))
merge(merge(A, B), C) == merge(A, merge(B, C))

This allows the implementation of the desired foldcollectthing to split the input into chunks, run each chunk on a different thread, and then use merge to combine the results.

Typically accept and merge are mutating, i.e. accept(collector, item) === collector and merge(left, right) === left, and merge may destroy the right collector. If the amount of time taken per item is known to be homogeneous, ideally makeCollector should be called only once per thread.
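
Concretely, the kind of implementation I am imagining is something like this rough sketch (naive even-sized chunking, nothing tuned; it assumes the input is non-empty and has a known length):

function foldcollectthing(makeCollector, accept, merge, items; ntasks = Threads.nthreads())
    # one chunk per task; assumes `items` is non-empty and has a length
    chunks = Iterators.partition(items, cld(length(items), ntasks))
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            collector = makeCollector()      # called once per task
            for item in chunk
                collector = accept(collector, item)
            end
            collector
        end
    end
    # merge is only needed to combine the per-task results
    return reduce(merge, fetch.(tasks))
end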

A typical example of how that kind of view works is the following baby example:

julia> foo(n) = collect(1:n)
foo (generic function with 1 method)

julia> mapreduce(foo, vcat, 1:3)
6-element Vector{Int64}:
 1
 1
 2
 1
 2
 3

Using this paradigm, this would be spelled:

julia> mutable struct Collector items::Vector{Int} end

julia> makeCollector() = Collector(Int[])
makeCollector (generic function with 1 method)

julia> function accept(collector::Collector, i)
           for res in foo(i)
               push!(collector.items, res)
           end
           collector
       end
accept (generic function with 1 method)

julia> function merge(left::Collector, right::Collector)
           if length(left.items) < length(right.items)
               prepend!(right.items, left.items)
               left.items = right.items
           else
               append!(left.items, right.items)
           end
           left
       end
merge (generic function with 1 method)
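
For a quick check, feeding the items in one by one via accept should reproduce the mapreduce result above:

julia> c = makeCollector();

julia> for i in 1:3
           accept(c, i)
       end

julia> c.items
6-element Vector{Int64}:
 1
 1
 2
 1
 2
 3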

As a side-note, the if length(left.items) < length(right.items) case is essential: it makes the difference between O(N log N) and O(N^2) runtime for collecting N items under the worst-case schedule, because the smaller side is always copied into the larger one, so each element is copied at most O(log N) times. This kind of thing really needs a double-ended queue and cannot be done with e.g. a C++ std::vector.

=================

Any recommendations?

Am I too blind to see how to do this with Transducers.jl, @tk3369? I.e., given some makeCollector, merge and accept functions, how would I use foldxt to do this job on e.g. a plain Vector of items?

PS. Another way of writing the desired result would be

mapreduce(item -> accept(makeCollector(), item), merge, collection; init = makeCollector())

The essential part, however, is that accept is possibly non-allocating and cheaper than merge. I don’t want to rely on brittle compiler optimizations to maybe elide such allocations; I want it guaranteed.

You probably want @tkf not me? :slightly_smiling_face:


Is there something wrong with just substituting ThreadsX.mapreduce for mapreduce and using push!(collector.items, res)? Sorry if I’m misunderstanding the requirements here.

using ThreadsX

mutable struct Collector
    items::Vector{Int}
end

makeCollector() = Collector(Int[])

function accept(collector::Collector, i)
    for res in foo(i)
        push!(collector.items, res)
    end
    collector
end

function merge(left::Collector, right::Collector)
    if length(left.items) < length(right.items)
        prepend!(right.items, left.items)
        left.items = right.items
    else
        append!(left.items, right.items)
    end
    left
end

foo(n)=collect(1:n)

ThreadsX.mapreduce(item -> accept(makeCollector(), item), merge, 1:5; init = makeCollector())

Oops, sorry and thank you! The question was for you, @tkf

Yes: the wrong thing is that any mapreduce-style API will allocate length(input) many collectors – in a loop, it will call something like

current = merge(current, accept(makeCollector(), idx))

The point of the java stream mutable reduction interface is that this can be written as

current = accept(current, idx)

and no new Collector needs to be created and almost immediately discarded. Indeed, the number of makeCollector() calls scales only with the number of threads / tasks.

In this sense, this API is conceptually superior to mapreduce for settings where it is possible to optimize the case where a single item is merged from the right into a collector.
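
Roughly, the difference between the two basecase loops is the following (just a sketch; chunk stands for whatever slice of the input one task handles):

# mapreduce style: a throwaway collector is built and merged for every item
acc = makeCollector()
for item in chunk
    acc = merge(acc, accept(makeCollector(), item))
end

# stream-collector style: one collector per chunk, accept per item
acc = makeCollector()
for item in chunk
    acc = accept(acc, item)
end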

The implicit claims in my question were:

  1. This is common – many settings permit a fast path for handling a single element.
  2. This is not inherently conceptually confusing – once you get used to it, it is no more brain-warping to express a mapreduce problem in terms of merge / accept than in terms of merge / map.

Both claims are backed by the astounding success and precedent of this part of the Java Stream API.


I took another shot. This defines non-allocating NilCollector and SingletonCollector types, and adds a counter field i to Collector so we can check that it is only instantiated some number of times smaller than length(xs). In this case the number of data items is length(xs) == 100, and Collector is instantiated 16 times on my computer with nthreads() == 12.

using Folds

mutable struct Collector
    items::Vector{Int}
    i::Int # count how many times this type is instantiated.
end
makeCollector() = Collector(Int[], 1)

accept!(col::Collector, item) = begin
    push!(col.items, item)
    col
end

# `!!` methods use the mutate-or-widen pattern: mutate if possible, otherwise create a new object that can hold the result.

accept!!(c::Collector, item) = accept!(c, item)

struct SingletonCollector
    item::Int
end

accept!!(c::SingletonCollector, item) = merge!!(c, SingletonCollector(item))

struct NilCollector end

accept!!(::NilCollector, item) = SingletonCollector(item)



function merge!(left::Collector, right::Collector)
    if length(left.items) < length(right.items)
        prepend!(right.items, left.items)
        left.items = right.items
    else
        append!(left.items, right.items)
    end
    left.i += right.i
    left
end

merge!!(::NilCollector, ::NilCollector) = NilCollector()
merge!!(left, right::NilCollector) = left
merge!!(left::NilCollector, right) = right
merge!!(left::SingletonCollector, right::SingletonCollector) = accept!!(accept!!(makeCollector(), left.item), right.item)
merge!!(left::Collector, right::SingletonCollector) = begin
    push!(left.items, right.item)
    left
end
merge!!(left::SingletonCollector, right::Collector) = begin
    insert!(right.items, firstindex(right.items), left.item)
    right
end
merge!!(left::Collector, right::Collector) = merge!(left, right)


Folds.mapreduce(SingletonCollector, merge!!, 1:100, ThreadedEx(); init=NilCollector())
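
As a sanity check (the instantiation count will of course vary with the machine and scheduler):

result = Folds.mapreduce(SingletonCollector, merge!!, 1:100, ThreadedEx(); init=NilCollector())
sort(result.items) == 1:100  # should be true: every input item collected exactly once
result.i                     # number of Collector objects created; 16 here with nthreads() == 12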

Imho, no Java API has ever been good, as they tend to over-engineer things in terms of types, i.e., it’s hard to see the trees for the forest.
Thus, here is my take using Transducers:

using Transducers
# sequential version
newCollector() = @show Int[]  # show to see how often it got called
accept(c::Vector, v) = push!(c, v)  # No need for a new type
foldl(accept, Map(x -> x^2), 1:100; init = OnInit(newCollector))

# possibly parallel version
singleton(x) = (x,)  # cheap non-allocating wrapper for singleton
function merge(cl::Vector, cr::Vector)
    if length(cl) < length(cr)
        prepend!(cr, cl)
    else
        append!(cl, cr)
    end
end
merge(c::Vector, x::Tuple) = push!(c, x[1])
# Could use a new type wrapping singletons, but a tuple does just fine
foldxt(merge, Map(x -> singleton(x^2)), 1:100; init = OnInit(newCollector))

This is nonsense. They do tend to over-engineer and many Java APIs suck, but sometimes they hit an exceptionally good spot. The stream collector API is really, really good.

Thanks! I like that.

A shortened form would be:

julia> using Transducers

julia> newCollector() = Int[]
newCollector (generic function with 1 method)

julia> merge(left::Vector, right::Vector) = length(left) < length(right) ? prepend!(right, left) : append!(left, right)
merge (generic function with 1 method)

julia> merge(left::Vector, item) = push!(left, item)
merge (generic function with 2 methods)
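
I’d then call it the same way as the full example above, something like:

julia> foldxt(merge, Map(x -> x^2), 1:100; init = OnInit(newCollector));

with the items themselves going straight into the merge(left::Vector, item) method – no singleton wrapper needed.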

This uses some additional properties of foldxt in order to be type-stable. I do wonder whether that is guaranteed to work, or only works incidentally?

A priori it would be valid for foldxt to evaluate this via e.g.

merge(input[1], merge(oninit.f(), input[2]))

This would crash and burn with the provided definitions.

In other words: this entire thing works and is type-stable with oninit.f()::CollectorT, merge(::CollectorT, ::CollectorT)::CollectorT, and merge(::CollectorT, ::InputT)::CollectorT, which is exactly the API I asked for.

A naive reading would have suggested that merge(::InputT, ::InputT)::CollectorT and merge(::InputT, ::CollectorT)::CollectorT are also required, and that this is not type-stable / fully inferred unless InputT == CollectorT.

After reading a little more Transducers.jl code, it appears that it internally does use the sane API (Java’s accept is called Transducers.next, Java’s combine is called Transducers.combine).

Ok, fair enough. I had used some Java APIs previously, e.g., the technically very good fork-join pool. Nevertheless, the API always became nicer when used from Clojure (or Scala), as you could just pass a couple of functions instead of implementing yet another interface.

Well, as it’s supposed to generalize foldl I would assume something like:

merge(merge(merge(oninit.f(), input[1]), input[2]),
      merge(merge(oninit.f(), input[3]), input[4]))

You are right, though, that this does not seem to be explicitly documented.