Threads over iterator products

Is there a way to use @threads over iterator products? I am trying to convert some nested loops like:

julia> r = zeros(Threads.nthreads())
       Threads.@threads for i in 0:1
         for j in 0:1
           r[Threads.threadid()] += 1
         end
       end
       r = sum(r)

into this:

julia> r = zeros(Threads.nthreads())
       Threads.@threads for (i,j) in Iterators.product(0:1,0:1)
         r[Threads.threadid()] += 1
       end
       r = sum(r)
ERROR: TaskFailedException...

Some context: in some cases of my problem, threading the outer loop is fine, because it has many iterations. In other cases the outer loop has only a few iterations, and then I would like to thread the inner loop instead. Thus, if I could thread over the iterator product, I would cover both scenarios.

Any idea is appreciated, including telling me that what I am trying is not reasonable.
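A common manual workaround (my sketch, not from the thread) is to flatten the two loops into one linear index that @threads can split, and recover the pair with divrem; `ni` and `nj` here are assumed loop sizes:

```julia
# Flatten a double loop of sizes ni × nj into a single linear index.
ni, nj = 2, 2
r = zeros(Threads.nthreads())
Threads.@threads for n in 0:ni*nj-1
    i, j = divrem(n, nj)      # recover (i, j), with i in 0:ni-1, j in 0:nj-1
    r[Threads.threadid()] += 1
end
total = sum(r)                # ni*nj iterations in total
```

This threads over all ni*nj iterations at once, so it works whether the outer or the inner loop is the short one.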


I’ve never had much luck with getting any real speedup parallelizing over un-collected product iterators. If it were me and I just wanted to be done, I might write internal functions f_threadinner and f_threadouter and then just figure out some heuristic for checking sizes and then write f(args...) = should_thread_inner(args...) ? f_threadinner(args...) : f_threadouter(args...).

I’ll be very curious to see what other people suggest here, though, because I often just eat the cost of a vec(collect(Iterators.product(...))) and would love to stop doing that.
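For reference, here is what that collect workaround looks like for the example above (my sketch): collecting materializes the product into an indexable Vector of tuples, which Threads.@threads can partition.

```julia
r = zeros(Threads.nthreads())
# vec(collect(...)) turns the 2×2 product into a Vector of (i, j) tuples:
pairs = vec(collect(Iterators.product(0:1, 0:1)))  # [(0,0), (1,0), (0,1), (1,1)]
Threads.@threads for (i, j) in pairs
    r[Threads.threadid()] += 1
end
sum(r)  # 4.0
```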

You can make your own macro using Threads.@spawn and Iterators.partition. This one was at some point based on this PR:

macro threads(ex)
    Meta.isexpr(ex, :for) || throw("expected a for loop")
    Meta.isexpr(ex.args[1], :(=)) || throw("can't handle nested outer loops")
    _threads(ex.args[1], ex.args[2])
end

function _threads(top, body)
    var, iter = top.args
    @gensym chunk
    quote
        $Base.@sync for $chunk in $Iterators.partition($iter, $cld($length($iter), $Threads.nthreads()))
            $Threads.@spawn for $var in $chunk
                $body
            end
        end
    end |> esc
end

runs = zeros(Threads.nthreads())
@threads for (i,j) in Iterators.product(1:3, 1:7)
    runs[Threads.threadid()] += 1
end
runs # [6, 6, 6, 3] with 4 threads

using BenchmarkTools

f1(A,B) = B .= exp.(log.(A))
f2(A,B) = for i in eachindex(A)
    B[i] = exp(log(A[i]))
end
f3(A,B) = Threads.@threads for i in eachindex(A)
    B[i] = exp(log(A[i]))
end
f4(A,B) = @threads for i in eachindex(A)
    B[i] = exp(log(A[i]))
end

A = rand(10^6); B = similar(A);
@btime f1($A,$B);
@btime f2($A,$B);
@btime f3($A,$B);
@btime f4($A,$B); # similar timing to f3

If you only need “powers” of 1:n, then this is a very simple solution:

julia> r = zeros(Int, 2, 2)
       Threads.@threads for i in CartesianIndices((2,2))
           r[i] = Threads.threadid()
       end
       r
2×2 Matrix{Int64}:
 1  3
 2  4
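Note that CartesianIndices also accepts arbitrary ranges, not just sizes, so the original 0:1 loops fit this pattern directly (a sketch, not from the thread):

```julia
r = zeros(Threads.nthreads())
# CartesianIndices over explicit ranges, not just 1:n "powers":
Threads.@threads for I in CartesianIndices((0:1, 0:1))
    i, j = Tuple(I)           # recover the (i, j) pair from the index
    r[Threads.threadid()] += 1
end
sum(r)  # 4.0
```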

Looking at julia/threadingconstructs.jl at 37c0b0632771764f881184d68ecc3df318cfb32b · JuliaLang/julia · GitHub, it seems what Iterators.product is missing is an implementation of firstindex and getindex. I imagine it would be possible to fill in these missing methods, but doing so at a level of quality suitable for Base might not be straightforward.

FLoops.jl and other JuliaFolds packages support Iterators.product via the SplittablesBase.jl API. As a consequence, Iterators.product can even be arbitrarily nested inside Iterators.filter, Iterators.flatten, etc. FLoops also supports nested loop syntax.
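For example (my sketch, using the documented @floop/@reduce syntax), a threaded reduction directly over an un-collected product needs no collect at all:

```julia
using FLoops

# @reduce declares `s` as a reduction accumulator that FLoops
# combines safely across tasks.
@floop ThreadedEx() for (i, j) in Iterators.product(1:2, 1:3)
    @reduce s += i * j
end
s  # (1 + 2) * (1 + 2 + 3) == 18
```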


@tkf, thanks, but that was too easy. Do you have an intermediate option with which I can at least have the impression that I’m smart? When I was a small kid, my older brother let me win at various games, but I was infuriated if he did so without letting me know that he was not really playing at full strength.

For the record, I just had to change @threads for ... to @floop ThreadedEx() for ...

Seriously impressive. In the past I remember experiencing some overhead when using the transducers ecosystem with complicated iterators like products, at least for my typical code patterns, and I thought I had double-checked that this was still the case last night when I posted on this thread. But I clearly made some silly mistake, like not running with julia -t, because this benchmark shows that FLoops performs equally well, within error, as a raw threaded double for loop on my machine. That is incredible. Here is my little benchmark, which assembles a simple kernel matrix:

using Folds, FLoops, BenchmarkTools

const xx = 1:1024

function serial()
  out = map(x->exp(-abs(x[1]-x[2])), Iterators.product(xx, xx))
end

function folds() # this one does seem to have some more overhead than floops.
  out = Folds.map(x->exp(-abs(x[1]-x[2])), Iterators.product(xx, xx), ThreadedEx())
end

function floops()
  out = zeros(length(xx), length(xx))
  @floop ThreadedEx() for (vj, xjxk) in enumerate(Iterators.product(xx,xx))
    out[vj] = exp(-abs(xjxk[1] - xjxk[2]))
  end
  out
end

function raw_thread_for()
  out = zeros(length(xx), length(xx))
  Threads.@threads for k in 1:length(xx)
    for j in 1:length(xx)
      out[j,k] = exp(-abs(xx[j]-xx[k]))
    end
  end
  out
end

# with julia -O3 -t4
@btime serial()         # 4.372 ms (2 alloc)
@btime folds()          # 3.395 ms (173 alloc)
@btime floops()         # 1.978 ms (131 alloc)
@btime raw_thread_for() # 2.096 ms (23 alloc)

How does @floop use SplittablesBase.jl if the number of threads is not a power of two?

Hello @tkf, and is it possible to make it work with a flattened iterator?

julia> @floop ThreadedEx() for (i,c) in Iterators.product(
         @show (i,c)
ERROR: ArgumentError: Cannot compute size for object of type Base.Iterators.Flatten{CartesianIndices{2, Tuple{UnitRange{Int64}, UnitRan

By the way, in this case it would be very simple for me to provide the size of the iterator; maybe it makes sense to define a new iterator, or structure, that contains it? (Can FLoops work with a custom iterator that has all the properties, like length, defined?)


edit: actually, in my specific case it is not unreasonable to collect the part that I am flattening, so that is the easiest solution (the product of that collected part with the other iterator is not “collectable”).

Thanks for sharing the benchmark! Good to know it works well.

I think the default executor currently oversubscribes tasks. But that’s not a hard-coded assumption in FLoops. For example, the Distributed-based executor uses a multi-way split. But yes, it’d be an interesting addition to the interface.
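For what it’s worth, the splitting granularity can be tuned via the executor’s basesize option (a sketch; basesize is the documented knob on the Transducers-based executors):

```julia
using FLoops

# basesize sets roughly how many elements each spawned task processes,
# which in turn controls how many tasks are created.
@floop ThreadedEx(basesize = 4) for (i, j) in Iterators.product(1:4, 1:4)
    @reduce n += 1
end
n  # all 16 iterations still run, just in coarser chunks
```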

Oh, I didn’t realize that Iterators.flatten was not supported in a case like this. Supporting Iterators.flatten in general is rather tricky [^1], but something like your example sounds reasonable to support.

[^1]: esp., how to detect and handle the case like Iterators.flatten(, xs))