Iterating over an Iterator with map, for loop and threads: Memory usage and performance

Warning: In these original benchmarks there was a problem with my usage of @btime, as pointed out by @mikmoore. See the corrected benchmarks (which already answer question Q1) in a reply below.


Hi everyone. I am playing around with the Iteration interface and could use some help figuring out what happens behind the scenes in some usage examples.

I will be using

using .Iterators
using BenchmarkTools

prod = product(repeated(1:5, 9)...);
f(x) = sum(x)

I am interested in understanding the trade-offs between performance and memory usage among different ways of iterating over prod, when prod would be too large to collect and keep in memory.

So, the standard way of iterating without collecting would be simply using Iterators.map:

@btime map(f, prod)
> 2.745 ms (5 allocations: 14.90 MiB)

Good enough for memory, for if I collect prod and look into it with varinfo, I get

  name           size summary                                     
  ––––––– ––––––––––– ––––––––––––––––––––––––––––––––––––––––––––
  prodcol 134.111 MiB 5×5×5×5×5×5×5×5×5 Array{NTuple{9, Int64}, 9}

However, when I iterate with a for loop, I get

@btime for x in prod
    f(x);
end

> 234.641 ms (5859375 allocations: 864.27 MiB)

But if I first collect, I get

@btime for x in collect(prod)
    f(x);
end

> 420.362 ms (5858868 allocations: 491.73 MiB)

(Q1) What is the reason for the difference in runtimes and memory usage between the two for loops? Does the first one not collect prod to be able to iterate over it? And why is the map version so much faster? Is it simply because it avoids collecting and allocating memory?

Second, since my real use case will involve an iterator which is very large, and for each element of this iterator an expensive computation needs to be done, I am looking into ways of combining something similar to map (or anything which does not collect nor requires indexing) together with multithreading.

Since Threads.@threads requires indexing, I tried instead:

using ThreadsX
@btime ThreadsX.map(f, prod);

> 119.817 ms (5865420 allocations: 485.55 MiB)

(Q2) As far as I could find out/guess, ThreadsX.map is not collecting prod; rather, it partitions the iterator into nthreads chunks and then runs a serial map on each thread (though I am not sure this is correct!). If so, why is the memory usage so high compared to Iterators.map? And is the increased computation time just the overhead of distributing the work across threads? I.e., if f were an expensive computation, should I then see an improvement with ThreadsX?

Lastly, I have noticed it is not possible to use Threads.@threads for p in prod since prod is not indexable. However, FLoops.@floop works:

using FLoops

@btime @floop for x in prod
    f(x);
end

> 40.263 μs (835 allocations: 54.41 KiB)

From the documentation, as long as the iterator supports the SplittablesBase.jl interface, this would work (so similar to ThreadsX.map), so fair enough.

(Q3) However, there is a drastic difference both in runtime and memory usage for FLoops.@floop. So, why is it so much faster than, say, ThreadsX.map and how come it uses even less memory than Iterators.map?

and

(Q4) Do you have any recommendations/rules of thumb for structuring computations where prod is too big to collect, f(x) is very expensive to compute, and one has access to many cores on shared memory or a distributed-memory cluster?

Your issue here is that you aren’t interpolating variables into @btime with $. This means that, when it runs the code, it’s benchmarking how it would run in the REPL (with all variables as globals, most of them untyped and non-constant) rather than how they would perform in a function. The reason map did not hit such a big loss is that it serves as a function barrier (it has to do the global lookup and dynamic dispatch once for prod, but then once inside it knows what’s going on).

julia> @btime map(f, $prod);
  2.087 ms (3 allocations: 14.90 MiB)

julia> @btime for x in $prod
           f(x); # this isn't saving its work, so it's more like `foreach` than `map`
       end
  1.042 ms (0 allocations: 0 bytes)

# etc

Start by fixing that part of your benchmarks, then you can move on to other questions about relative performance and pitfalls of different methods.


Nice, this already resolves many of the confusing results, thanks!

Here are the new benchmarks:

@btime map(f, $prod);
> 2.668 ms (3 allocations: 14.90 MiB)
@btime foreach(f, $prod);
> 1.217 ms (0 allocations: 0 bytes)
@btime for x in $prod
    f(x);
end
> 1.288 ms (0 allocations: 0 bytes)
@btime for x in collect($prod)
    f(x);
end
> 20.722 ms (3 allocations: 134.11 MiB)
using ThreadsX
@btime ThreadsX.map(f, $prod);
> 62.532 ms (3912298 allocations: 391.37 MiB)
using FLoops

@btime @floop for x in $prod
    f(x);
end
> 40.331 μs (835 allocations: 54.28 KiB)

So Q1 is answered, for Q2 it would still be nice to have confirmation of my guesses and Q3 and Q4 remain open.

Note that map is Base.map and not Iterators.map. Iterators.map is the lazy version of Base.map and does no work upfront (deferring it all to each time a result is accessed):

julia> @btime Iterators.map(f,$prod);
  2.500 ns (0 allocations: 0 bytes)

Also, your for loops do no actual work. The result of f isn’t saved anywhere and f has no side effects, so it’s as if it never happened. With proper optimization, your loop could be reduced to for x in prod; end and then to nothing. I suspect your FLoops version recognized this, and that’s how it got a 30x speedup over map. Or maybe it’s just really good at dividing the work and you have 30+ threads at your disposal. In any case, you should probably have the loop save the results somewhere to be sure this isn’t a risk.
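A minimal sketch of that suggestion, using only Base: wrap the loop in a function and return an accumulated value, so the calls to f feed into a result that is actually used. The names small, g and accumulate_all are hypothetical stand-ins for illustration, and the iterator is deliberately smaller than the one in the thread.

```julia
using Base.Iterators: product, repeated

# Hypothetical, smaller stand-ins for the thread's `prod` and `f`.
small = product(repeated(1:5, 4)...)
g(x) = sum(x)

# The loop lives inside a function (so nothing is an untyped global)
# and every call to `f` contributes to the returned value, which means
# the work cannot simply be discarded as unused.
function accumulate_all(f, itr)
    acc = 0
    for x in itr
        acc += f(x)
    end
    return acc
end

result = accumulate_all(g, small)
```

Benchmarking accumulate_all(g, $small) with @btime would then measure the loop together with the work done by g, rather than a loop whose body may have been removed.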


I don’t have answers for you, but I’ll provide some initial commentary on your outstanding questions. Someone else can hopefully provide actual answers.

Q2: I don’t know much about ThreadsX.map except what I read in the first few paragraphs of its GitHub page. It appears to be making two allocations per input value (plus some extra overhead), which makes me suspect that it’s creating a task for each iteration that it then runs on the available threads. This would mean that you’d need the work being done to dwarf the cost of creating the task in order to see a benefit.

Q3: I know even less about FLoops. One wild guess is that it’s chunking the input so it launches fewer tasks, but someone else can probably give the real reason. Also, see my above remark about your loop being a no-op.

Q4: You should never ever collect anything unless you need a mutable copy of it or it’s absolutely required for some interface (very rare). One challenge of something “too big to collect” is that the result may also be too big to collect, although a reduction may still be possible (a la mapreduce). Those comments aside, I haven’t done distributed computing with Julia (or anything else) so can’t speak to the preferred doctrine.
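To make the chunking guess from Q3 and the reduction remark from Q4 concrete, here is a Base-only sketch. It is not how ThreadsX or FLoops are implemented; chunked_mapreduce, g, itr and the chunksize default are hypothetical names and values chosen for illustration. The serial version keeps only a running total, and the threaded version spawns one task per chunk rather than one per element.

```julia
using Base.Threads: @spawn
using Base.Iterators: product, repeated, partition

# Hypothetical stand-ins for the thread's `prod` and `f`.
itr = product(repeated(1:5, 9)...)
g(x) = sum(x)

# Serial reduction: the iterator is never collected; only the
# running total is kept in memory.
serial_total = mapreduce(g, +, itr)

# Chunked threaded reduction (assumption: `op` is associative).
# `partition` materializes one chunk at a time as a Vector, and each
# chunk is reduced in its own task. Note that all tasks are spawned
# eagerly, so chunks can pile up in memory if the tasks are slower than
# the producer; a bounded Channel would be needed to cap memory use.
function chunked_mapreduce(f, op, itr; chunksize = 50_000)
    tasks = map(partition(itr, chunksize)) do chunk
        @spawn mapreduce(f, op, chunk)
    end
    return mapreduce(fetch, op, tasks)
end

threaded_total = chunked_mapreduce(g, +, itr)
```

A larger chunksize means fewer tasks and less scheduling overhead but coarser load balancing, which seems to be the same trade-off that a parameter such as basesize controls.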


Indeed, your guess about the @floop for seems correct:

xs = zeros(length(prod))
@btime @floop for (i,x) in enumerate($prod)
    @inbounds xs[i] = f(x);
end

> 13.478 ms (1953615 allocations: 29.93 MiB)

Regarding the possible overhead of ThreadsX over FLoops, since I am not smart enough to actually understand how they work, I ran the following tests:

function f(x)
    sleep(0.05)
    return sum(x)
end

prod = product(repeated(1:5, 5)...);

xs = zeros(length(prod))
@btime @floop for (i,x) in enumerate($prod)
    @inbounds xs[i] = f(x);
end
# 12.873 s (19339 allocations: 629.66 KiB)

@btime xs = ThreadsX.map(f, $prod);
# 1.293 s (45371 allocations: 1.97 MiB)

It caught my attention that the ThreadsX version is significantly faster than the total sleep time divided across threads (prod has 3125 elements, so 3125 * 0.05 s ≈ 156 s of sleep in total, or about 13 s when split over the 12 threads I am using). The time is also highly dependent on the basesize parameter mentioned in the README.md of the repository. So it seems that ThreadsX does well with IO-bound operations.
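One possible explanation, sketched with Base only: sleep suspends the current task, not the thread, so if more tasks than threads are in flight their sleeps overlap. Whether ThreadsX actually runs more tasks than threads here is an assumption (it would depend on basesize); sleepy and run_tasks are hypothetical names.

```julia
# Same shape as the sleeping `f` above: wait, then do trivial work.
function sleepy(x)
    sleep(0.05)
    return sum(x)
end

# One task per element. While a task sleeps it yields to the scheduler,
# so other tasks can start sleeping on the same thread in the meantime.
function run_tasks(f, xs)
    tasks = [Threads.@spawn f(x) for x in xs]
    return fetch.(tasks)
end

xs = [(i, i) for i in 1:100]
elapsed = @elapsed results = run_tasks(sleepy, xs)

# 100 sleeps of 0.05 s would take 5 s back to back; with overlapping
# tasks the elapsed time should be far below that, even on one thread.
println("elapsed: ", elapsed, " s")
```

For work that keeps the CPU busy instead of sleeping, tasks cannot overlap this way, and the achievable speedup is bounded by the number of threads.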

For computationally intensive ones, both options become similar:

function f(x)
    sum(sum(sum((randn((1_000, 10_000)) for _ in 1:20))))
end

xs = zeros(Float64, 24)
@btime @floop for i in 1:24
    @inbounds xs[i] = f(0);
end
# 24.209 s (1998 allocations: 69.74 GiB)


@btime ys = ThreadsX.map(f, (0.05 for i in 1:24));
# 27.952 s (2095 allocations: 69.74 GiB)

Anyway, I have learned enough for my current use cases, so I am marking the question as solved. If someone could nevertheless provide more details about the differences between ThreadsX and FLoops, I would be interested. Thanks!
