Multi-threading or multi-processing with iterators

How can one use multi-threading with general iterators?
For ranges and arrays it’s very easy:

using Base.Threads
result = Atomic{Int}(0)
@threads for i in 1:1_000_000
    atomic_add!(result, i)
end
# result[] == 500000500000 now

However, trying the same with any more complicated iterator fails:

using Base.Threads
result = Atomic{Int}(0)
@threads for i in Iterators.countfrom(1, 1_000_000)
    atomic_add!(result, i)
end

Error thrown in threaded loop on thread 2: MethodError(f=typeof(Base.length)(), args=(Base.Iterators.Count{Int64}(start=1, step=1000000),), world=0x0000000000006410)
Error thrown in threaded loop on thread 3: MethodError(f=typeof(Base.length)(), args=(Base.Iterators.Count{Int64}(start=1, step=1000000),), world=0x0000000000006410)
Error thrown in threaded loop on thread 1: MethodError(f=typeof(Base.length)(), args=(Base.Iterators.Count{Int64}(start=1, step=1000000),), world=0x0000000000006410)
Error thrown in threaded loop on thread 0: MethodError(f=typeof(Base.length)(), args=(Base.Iterators.Count{Int64}(start=1, step=1000000),), world=0x0000000000006410)

I know multi-threading is still experimental, so there might not be an easy way to do this.
For my calculation I want to reduce over the permutations of an array, so shared memory is not even necessary and I could use multi-processing instead:

using Distributed
@distributed (+) for i in 1:1_000_000
    i
end

>> 500000500000
using Distributed
@everywhere using Combinatorics
# using another iterator here, because Iterators.Count doesn't support length
@distributed (+) for i in permutations([1,2,3,4,5,6]) 
    sum(i)
end

On worker 2:
MethodError: no method matching getindex(::Combinatorics.Permutations{Array{Int64,1}}, ::Int64)
#35 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.1/Distributed/src/macros.jl:275
#112 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.1/Distributed/src/process_messages.jl:269
run_work_thunk at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.1/Distributed/src/process_messages.jl:56
macro expansion at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.1/Distributed/src/process_messages.jl:269 [inlined]
#111 at ./task.jl:259

Stacktrace:
 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] wait(::Condition) at ./event.jl:46
 [4] wait(::Task) at ./task.jl:188
 [5] fetch at ./task.jl:202 [inlined]
 [6] iterate at ./generator.jl:47 [inlined]
 [7] collect(::Base.Generator{Array{Task,1},typeof(fetch)}) at ./array.jl:606
 [8] preduce(::Function, ::Function, ::Combinatorics.Permutations{Array{Int64,1}}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.1/Distributed/src/macros.jl:263
 [9] top-level scope at In[52]:1

It appears that the @distributed macro wants to get the loop elements with getindex calls, which aren’t defined for simple iterators.
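
A quick sanity check confirms this: Permutations supports length (unlike Iterators.Count), but it defines no getindex method, and @distributed's partitioning needs one.

using Combinatorics

itr = permutations([1, 2, 3, 4, 5, 6])
hasmethod(length, Tuple{typeof(itr)})         # true
hasmethod(getindex, Tuple{typeof(itr), Int})  # false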

Is there a way to make this work? The number of permutations I get is quite big, and it is absolutely not necessary to have them collected in an array. It would still be nice to get some parallelization.

I assume this is not your actual code, but just note that atomic variables are not meant for computation; this code is going to be much slower than a normal sum loop.

Yes correct, I just used atomic in a quick attempt to sum numbers in a parallel way.
My actual code is somewhat like:

using Combinatorics
M = #...some n x n Matrix
result = sum(innerfunc(M,perm) for perm in permutations(1:n))

where innerfunc indexes the rows of M with the permuted indices perm.
So ideally I would not be using shared memory at all.
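
For illustration, a hypothetical innerfunc of that shape (not my actual one) could pick one entry per column, with the row chosen by the permutation:

using Combinatorics

# hypothetical stand-in: one entry per column, row chosen by perm,
# combined with a product (the pattern behind the matrix permanent)
innerfunc(M, perm) = prod(M[perm[j], j] for j in eachindex(perm))

n = 4
M = ones(n, n)   # toy matrix in place of the real one
sum(innerfunc(M, perm) for perm in permutations(1:n))  # == factorial(4) == 24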

Often I find that the most efficient way in Julia to do multithreading is not to decorate the innermost loop with @threads, but to split the work into a number of roughly equally sized chunks, one per CPU core, and parallelize that.

If you have a non-trivial iterator, that will be a bit tricky to achieve. If you don’t have a ton of permutations, you could pre-generate them into an array and index into that. For this specific problem however, it’s not very difficult to find the permutation for a given index, so what we could do is let each thread find a suitable start permutation for the iterator, and then let permutations handle the iteration from there.
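
For reference, the pre-generation fallback could look something like this minimal sketch (it assumes all n! permutations fit in memory, and uses a toy sum as the per-permutation work):

using Base.Threads, Combinatorics

function sum_pregenerated(n)
    perms = collect(permutations(1:n))   # materialize n! vectors -- memory-hungry!
    partial = zeros(Int, nthreads())     # one accumulator per thread
    @threads for k in eachindex(perms)
        partial[threadid()] += sum(perms[k])
    end
    return sum(partial)
end

sum_pregenerated(4)  # == 240, same result as the index-based version below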

Some code for the index-based approach, in case that didn't make sense:

using Combinatorics

const PRINT_LOCK = Threads.SpinLock()

# Print atomically so that output from different threads doesn't interleave.
function safe_print(args...)
    lock(PRINT_LOCK) do
        Core.println(args...)
    end
end

# Return the i-th permutation of 1:n (0-based, lexicographic order) by
# decoding i in the factorial number system: with d elements remaining,
# (i mod d!) ÷ (d-1)! selects which of them comes next.
function permutation_by_index(n, i)
    v = collect(1:n)
    [splice!(v, 1 + i % factorial(d) ÷ factorial(d-1)) for d = n:-1:1]
end

innerfunc(M, perm) = sum(M[perm])

function sum_of_permutations_threaded(n; T = Threads.nthreads())
    N = factorial(n)             # total number of permutations
    M = collect(1:n)
    thread_sums = zeros(Int, T)  # one accumulator per thread, no shared state
    Threads.@threads for t = 1:T
        # Thread t handles permutation indices start_idx .. start_idx+count-1.
        start_idx = N*(t-1)÷T
        count = N*t÷T - start_idx
        # Jump directly to this chunk's first permutation...
        perm = permutation_by_index(n, start_idx)
        itr = permutations(1:n)
        thread_sum = 0
        for _ = 1:count
            safe_print("Thread ", t, ": ", string(perm))
            thread_sum += innerfunc(M, perm)
            # ...then let the iterator advance through the rest of the chunk.
            _, perm = iterate(itr, perm)
        end
        thread_sums[t] = thread_sum
    end
    return sum(thread_sums)
end

Sample usage, on an 8-core machine:

julia> sum_of_permutations_threaded(4)
Thread 1: [1, 2, 3, 4]
Thread 4: [2, 3, 4, 1]
Thread 1: [1, 2, 4, 3]
Thread 4: [2, 4, 1, 3]
Thread 1: [1, 3, 2, 4]
Thread 4: [2, 4, 3, 1]
Thread 3: [2, 1, 3, 4]
Thread 6: [3, 2, 4, 1]
Thread 3: [2, 1, 4, 3]
Thread 6: [3, 4, 1, 2]
Thread 3: [2, 3, 1, 4]
Thread 6: [3, 4, 2, 1]
Thread 7: [4, 1, 2, 3]
Thread 8: [4, 2, 3, 1]
Thread 5: [3, 1, 2, 4]
Thread 8: [4, 3, 1, 2]
Thread 5: [3, 1, 4, 2]
Thread 8: [4, 3, 2, 1]
Thread 5: [3, 2, 1, 4]
Thread 2: [1, 3, 4, 2]
Thread 7: [4, 1, 3, 2]
Thread 2: [1, 4, 2, 3]
Thread 7: [4, 2, 1, 3]
Thread 2: [1, 4, 3, 2]
240

For the trivial innerfunc above, multithreading doesn’t help much, but for more complex functions you should see a decent speedup with this technique.

(Just beware that if the chunks assigned to each thread don’t do about the same amount of work, performance won’t be optimal. There are ways around this, such as adapting the size of each chunk, or creating more chunks than the number of cores for automatic balancing.)
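
To sketch that last idea (threads claiming chunks from a shared atomic counter, so faster threads automatically pick up more work; the function name and chunk count here are just for illustration):

using Base.Threads

function balanced_sum(f, N; chunks = 4 * nthreads())
    next = Atomic{Int}(1)              # index of the next unclaimed chunk
    partial = zeros(Int, nthreads())   # per-thread accumulators
    @threads for t = 1:nthreads()
        while true
            c = atomic_add!(next, 1)   # claim a chunk; returns the old value
            c > chunks && break
            lo, hi = N*(c-1)÷chunks + 1, N*c÷chunks
            s = 0
            for i = lo:hi
                s += f(i)
            end
            partial[threadid()] += s
        end
    end
    return sum(partial)
end

balanced_sum(identity, 1_000_000)  # == 500000500000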
