Parallelizing over an iterable

Suppose we are working with an iterable with known finite length.

I have a function that does something like

function makeresult(n::Int)
    it = makeiterator(n)
    res = 0.0
    for x in it
        res += f(x)
    end
    return res
end

Each iteration of the loop is independent of the other, so this is an easy candidate for parallelization. If it was an indexable object like an array, I would use ChunkSplitters.jl to parallelize this and be done. I could of course use collect(it) and use that approach but the size of that collection would be very large, so this approach would not be sustainable.

To make things more concrete, just imagine the iterator (i for i in 1:100). How can I make this amenable to a parallel style reduction? I think what I am after is something that splices this into N iterators, but I can’t seem to find anything that does this.

Iterators.partition does not quite offer what I am after.

2 Likes

What comes to my mind first is a producer/consumer style solution with a shared Channel (doc page).

So I would propose a solution like:

function produce_from_iterator(it)
    function _inner(channel)
        for i in it
            put!(channel, i)
        end
        close(channel)
    end
    return _inner
end

function makeresult(n::Int, nworkers=Threads.nthreads())
    # make channel from iterator
    it = makeiterator(n)
    channel = Channel(produce_from_iterator(it))

    # define what each task does
    function task()
        res = 0.0
        try
            while true
                x = take!(channel)
                res += f(x)
            end
        catch
        end
        return res
    end

    # create and start tasks
    tasks = [Task(task) for _ in 1:nworkers]
    schedule.(tasks)
    res = mapreduce(fetch, +, tasks) # get and combine results
    return res
end

Note that this will only be worth it if the work done on each element takes a long time to amortize the synchronization overhead introduced by the Channel. See benchmark below

julia> using BenchmarkTools
julia> f(x) = log(sin(x)^2*tan(x)^2)
julia> makeiterator(n) = (i for i in 1:n)
julia> @btime makeresult(1_000_000) # 4 threads
  6.153 s (2999538 allocations: 45.77 MiB)
-5.544428536817374e6

julia> @btime mapreduce(f, +, makeiterator(1_000_000))
  39.459 ms (0 allocations: 0 bytes)
-1.3861071342043434e6

Also note that the result is different due to floating point inaccuracy in this case.

Perhaps it would be better to chunk the iterator more like so:

function produce_from_iterator(it, chunksize, T=Any)
    function _inner(channel)
        buffer = Vector{T}(undef, chunksize)
        at = 1
        for i in it
            buffer[at] = i
            if at == chunksize
                put!(channel, copy(buffer))
                at = 0
            end
            at += 1
        end
        put!(channel, resize!(buffer, at-1))
        close(channel)
    end
    return _inner
end

function makeresult(n::Int, chunksize, nworkers=Threads.nthreads())
    # make channel from iterator
    it = makeiterator(n)
    channel = Channel(produce_from_iterator(it, chunksize, Int))

    # define what each task does
    function task()
        res = 0.0
        try
            while true
                xs = take!(channel)
                res += mapreduce(f, +, xs)
            end
        catch
        end
        return res
    end

    # create and start tasks
    tasks = [Task(task) for _ in 1:nworkers]
    schedule.(tasks)
    res = mapreduce(fetch, +, tasks) # get and combine results
    return res
end

Results:

julia> @btime makeresult(1_000_000,1) # same as above but worse
  6.457 s (3999543 allocations: 106.81 MiB)
-5.544428536817374e6

julia> @btime makeresult(1_000_000,10)
  698.351 ms (1299543 allocations: 32.04 MiB)
-5.544428536817317e6

julia> @btime makeresult(1_000_000,100)
  121.202 ms (1029543 allocations: 24.11 MiB)
-5.544428536817373e6

EDIT: I realized that my complicated produce_from_iterator(it, chunksize) could have been written much simpler using Iterators.partition

3 Likes

Maybe using the Atomic macro with Threads? Something like

mutable struct Res; @atomic res; end

function makeresult_parallel(n)
           it = 1:n
           res = Res(0.0)
           f(x) = x + 1
           Threads.@threads for x in it
               @atomic res.res += f(x)
           end
           return res.res
           end
1 Like

This does not work for generic iterators/generators because they are not indexable.

Also a easier version would be:

function makeresult_parallel(n)
    it = 1:n
    res = Threads.Atomic{Float64}(0.0)
    f(x) = x + 1
    Threads.@threads for x in it
       Threads.atomic_add!(res, f(x))
    end
    return res[]
end
1 Like

Yes you are right!

I completely forgot about atomic_add!. That’s why I declared a custom struct :sweat_smile:

1 Like

The manual seems to discourage use of atomic and even says that atomic add is deprecated. Is there another way that doesn’t use that?

Maybe a lock, but if the task is fast that won’t paralellize well:

julia> function makeresult_parallel(n)
           it = 1:n
           res = 0.0
           f(x) = x + 1
           lk = ReentrantLock()
           Threads.@threads for x in it
               lock(lk) do 
                  res += f(x)
               end
           end
           return res
       end

Using ThreadsX.jl you could do ThreadsX.sum(i for i in 1:100), I believe, since generators are supported by SplittablesBase.jl. (Though this particular example is too cheap to parallelize efficiently.)

Doing an atomic add or a lock for every iteration is going to add quite a bit of synchronization overhead. Better for each thread to have its own thread-local accumulator, and then sum these at the end. ThreadsX does this for you, I expect.

3 Likes

Would the following do what you want (using sin for f)?

it = (i for i in 1:100)
sum(fetch, [Threads.@spawn sin(x) for x in it])

Or to use thread-local accumulators as suggested by @stevengj:

sum(fetch, [Threads.@spawn sum(sin, sub_it)
                for sub_it in Iterators.partition(it, 10)])
1 Like

I thought this was the usecase for FLoops ?

1 Like

Right. It’s like ThreadsX.jl, which I mentioned above — it works for any iterator supported by SplittablesBase.jl, including generators.

2 Likes

makes sense - both are written by tkf, it appears.

In my case each iteration is typically quite fast (1ms at most I’d guess?). I want to parallelize simply because I expect use cases where length(it) is on the order of 1 billion or more, so across 100+ cores I can at least effectively cut that down two orders of magnitude. The function f takes a few other inputs, including an iterator or vector that should be shared across loops (read-only).

I had not noticed that ThreadsX.jl might work here. I have used that successfully in the past elsewhere so I will give that a try today. I have not used FLoops.jl but that looks worth checking out as well.

It is possible that this also might work. I am not familiar with @spawn and fetch.

If each iteration takes about 1 ms I think you will be fine with a lock as well. That’s not too fast.

1 Like

Here is a better MWE

using Combinatorics

function MWEfun(f,V,n::Int=1)
	T = length(V)
	mulexp = multiexponents(T,n)
	res = 0.0
	for k in mulexp
		ktilde = cumsum(k)
		res += f(V.^ktilde)
	end
	return res
end
MWEfun(sum,1:100,3)

The first problem with the above solutions (ThreadsX / FLoops) is that the SplittablesBase.halve(mulexp) errors…

This seems fixable by implementing a halve method for Combinatorics.jl. Basically, you’re not going to be able to parallelize well unless there is some way for different threads to efficiently skip ahead to different chunks of the iteration.

1 Like

cc: @ajloza

@ajloza has implemented a nthcomb function which could be used to jump ahead in a combinations sequence. See GitHub - ajloza/NthComb.jl

Additionally, there is an issue and offer to add this function to the Combinatorics package. See function nthcomb to match nthperm · Issue #132 · JuliaMath/Combinatorics.jl · GitHub

I also wanted to parallelize a computation over a large combinations iterator, so I developed a split_combinations function based on nthcomb.

SplitCombinations
module SplitCombinations

export split_combinations, split_multiexponents

using Combinatorics: Combinations, MultiExponents
using NthComb
    

struct SubCombinations
    parent::Combinations
    start::Vector{Int}
    len::Int
end

Base.eltype(::Type{SubCombinations}) = eltype(Combinations)

Base.length(sc::SubCombinations) = sc.len

function Base.iterate(sc::SubCombinations, state = (copy(sc.start), 1))
    parent_state, i = state 
    i > length(sc) && return nothing
    parent_next = iterate(sc.parent, parent_state)
    parent_item, parent_state = parent_next
    state = (parent_state, i+1)
    return (parent_item, state)
end

function split_combinations(g::Base.Generator{Combinations}, num_batches::Integer)
    c = g.iter
    batch_size = cld(length(c), num_batches)

    sub_combinations = map(1:num_batches) do batch
        start = nthcomb(c.n, c.t, (batch-1) * batch_size)
        len = batch < num_batches ? batch_size : length(c) - (num_batches-1) * batch_size
        sc = SubCombinations(c, start, len)
        Base.Generator(g.f, sc)
    end

    return sub_combinations
end

function split_multiexponents(m::MultiExponents, num_batches::Integer)
    MultiExponents.(split_combinations(m.c, num_batches), m.nterms)
end

end

It is easily extendable to split_multiexponents, since MultiExponents is implemented with Combinations.

Here’s a parallel version of your MWE using pmap:

Parallel MWE
using Distributed

addprocs(20)

@everywhere using Combinatorics
@everywhere using SplitCombinations

function MWE(f, V, n::Int=1)
    T = length(V)
    mulexp = multiexponents(T, n)
    res = 0.0
    for k in mulexp
        ktilde = cumsum(k)
        res += f(V .^ ktilde)
    end
    return res
end

@everywhere function MWEparallel(f, V, n::Int=1)
    pool = CachingPool(workers())
    num_batches = length(pool)
    T = length(V)
    mulexp = multiexponents(T, n)
    args = (fill(f, num_batches), fill(V, num_batches), split_multiexponents(mulexp, num_batches))
    results = pmap(MWEinner, pool, args...)
    return reduce(+, results)
end

@everywhere function MWEinner(f, V, mulexp)
    res = 0.0
    for k in mulexp
        ktilde = cumsum(k)
        res += f(V .^ ktilde)
    end
    return res
end

f = sum
V = 1:100
n = 5

@time res_serial = MWE(f, V, n)
@time res_parallel = MWEparallel(f, V, n)

res_serial ≈ res_parallel

Please note that I haven’t fully tested the code, but it does work for my use case and it might point you in the right direction.

3 Likes

Thanks a ton! I think this will be enough to get me what I need. Too bad that it isn’t already in Combinatorics.jl

1 Like

Glad this has helped! My use case was for monte carlo sampling across a large combination space. Perhaps we can point to these multiple uses to enhance the case for adding this to Combinatorics.jl?

2 Likes