Hi,
I'd like recommendations for packages that offer multithreaded mapreduce-style computation. Unfortunately this is not part of the standard library, so I was wondering which packages are good for it.
The way I like to think about my problems is in terms of the (imo very good!) Java Stream collect API:
I have an iterator of T items, I have a makeCollector()::Collector, I have an accept(collector::Collector, item::T)::Collector, and a function merge(left::Collector, right::Collector)::Collector.
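To make the interface concrete, here is a small hypothetical collector that counts how often each string occurs. The names hist_make, hist_accept! and hist_merge! are illustrative only (they are not from any package); accept and merge follow the mutating style, each returning its first argument.

```julia
# Hypothetical collector: a Dict mapping each item to its number of occurrences.
hist_make() = Dict{String,Int}()

# accept: record one item, mutating and returning the collector itself
function hist_accept!(c::Dict{String,Int}, item::String)
    c[item] = get(c, item, 0) + 1
    return c
end

# merge: add the counts of `right` into `left` (mergewith! needs Julia >= 1.5).
# Callers should treat `right` as consumed, although this version leaves it intact.
function hist_merge!(left::Dict{String,Int}, right::Dict{String,Int})
    return mergewith!(+, left, right)
end
```

For example, merging a collector that saw "a" and "b" with one that saw "a" yields Dict("a" => 2, "b" => 1).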
On a list, e.g. [item1, item2, item3], this should compute, in foldl style,
foldcollectthing(makeCollector, [item1, item2, item3]) = accept(accept(accept(makeCollector(), item1), item2), item3)
Parallelism is enabled by the following laws (accept is compatible with merge, and merge is associative):
accept(left, item) == merge(left, accept(makeCollector(), item))
merge(merge(A, B), C) == merge(A, merge(B, C))
This allows an implementation of the desired foldcollectthing to split the input into chunks, run each chunk on a different thread, and then use merge to combine the results.
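A minimal sketch of these semantics using only Base and Threads (no package). foldcollectthing is the sequential reference, a plain foldl, with accept passed explicitly; parallel_collect is a hypothetical helper (not from any library) that splits a Vector into contiguous chunks, folds each chunk with accept in its own task, and combines the chunk results left to right with merge. It assumes the laws above hold and by default uses one chunk (hence one makeCollector call) per available thread.

```julia
using Base.Threads: @spawn, nthreads

# Sequential reference semantics: a left fold starting from a fresh collector,
# i.e. accept(accept(accept(makeCollector(), item1), item2), item3).
foldcollectthing(makeCollector, accept, xs) = foldl(accept, xs; init = makeCollector())

# Hypothetical parallel version: one contiguous chunk per task, each folded
# with `accept` from its own fresh collector, then merged left to right.
# Relies on the laws above; `merge` need not be commutative.
function parallel_collect(makeCollector, accept, merge, xs::AbstractVector;
                          nchunks::Integer = nthreads())
    n = length(xs)
    n == 0 && return makeCollector()
    nchunks = clamp(nchunks, 1, n)
    # Contiguous index ranges of at most cld(n, nchunks) items each
    chunks = Iterators.partition(eachindex(xs), cld(n, nchunks))
    tasks = map(chunks) do idxs
        @spawn begin
            c = makeCollector()        # one collector per chunk
            for i in idxs
                c = accept(c, xs[i])   # works for mutating and non-mutating accept
            end
            c
        end
    end
    # fetch.(tasks) keeps chunk order; foldl merges left to right
    return foldl(merge, fetch.(tasks))
end
```

For instance, foldcollectthing(() -> "init", (c, x) -> "accept($c, $x)", [1, 2, 3]) returns the string "accept(accept(accept(init, 1), 2), 3)", and with a plain vector collector, parallel_collect(() -> Int[], push!, append!, collect(1:10)) returns collect(1:10), since push! and append! both return their first argument.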
Typically accept and merge are mutating, i.e. accept(collector, item) === collector and merge(left, right) === left, and merge may destroy the right collector. If the time taken per item is known to be homogeneous, makeCollector should ideally be called only once per thread.
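Because accept and merge may mutate their first argument, and merge may destroy its right argument, checking the two laws on sample data needs a fresh collector for every operand. A sketch of such a check; check_laws and its observe argument (a function turning a collector into something comparable with ==) are hypothetical names, and items is assumed to be an ordinary 1-based Vector.

```julia
# Hypothetical law checker. Every operand is built from a fresh collector,
# because accept/merge may mutate their first argument and merge may destroy
# its right argument. `observe` maps a collector to a value comparable with ==.
function check_laws(makeCollector, accept, merge, observe, items::Vector)
    build(xs) = foldl(accept, xs; init = makeCollector())
    n = length(items)
    # Law 1: accept(left, item) == merge(left, accept(makeCollector(), item))
    for i in 0:n-1
        A, x = items[1:i], items[i+1]
        lhs = observe(accept(build(A), x))
        rhs = observe(merge(build(A), accept(makeCollector(), x)))
        lhs == rhs || return false
    end
    # Law 2: merge(merge(A, B), C) == merge(A, merge(B, C)) for every split A|B|C
    for i in 0:n, j in i:n
        A, B, C = items[1:i], items[i+1:j], items[j+1:n]
        lhs = observe(merge(merge(build(A), build(B)), build(C)))
        rhs = observe(merge(build(A), merge(build(B), build(C))))
        lhs == rhs || return false
    end
    return true
end
```

For instance, check_laws(() -> Int[], push!, append!, identity, [1, 2, 3]) returns true, while swapping the merge order to (a, b) -> append!(b, a) makes it return false.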
A typical example of this view is the following baby example:
```julia
julia> foo(n) = collect(1:n)
foo (generic function with 1 method)

julia> mapreduce(foo, vcat, 1:3)
6-element Vector{Int64}:
 1
 1
 2
 1
 2
 3
```
Using this paradigm, this would be spelled:
```julia
julia> mutable struct Collector
           items::Vector{Int}
       end

julia> makeCollector() = Collector(Int[])
makeCollector (generic function with 1 method)

julia> function accept(collector::Collector, i)
           for res in foo(i)
               push!(collector.items, res)   # Collector itself has no push! method
           end
           collector
       end
accept (generic function with 1 method)

julia> function merge(left::Collector, right::Collector)
           # Copy the smaller side into the larger one
           if length(left.items) < length(right.items)
               prepend!(right.items, left.items)
               left.items = right.items
           else
               append!(left.items, right.items)
           end
           left
       end
merge (generic function with 1 method)
```
As a side note, the if length(left.items) < length(right.items) branch is essential: it makes the difference between O(N log N) and O(N^2) runtime for collecting N items under the worst-case schedule. This kind of thing really needs a double-ended queue (Julia's Vector supports cheap prepend!, so it qualifies) and cannot be done with e.g. a C++ std::vector, which only grows cheaply at the back.
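To see where the O(N log N) versus O(N^2) gap comes from, one can count how many elements each merge copies. The sketch below is a hypothetical cost model, not a benchmark: naive_cost models always doing append!(left.items, right.items), while smaller_side_cost models the branch above, which copies only the smaller side.

```julia
# Hypothetical cost model: number of elements copied by one merge of
# collectors holding nleft and nright items.
naive_cost(nleft, nright) = nright                     # always append!(left.items, right.items)
smaller_side_cost(nleft, nright) = min(nleft, nright)  # the branch: copy only the smaller side

# Schedule 1: N single-item collectors merged right to left,
# merge(c1, merge(c2, merge(c3, ...))), so `left` always holds one item.
function right_leaning_cost(cost, N)
    total, nright = 0, 1
    for _ in 1:N-1
        total += cost(1, nright)
        nright += 1
    end
    return total
end

# Schedule 2: a perfectly balanced merge tree over N single-item collectors.
function balanced_cost(cost, N)
    N <= 1 && return 0
    h = div(N, 2)
    return balanced_cost(cost, h) + balanced_cost(cost, N - h) + cost(h, N - h)
end
```

With N = 1000 collectors merged right to left, the naive rule copies 1 + 2 + ... + 999 = 499500 elements while the smaller-side rule copies 999. On a balanced tree with N = 1024 leaves, both rules copy 5120 = (N/2)·log2(N) elements, which is the kind of schedule where the smaller-side rule pays its log factor.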
=================
Any recommendations?
Am I too blind to see how to do this with Transducers.jl, @tk3369? I.e. given some makeCollector, merge and accept functions, how would I use foldxt to do this job on e.g. a plain Vector of items?
PS. Another way of writing the desired result would be

```julia
mapreduce(item -> accept(makeCollector(), item), merge, collection; init = makeCollector())
```

The essential part, however, is that accept is possibly non-allocating and cheaper than merge. I don't want to rely on brittle compiler optimizations that may or may not elide such allocations; I want it guaranteed.
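The allocation difference can be made concrete by counting constructor calls. This sequential sketch uses a hypothetical counting constructor make_counted and vector-based acc!/mrg! (illustrative names only): the mapreduce formulation creates one collector per item plus one for init, while folding contiguous chunks with acc! creates one collector per chunk. The same per-chunk count carries over when each chunk runs on its own thread.

```julia
# Hypothetical vector-based collector whose constructor counts its calls.
const ncreated = Ref(0)
make_counted() = (ncreated[] += 1; Int[])
acc!(c, x) = (push!(c, x); c)   # accept: mutate and return the collector
mrg!(a, b) = append!(a, b)      # merge: append right into left, return left

xs = 1:1000

# mapreduce formulation: a fresh collector for every single item
ncreated[] = 0
r_mapreduce = mapreduce(x -> acc!(make_counted(), x), mrg!, xs; init = make_counted())
n_mapreduce = ncreated[]        # one per item, plus one for init

# chunked fold: one collector per contiguous chunk, accept does no allocation
ncreated[] = 0
r_chunked = foldl(mrg!, (foldl(acc!, chunk; init = make_counted())
                         for chunk in Iterators.partition(xs, 250)))
n_chunked = ncreated[]          # one per chunk (4 chunks of 250 items)
```

Both results equal collect(1:1000); only the number of collectors created differs (1001 versus 4 here).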