Multithreading on Cartesian products

Suppose I have an array of 5 ranges, each 1:5, so that myarr = [1:5 for i in 1:5] defines a Cartesian product with 5^5 = 3125 elements (in reality this product can be much larger, and each coordinate could range over different values). I am trying to use the @threads macro to split the following loop into chunks:

myarr = [1:5 for i in 1:5]
for (i,focal) in enumerate(Iterators.product((1:length(j) for j in myarr)...))
    # Do stuff in here
end

When testing on a vector, I was able to just split the array into roughly even chunks:

myarr = zeros(3125)
Threads.@threads for k = 1:Threads.nthreads()
    for i in getrange(3125)
        myarr[i] = Threads.threadid()
    end
end

where getrange is a function that returns the range of indices that a particular thread will fill.
How can I (or can I at all) do the same thing on the Cartesian product iterator?
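
(getrange isn't shown above; for concreteness, here is a minimal sketch of what such a helper might look like; the exact chunking scheme is my assumption:)

# Hypothetical sketch of the unshown getrange helper: split 1:n into
# Threads.nthreads() near-equal chunks and return the chunk belonging
# to the calling thread (the first rem(n, nthreads) chunks get one extra element).
function getrange(n)
    tid = Threads.threadid()
    nt = Threads.nthreads()
    d, r = divrem(n, nt)
    from = (tid - 1) * d + min(tid - 1, r) + 1
    to = from + d - 1 + (tid <= r ? 1 : 0)
    return from:to
end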

Maybe I’m missing the issue, but a very simple answer would be to just call vec(collect(complicated_product_iterator)), and then whatever you were doing for a vector would again work just fine. You probably could get away with something almost as easy while still avoiding the collect, perhaps using Transducers.jl or something, but unless the storage or cost of calling collect are substantial compared to what you’re doing with those indices, you could probably just do it and be done.
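
For example, something along these lines (a sketch; the loop body is a placeholder):

# Materialize the product once, then thread over the resulting vector.
pts = vec(collect(Iterators.product((1:length(j) for j in myarr)...)))
Threads.@threads for i in eachindex(pts)
    focal = pts[i]
    # Do stuff with (i, focal) in here
end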

I do not want to collect the whole array if I can avoid it. For this example it is small, but in practice it could be on the order of millions to billions of elements.

Fair. For complicated iterators that you don’t want to collect but you do want to iterate on in a performant multithreaded way, I’d suggest looking into Transducers.jl. If you’d like more specific suggestions, it would help to have a code example that I or whoever else happens on this thread can run.

Transducers.jl and ThreadsX.jl seem promising. The example below is a rough outline of what I want to do:

myarr = [1:5 for i in 1:5]   # Generate ranges for the Iterators.product example

### Start multi-threading to fill an array (a simple example) ###
numElements = prod(length(i) for i in myarr)
x = zeros(numElements)

# We will always know the number of elements in my actual problem!

# I want to split the below for loop among the available threads
for (i, cartProd) in enumerate(Iterators.product((1:length(j) for j in myarr)...))
    x[i] = Threads.threadid()   # will actually be a function of cartProd
                                # (which is why I need cartProd as well as the index i)
end

That’s helpful, thank you. As you’ll probably note by now, I’m actually having a little trouble seeing what the issue is. Your code looks fine and runs just fine, and the iterator having known length is definitely great, as you point out. Is your issue that you aren’t getting the expected speedup?

No, right now it is just a basic for loop, so it is not working yet; there is no parallelization, and Threads.threadid() will be the same for all elements in the array.

EDIT: Yes, the code does run, but as mentioned there is no parallelization. Unlike a simple for loop, where the @threads macro can be applied and the array split into chunks of roughly equal size, Iterators.product does not allow the same syntax. So I cannot, at least at the moment, split the filling of the array among the threads…
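
(For reference, the direct attempt fails, because @threads can only split an indexable collection such as an array or range, which this iterator is not; the exact error text varies by Julia version:)

# This throws: @threads cannot index into a generic iterator.
Threads.@threads for (i, cartProd) in enumerate(Iterators.product((1:length(j) for j in myarr)...))
    x[i] = Threads.threadid()
end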

Oof, I apologize. I have this bad habit of seeing something and thinking “oh, I can be helpful with that”, but then ultimately reading carelessly enough that I miss something obvious.

In any case, having actually read that above code more carefully, here’s an example analog that does this with multiple threads:

using ThreadPools

myarr = [1:5 for i in 1:5]   # Generate ranges for the Iterators.product example

### Start multi-threading to fill an array (a simple example) ###
numElements = prod(length(i) for i in myarr)
const x = zeros(numElements)

# I want to split the below for loop among the available threads
iter = enumerate(Iterators.product((1:length(j) for j in myarr)...))
ThreadPools.tforeach(iter) do xi
  (i, cartProd) = xi  # destructure xi; this can also go in the do-block signature: do (i, cartProd)
  x[i] = Threads.threadid()
end

This uses ThreadPools.jl, which is yet another option; I picked it because it provides a threaded foreach that looks the most like your loop. Even though we didn't manually split the work by length and didn't collect, you can count how many entries of x take each threadid value and see that ThreadPools does that kind of work for you. As the comment notes, you can also destructure xi directly in the do-block signature. This answers your question, right? Sorry again for earlier.
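
For instance, after running the loop above, something like this should show the per-thread split (assuming Threads.nthreads() > 1):

# Count how many entries each thread wrote (a quick sanity check).
[count(==(t), x) for t in 1:Threads.nthreads()]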

Perfect, this works, I’ll take a look at the docs on that. Thank you!!


FYI, @floop for from FLoops.jl can be used for a multi-threaded loop over Iterators.product (or any data structure that implements the SplittablesBase.jl interface). It's the same technology underlying Transducers.jl and ThreadsX.jl. See also FLoops.jl's parallel loops tutorial and A quick introduction to data parallelism in Julia.


@tkf Looking over the docs, they mention that parallelizing the loop requires adding the @reduce macro. But you're saying @floop can be used to fill in the array as above? I checked both docs and do not see any mention of using it without a reduction…

Ah, thanks, good point. I should mention it more clearly in the documentation. You can use it with @floop ThreadedEx() for ...:

julia> using FLoops

julia> function floop_map!(f, ys, xs, ex = ThreadedEx())
           @floop ex for i in eachindex(ys, xs)
               @inbounds ys[i] = f(xs[i])
           end
       end
floop_map! (generic function with 2 methods)

julia> xs = randn(1_000_000)
       ys = similar(xs);

julia> using BenchmarkTools

julia> @btime floop_map!(sin, ys, xs)
  3.071 ms (23 allocations: 2.42 KiB)

julia> @btime floop_map!(sin, ys, xs, SequentialEx())
  13.456 ms (4 allocations: 96 bytes)

julia> Threads.nthreads()
4

As you can see, ex can be set to anything (at compile-time). So, passing ex = ThreadedEx() to @floop ex for gives you multi-threading and ex = SequentialEx() brings back a sequential loop.
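
(If you need to control the chunking yourself, the executors also take parameters; assuming basesize is the relevant knob here, it sets roughly how many elements each task processes:)

# Tune work-splitting via the executor: basesize ≈ elements per task.
floop_map!(sin, ys, xs, ThreadedEx(basesize = length(xs) ÷ Threads.nthreads()))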


This is very helpful, thank you! I tried replacing f in floop_map! with Threads.threadid() to see which thread fills which part of the array, but it returned all 1.0s (naturally, there also appeared to be no speedup with this particular f).

function floop_map!(f, ys, xs, ex = ThreadedEx())
    @floop ex for i in eachindex(ys, xs)
        @inbounds ys[i] = f
    end
end

and calling it with floop_map!(Threads.threadid(), ys, xs), as you did with the example mapping sin.

Is the multi-threading occurring in some other way than I would expect from the @threads macro?

EDIT: In hindsight, I expect the lack of speedup is that Julia is doing something clever at compile time and realizing that it can fill the whole vector with the same element, which is why there is no speedup with parallelization; but this still does not answer why the threadid was all 1.0s rather than distributed from 1 to 4.
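
(A likely explanation: in the call floop_map!(Threads.threadid(), ys, xs), Threads.threadid() is evaluated once, on the calling thread, before the loop ever runs, so ys[i] = f writes that constant 1 everywhere regardless of which thread executes the iteration. A sketch of a variant that queries the id inside the loop instead, with floop_threadid! being a made-up name:)

# Query the thread id inside the loop body rather than passing it in as a value.
function floop_threadid!(ys, ex = ThreadedEx())
    @floop ex for i in eachindex(ys)
        ys[i] = Threads.threadid()
    end
end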

A better example of how I would need to use it, @tkf:

function testFLoops(ex = ThreadedEx())
    myarr = [1:10 for i in 1:5]   # Generate ranges for the Iterators.product example

    ### Start multi-threading to fill an array (a simple example) ###
    numElements = prod(length(i) for i in myarr)
    x = zeros(numElements)

    # I will have a cartesian product that will be used to fill an array
    iter = enumerate(Iterators.product((1:length(j) for j in myarr)...))

    @floop ex for (i, focal) in iter
        x[i] = rand()   # would be some f(focal)
    end

    return x
end

This does not run; the error is ERROR: LoadError: TaskFailedException: ArgumentError: halve(zip(…)) requires collections with identical "size".

Thanks for the bug report. It’s fixed by Fix enumerate of product by tkf · Pull Request #63 · JuliaFolds/SplittablesBase.jl · GitHub. I tried to register the new version but it looks like Registrator is down now: Registrator bot unresponsive? · Issue #320 · JuliaRegistries/Registrator.jl · GitHub


Ok. I will try it when it comes back online and get back to you.

I think your loop body might just be too fast for Julia to decide to spawn a new task on another thread. You can try something like:

julia> let xs = zeros(8)
           @floop ThreadedEx() for i in eachindex(xs)
               t = time_ns() + 100_000_000
               while t > time_ns()
               end
               xs[i] = Threads.threadid()
           end
           xs
       end
8-element Array{Float64,1}:
 1.0
 1.0
 4.0
 4.0
 2.0
 2.0
 3.0
 3.0

julia> Threads.nthreads()
4

So the following function permits the use of all available threads (I have 4). However, the benchmark seems to show that the sequential and threaded versions run at the same speed:

function testFLoops(n, ex = ThreadedEx())
    myarr = [1:n for i in 1:5]   # Generate ranges for the Iterators.product example

    ### Start multi-threading to fill an array (a simple example) ###
    numElements = prod(length(i) for i in myarr)
    x = zeros(numElements)

    # I will have a cartesian product that will be used to fill an array
    iter = enumerate(Iterators.product((1:length(j) for j in myarr)...))

    @floop ex for (i, focal) in iter
        x[i] = floor(rand() + Threads.threadid())   # would be some f(focal)
    end

    return x
end

n = 40
@btime x = testFLoops($n)
@btime y = testFLoops($n, SequentialEx())

802.679 ms (291 allocations: 781.27 MiB)
824.371 ms (23 allocations: 781.25 MiB)

julia> Threads.nthreads()
4

I should note that your original example with sin does work and give the expected speedup.

An aside: when filling a large array, it's considerably faster to initialize it with Array{T}(undef, dims) if you don't actually need zeros:

julia> @btime zeros(40^5);
  123.551 ms (2 allocations: 781.25 MiB)

julia> @btime Array{Float64}(undef, 40^5);
  8.600 μs (2 allocations: 781.25 MiB)

I guess that you will eventually pay almost the same amount of time when you actually initialize the undef array.
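
One way to check that guess (a sketch; times will vary by machine):

using BenchmarkTools

# Compare allocate-then-overwrite for both initialization strategies.
@btime (x = zeros(40^5); x .= 1.0);
@btime (x = Array{Float64}(undef, 40^5); x .= 1.0);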