Suppose I have an array of 5 coordinates each ranging from (1:5) so that myarr = [1:5 for i in 1 : 5] is of length 3125 (in reality this list can be much larger and each element could range over different values). I am trying to use the @threads macro to split the following loop into chunks
myarr = [1:5 for i in 1 : 5]
for (i,focal) in enumerate(Iterators.product((1:length(j) for j in myarr)...))
# Do stuff in here
end
When testing on vector, I was able to just split the array into roughly even chunks
myarr = zeros(3125)
Threads.@threads for k = 1 : Threads.nthreads()
for i in getrange(3125)
myarr[i] = Threads.threadid()
end
end
Where getrange is a function that returns the range that a particular thread will fill.
How (or) can I do the same thing on the cartesian product array?
Maybe I’m missing the issue, but a very simple answer would be to just call vec(collect(complicated_product_iterator)), and then whatever you were doing for a vector would again work just fine. You probably could get away with something almost as easy while still avoiding the collect, perhaps using Transducers.jl or something, but unless the storage or cost of calling collect are substantial compared to what you’re doing with those indices, you could probably just do it and be done.
Fair. For complicated iterators that you don’t want to collect but you do want to iterate on in a performant multithreaded way, I’d suggest looking into Transducers.jl. If you’d like more specific suggestions, it would help to have a code example that I or whoever else happens on this thread can run.
Transducers.jl and ThreadsX.jl seem promising. The below example is a rough outline of what I want to do
myarr = [1:5 for i in 1:5] # Generate for Iterators.Product example
### Start multi-threading to fill an array (a simple example) ###
numElements = prod(length(i) for i in myarr)
x = zeros(numElements)
# We will always know the number of elements in my actual problem!
# I want to split the below for loop among the available threads
for (i,cartProd) in enumerate(Iterators.product((1:length(j) for j in myarr)...))
x[i] = Threads.threadid() # This will actually be a function involving cartProd (which is why I need the cartProd as well as the index i)
end
That’s helpful, thank you. As you’ll probably note by now, I’m actually having a little trouble seeing what the issue is. Your code looks fine and runs just fine, and the iterator having known length is definitely great, as you point out. Is your issue that you aren’t getting the expected speedup?
No, it is just a basic for loop right now, so it is not working yet. There is no parallelization… The Threads.threadid() will be the same for all elements in the array.
EDIT: Yes the code does run, but as mentioned there is no parallelization. Unlike in a simple for loop where the threads macro can be called and the array split into chunks of roughly equal sizes, iterators.product does not allow the same syntax. So I cannot, at least at the moment, split the filling of the array among the threads…
Oof, I apologize. I have this bad habit of seeing something and thinking “oh, I can be helpful with that”, but then ultimately reading carelessly enough that I miss something obvious.
In any case, having actually read that above code more carefully, here’s an example analog that does this with multiple threads:
using ThreadPools
myarr = [1:5 for i in 1:5] # Generate for Iterators.Product example
### Start multi-threading to fill an array (a simple example) ###
numElements = prod(length(i) for i in myarr)
const x = zeros(numElements)
# I want to split the below for loop among the available threads
iter = enumerate(Iterators.product((1:length(j) for j in myarr)...))
ThreadPools.tforeach(iter) do xi
(i, cartProd) = xi # de-structure x, this is probably possible in the above line.
x[i] = Threads.threadid()
end
This uses ThreadPools.jl, which is yet another option, which I picked because it provides a threaded foreach that will look the most like your loop. Even though we didn’t split the thing in half by length manually and didn’t collect, you can look at how many entries of x take the value of each threadid and see that ThreadPools does that kind of work for you. Like I say in the comments, there’s probably a way to destructure xi in the same line as the do block, but hopefully the point is now clear. This answers your question, right? Sorry again for earlier.
@tkf Looking over the doc, it mentions that parallelization of the loop requires the added macro @reduce. But you’re saying @floop can be used to fill in the array as above? I checked both docs and do not see any mention of using it without a reduction…
Ah, thanks, good point. I should mention it more clearly in the documentation. You can use it with @floop ThreadedEx() for ...:
julia> using FLoops
julia> function floop_map!(f, ys, xs, ex = ThreadedEx())
@floop ex for i in eachindex(ys, xs)
@inbounds ys[i] = f(xs[i])
end
end
floop_map! (generic function with 2 methods)
julia> xs = randn(1_000_000)
ys = similar(xs);
julia> using BenchmarkTools
julia> @btime floop_map!(sin, ys, xs)
3.071 ms (23 allocations: 2.42 KiB)
julia> @btime floop_map!(sin, ys, xs, SequentialEx())
13.456 ms (4 allocations: 96 bytes)
julia> Threads.nthreads()
4
As you can see, ex can be set to anything (at compile-time). So, passing ex = ThreadedEx() to @floop ex for gives you multi-threading and ex = SequentialEx() brings back a sequential loop.
This is very helpful, thank you! I tried replacing the floor_map! function with Threads.threadid() to see which thread is filling which part of the array but it returned all 1.0s (naturally there also appeared to be no speedup with this particular f).
function floop_map!(f, ys, xs, ex = ThreadedEx())
@floop ex for i in eachindex(ys, xs)
@inbounds ys[i] = f
end
end
and calling it with floop_map(Threads.threadid(),ys,xs) as you did with the example for mapping sin.
Is the multi-threading occurring in some other way then I would expect from the @threads macro?
EDIT: In hindsight, I expect the lack of speedup is that Julia is doing something clever at compile time and realizing that it can fill the whole vector with the same element, which is why there is no speed up with parallelization, but this still does not answer why the threadid was all 1.0s and not distributed equally from 1- 4.
A better example of how I would need to use it @tkf
function testFLoops(ex=ThreadedEx())
myarr = [1:10 for i in 1:5] # Generate for Iterators.Product example
### Start multi-threading to fill an array (a simple example) ###
numElements = prod(length(i) for i in myarr)
x = zeros(numElements)
# I will have a cartesian product that will be used to fill an array
iter = enumerate(Iterators.product((1:length(j) for j in myarr)...))
@floop ex for (i,focal) in iter
x[i] = rand() # Would be some f(focal)
end
return x
end
Does not run, the error is ERROR: LoadError: TaskFailedException: ArgumentError: halve(zip(…))requires collections with identical "size"
I think your loop body might be just too fast for Julia to decide to spawn a new task in another thread. You can try something like
julia> let xs = zeros(8)
@floop ThreadedEx() for i in eachindex(xs)
t = time_ns() + 100_000_000
while t > time_ns()
end
xs[i] = Threads.threadid()
end
xs
end
8-element Array{Float64,1}:
1.0
1.0
4.0
4.0
2.0
2.0
3.0
3.0
julia> Threads.nthreads()
4
So the following function permits the use of all available threads (I have 4). However, the benchmark seems to show that the sequential and threaded are both the same speed
function testFLoops(n,ex=ThreadedEx())
myarr = [1:n for i in 1:5] # Generate for Iterators.Product example
### Start multi-threading to fill an array (a simple example) ###
numElements = prod(length(i) for i in myarr)
x = zeros(numElements)
# I will have a cartesian product that will be used to fill an array
iter = enumerate(Iterators.product((1:length(j) for j in myarr)...))
@floop ex for (i,focal) in iter
x[i] = floor(rand() + Threads.threadid()) # Would be some f(focal)
end
return x
end
n = 40
@btime x = testFLoops(n)
@btime y = testFLoops(n,SequentialEx())
802.679 ms (291 allocations: 781.27 MiB)
824.371 ms (23 allocations: 781.25 MiB)
Julia> Threads.nthreads()
4
I should note that your original example with sin does work and give the expected speedup.