I’ve been struggling with performance issues in my code for weeks. I’ve now narrowed the primary problem down to what seems to be a threading bottleneck in the garbage collector. Sorry if this has been discussed already.

The following MWE follows the structure of the code: basically, it subsets a matrix and performs a calculation on that. Note that in the MWE I take the sum, but in reality I calculate a histogram which is then processed further.

``````
const THRESHOLD = 0.1

function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
    thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD)
end

function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
    thresholded = Vector{Bool}(undef, size(levels)[2])
    ensemble_and_threshold!(thresholded, levels, weights)
    s1 = 0.0
    s2 = 0.0
    np = length(weights)
    for p in 1:np
        m200p = levels[p, :]
        p200p = levels[p, thresholded]

        # In reality, the next step in the calculation is significantly more involved
        # than suggested here.
        s1 += sum(m200p)
        s2 += sum(p200p)
    end
    return s2 / s1
end

function outer(count::Int, levels::Matrix{Float64})::Float64
    best = Inf
    for _ in 1:count
        weights = rand(4)
        r = inner(weights, levels)
        best = min(best, r)
    end
    return best
end

function main(n_runs)
    N = 100000
    np = 4
    levels = rand(np, N)

    results = zeros(n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(10000, levels)
    end

    # Use results so that it is not optimised out
    println(devnull, results)
end

main(1) # Run once for compilation

for n_runs in 1:4
    @time main(n_runs)
end
``````

I need to do this ~1000 times, and each iteration is independent of the others, so it should be embarrassingly parallel. However, there is a bottleneck that prevents it scaling beyond about 4-8 processors. This is the output from the MWE:

``````
➜ julia --threads=4 micro.jl
20.640172 seconds (260.03 k allocations: 97.917 GiB, 14.99% gc time)
27.225064 seconds (520.03 k allocations: 195.826 GiB, 25.38% gc time)
31.511773 seconds (780.04 k allocations: 293.733 GiB, 30.25% gc time)
36.147297 seconds (1.04 M allocations: 391.653 GiB, 35.14% gc time)
``````

This was run on an M1 Mac with 4 performance cores. I see the same with Julia on Linux. There is a very slight improvement with 1.10.0-rc2.

I would expect the time to be the same for each run, but the runs are getting longer (nearly twice as long for 4 cores), and the relative amount of time spent in the garbage collector more than doubles. This suggests that the GC is, at least partially, single-threaded.

This problem should be far more scalable than it is. What can I do to improve it?

I would suggest removing excessive allocations, if possible.
The line `m200p = levels[p, :]` allocates a new vector in every iteration. Try using a view instead, as sketched below.
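A minimal sketch of the difference (hypothetical helper, assuming the matrix layout from the MWE):

``````
# Summing each row: the slice copies, the view doesn't.
function row_sums(levels::Matrix{Float64})
    s = 0.0
    for p in axes(levels, 1)
        m200p = @view levels[p, :]  # lazy view into `levels`, no per-iteration vector allocation
        s += sum(m200p)
    end
    return s
end

levels = rand(4, 100_000)
row_sums(levels)
``````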

2 Likes

Some quick tests (second run in all cases):

Original code here:

``````
julia> @time main(1)
15.794551 seconds (260.06 k allocations: 97.913 GiB, 4.07% gc time)
``````

Adding `@views` in front of `function inner...`

``````
julia> @time main(1)
14.663373 seconds (180.06 k allocations: 68.111 GiB, 3.42% gc time)
``````

Removing allocations from the inner function:

``````
function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
    for i in 1:size(levels)[2]
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
    # thresholded .= transpose( sum(levels .* weights, dims=1) .> THRESHOLD )
end
``````
``````
julia> @time main(1)
8.650660 seconds (110.06 k allocations: 30.703 GiB, 2.95% gc time)
``````

Make `weights` a `rand(SVector{4})` (from `StaticArrays`; this requires changing the input signatures to `AbstractVector`):

``````
julia> @time main(1)
8.261985 seconds (100.06 k allocations: 30.697 GiB, 3.05% gc time)
``````
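Sketched out, that change amounts to the following (same kernel as above; only the `weights` annotation widens so an `SVector` can be passed):

``````
using StaticArrays

const THRESHOLD = 0.1

# Widened signature: accepts a Vector or a stack-allocated SVector
function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64},
                                 weights::AbstractVector{Float64})
    for i in 1:size(levels)[2]
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
end

levels = rand(4, 100)
thresholded = Vector{Bool}(undef, size(levels, 2))
ensemble_and_threshold!(thresholded, levels, rand(SVector{4}))  # SVector: no heap allocation
``````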

Allocate `thresholded` outside the inner loop:

``````
@views function inner(
    weights::AbstractVector{Float64}, levels::Matrix{Float64};
    thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
    ...

# ...and in `outer`, allocate once and pass it down:
thresholded = Vector{Bool}(undef, size(levels)[2])
for _ in 1:count
    weights = rand(SVector{4})
    r = inner(weights, levels; thresholded = thresholded)
``````

Gives:

``````
julia> @time main(1)
7.906400 seconds (80.06 k allocations: 29.767 GiB, 2.95% gc time)
``````

The remaining allocations are those of `thresholded`, which now happens once per call to `outer`, and some in this line:

``````
    p200p = levels[p, thresholded]
``````

That could be avoided too, but one would need to know whether it is worth the trouble, depending on the true calculations that follow.

Finally, I changed the main function to make it easier to test the scalability:

``````
using ChunkSplitters

function main2(n_runs; nchunks = Threads.nthreads())
    N = 100000
    np = 4
    levels = rand(np, N)
    results = zeros(n_runs)
    Threads.@threads for (irange, _) in chunks(1:n_runs, nchunks)
        for i in irange
            results[i] = outer(10000, levels)
        end
    end
    # Use results so that it is not optimised out
    println(devnull, results)
end
``````

Here, the number of chunks (as long as it is <= the number of threads) determines the number of threads used. With that, I get (I have 8 cores here):

``````
julia> @time main2(8; nchunks = 1)
76.328059 seconds (649.46 k allocations: 238.106 GiB, 3.98% gc time, 0.18% compilation time: 10% of which was recompilation)

julia> @time main2(8; nchunks = 4)
20.942653 seconds (640.09 k allocations: 238.107 GiB, 5.98% gc time)

julia> @time main2(8; nchunks = 8)
16.748673 seconds (640.09 k allocations: 238.100 GiB, 7.95% gc time)
``````

It doesn’t seem too bad. You will probably have to make the inner functions completely non-allocating to get better scaling.

edit:

Some additional improvements. With the chunked threading, you can easily allocate only one `thresholded` per chunk:

``````
function outer(count::Int, levels::Matrix{Float64};
    thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
    ...

# ...and in `main2`, one buffer per chunk:
thresholded = Vector{Bool}(undef, size(levels)[2])
for i in irange
    results[i] = outer(10000, levels; thresholded = thresholded)
``````

With that, and replacing the allocation of the slice selected by `thresholded` with an explicit loop (I’m unsure whether that is actually practical in the real code):

``````
    s2 = 0.0
    for i in eachindex(thresholded)
        if thresholded[i]
            s2 += m200p[i]
        end
    end
``````
``````
julia> @time main2(8; nchunks = 1)
36.377214 seconds (84 allocations: 3.156 MiB)

julia> @time main2(8; nchunks = 8)
5.494856 seconds (98 allocations: 3.824 MiB)
``````

The final serial code is about 3x faster than the original:

``````
julia> @time main2(1; nchunks = 1)
4.512136 seconds (63 allocations: 3.153 MiB)
``````
code
``````
using StaticArrays
using ChunkSplitters

const THRESHOLD = 0.1

function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::AbstractVector{Float64})
    for i in 1:size(levels)[2]
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
    # thresholded .= transpose( sum(levels .* weights, dims=1) .> THRESHOLD )
end

@views function inner(
    weights::AbstractVector{Float64}, levels::Matrix{Float64};
    thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
    ensemble_and_threshold!(thresholded, levels, weights)
    s1 = 0.0
    s2 = 0.0
    np = length(weights)
    for p in 1:np
        m200p = levels[p, :]
        # p200p = levels[p, thresholded]

        # In reality, the next step in the calculation is significantly more involved
        # than suggested here.
        s1 += sum(m200p)
        s2 = 0.0
        for i in eachindex(thresholded)
            if thresholded[i]
                s2 += m200p[i]
            end
        end
    end
    return s2 / s1
end

function outer(count::Int, levels::Matrix{Float64};
    thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
    best = Inf
    for _ in 1:count
        weights = rand(SVector{4})
        r = inner(weights, levels; thresholded = thresholded)
        best = min(best, r)
    end
    return best
end

function main(n_runs)
    N = 100000
    np = 4
    levels = rand(np, N)

    results = zeros(n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(10000, levels)
    end

    # Use results so that it is not optimised out
    println(devnull, results)
end

#main(1) # Run once for compilation
#
#for n_runs in 1:4
#  @time main(n_runs)
#end

function main2(n_runs; nchunks = Threads.nthreads())
    N = 100000
    np = 4
    levels = rand(np, N)
    results = zeros(n_runs)
    Threads.@threads for (irange, _) in chunks(1:n_runs, nchunks)
        thresholded = Vector{Bool}(undef, size(levels)[2])
        for i in irange
            results[i] = outer(10000, levels; thresholded = thresholded)
        end
    end
    # Use results so that it is not optimised out
    println(devnull, results)
end
``````
11 Likes

To address this directly: Yes, the GC is single-threaded up to and including 1.9. As you’ve observed, this quickly becomes a bottleneck when using multithreading with code that allocates a lot. Some notion of GC multithreading is being introduced in 1.10 and expanded further in 1.11, but I’m not qualified to describe the improvements. You’ll find some discussions about this if you search this forum.
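If I remember correctly, 1.10 also adds a `--gcthreads` command-line flag, so once you upgrade you could experiment with something like the following; treat this as a pointer to check against the 1.10 NEWS rather than a recommendation:

``````
julia --threads=4 --gcthreads=4 micro.jl
``````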

Beyond eliminating allocation as much as you can (see @lmiq’s answer above), the other solution is to replace multithreading with multiprocessing using Distributed.jl (or perhaps Dagger.jl). Since each process has its own GC, this completely removes the scaling bottleneck. There’s a small overhead for process communication, but your problem is embarrassingly parallel and takes several seconds or more even in @lmiq’s optimized version of your MWE, so that shouldn’t be anything to worry about.
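A minimal sketch of that pattern (hypothetical setup; `kernels.jl` stands in for wherever `outer` and friends are defined):

``````
using Distributed
addprocs(4)  # one worker process per core; each gets its own GC

@everywhere include("kernels.jl")  # hypothetical file defining outer(), inner(), etc.

levels = rand(4, 100_000)

# pmap farms the independent runs out to worker processes; `levels` is
# serialized to each worker along with the closure.
results = pmap(_ -> outer(10_000, levels), 1:1000)
``````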

2 Likes

Thank you for the detailed analysis and suggestions.

Your version of `ensemble_and_threshold!` makes the single biggest difference, and it’s quite substantial. I was a bit disappointed that my version performs so poorly. Can you explain where the allocations are occurring in my version?

Interestingly, distributed is actually worse than threaded. Why would that be?

Using some of the improvements suggested by @lmiq, I tested with up to 25 threads or processes in a dedicated 50-CPU Linux cgroup (to avoid confounding issues of hyperthreading and interference from other workloads) and plotted the results.

Hard to say without seeing the code you’re running. This has not been my experience. I’m by no means an expert here, but could always take a look at an MWE and point out if there’s anything I would have done differently.

• `levels .* weights` allocates a new array
• the subsequent `.> THRESHOLD` comparison on the summed result allocates another new array

There are various tricks for writing contractions like this in an allocation-free way, for example using MappedArrays.jl, but often it’s easiest and most readable to just write out the loop like @lmiq did (though I’d replace `1:size(levels)[2]` with `axes(levels, 2)` for a slightly more compact and generic iteration specification).
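That is, something like this (same kernel, only the iteration changed):

``````
function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64},
                                 weights::AbstractVector{Float64})
    for i in axes(levels, 2)  # generic over the column indices of `levels`
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
end
``````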

3 Likes

Your main memory bandwidth consumer is reads from the `levels` matrix (8 bytes × 4 × 1e5 = 3.2 megabytes). If you have many processes that share the `levels` matrix by value instead of by shared memory, then you’re thrashing your L3 cache (which is shared between cores). So it very much depends on the details of how you share the `levels` matrix. You would need to post a runnable MWE for us to reproduce this.

Secondly, I suggest you refactor your code here:

The problem with this example dummy code is that `s1 = sum(levels)` is a constant, i.e. independent of `weights`, and can be computed in `main` outside of all loops. Is this true for your real problem? If so, then do that!

I presume this is not the case. Then you must spend at least 10 minutes coming up with a dummy calculation that has the same (and no more!) optimizable structure as your true problem. Otherwise you will send us all on a fruitless goose chase.

Another thing you must be careful of: your example dummy calculation requires no random access to `thresholded`, i.e. it would be a valid strategy to never allocate `thresholded` at all and instead compute `thresholded_i = ...` on the fly. Is this true of your real calculation? Your benchmark dummy must very carefully match your true calculation!
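To make the point concrete, here is a sketch of the kind of rewrite your dummy currently permits (hypothetical function, assuming the `THRESHOLD` constant from the MWE):

``````
const THRESHOLD = 0.1

# `thresholded` is never materialized: each element is recomputed on the fly.
# This is only valid because the dummy reads thresholded[i] once, in order.
function fused_sum(weights::AbstractVector{Float64}, levels::Matrix{Float64}, p::Int)
    s2 = 0.0
    for i in axes(levels, 2)
        thresholded_i = sum(levels[j, i] * weights[j] for j in eachindex(weights)) > THRESHOLD
        if thresholded_i
            s2 += levels[p, i]
        end
    end
    return s2
end
``````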

3 Likes

Why is it different for threads vs. processes? Perhaps the OS does not invalidate the caches for threads? Is the `levels` matrix shared in the threads case?

It is not the case. The real code is quite complicated with many levels of indirection. The MWE demonstrates what I suspected was a bottleneck.

Please don’t say that. The suggestions and insights that have been provided here have really helped me understand what is going on. I’m not expecting anyone to write my code for me.

See, another very helpful suggestion. I assumed that broadcast operations would be the most efficient, but that seems not to be the case, judging by the refactoring of the `ensemble_and_threshold!` function.

Here is the code that compares threads vs procs. I change the assignment to `main` depending on the benchmark I’m running.

``````
using Distributed

@everywhere const THRESHOLD = 0.1

@everywhere function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
    for i in 1:size(levels)[2]
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
    # thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD )
end

@everywhere function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
    thresholded = Vector{Bool}(undef, size(levels)[2])
    ensemble_and_threshold!(thresholded, levels, weights)
    s1 = 0.0
    s2 = 0.0
    np = length(weights)
    for p in 1:np
        m200p = @view levels[p, :]
        p200p = levels[p, thresholded]
        s1 += sum(m200p)
        s2 += sum(p200p)
    end
    return s2 / s1
end

@everywhere function outer(count::Int, levels::Matrix{Float64})::Float64
    best = Inf
    for _ in 1:count
        weights = rand(4)
        r = inner(weights, levels)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs)
    @everywhere N = 500000
    @everywhere np = 4
    @everywhere levels = rand(np, N)
    @everywhere n_iter = 10000

    result_channel = RemoteChannel(()->Channel{Float64}(n_runs));

    doit(_) = put!(result_channel, outer(n_iter, levels))
    pmap(doit, 1:n_runs)

    results = map(1:n_runs) do _
        take!(result_channel)
    end

    # Use results so that it is not optimised out
    println(devnull, results)
end

function main_threads(n_runs)
    N = 500000
    np = 4
    levels = rand(np, N)
    n_iter = 10000

    result_channel = RemoteChannel(()->Channel{Float64}(n_runs));
    doit(_) = put!(result_channel, outer(n_iter, levels))

    Threads.@threads for i in 1:n_runs
        doit(i)
    end

    results = map(1:n_runs) do _
        take!(result_channel)
    end

    # Use results so that it is not optimised out
    println(devnull, results)
end

main = main_procs

main(1) # Run once for compilation

for n_runs in vcat(collect(1:4), collect(5:5:25))
    @show n_runs
    @time main(n_runs)
end
``````

Here is the SLURM script used to run it:

``````
#!/bin/bash

#SBATCH --time=1-00:00:00
#SBATCH --mem=40G
#SBATCH --qos=bonus
#SBATCH --output=x-%A.out
#SBATCH --job-name=Neville

hostname

CMD="julia --procs=25 micro.jl"
echo $CMD
srun --unbuffered $CMD

echo exit code: $?
``````

Hi @evan-wehi, I’ve been playing around a bit with this, but I have one question before I write up what I’ve found: When you write `@everywhere levels = rand(np, N)`, do you actually intend each worker to sample its own instance of `levels`? Because that’s what `@everywhere` accomplishes, and it’s different from what you’re doing in the multithreaded case, where all threads share the same `levels`. That said, these independent instances of `levels` are not actually used in the rest of your code—they are overwritten by the instance of `levels` captured in the `doit` closure, which is the one sampled on the main process. So in the end, your multiprocessing and multithreaded codes accomplish the same thing; however, the multiprocessing code does a lot of unnecessary sampling work up front that you could have eliminated by removing all occurrences of `@everywhere` within `main_procs`.

So to clarify, should each parallel thread/process share the same `levels` or use independently sampled instances?

Here's an MWE illustrating what I'm talking about above
``````
julia> using Distributed

julia> function f()
           @everywhere x = rand()             # Samples x independently on each worker
           printlocal(_) = println(@eval(x))  # Prints the worker's own instance of x
           printmain(_) = println(x)          # Copies x over from the main process and prints it
           pmap(printlocal, 1:3)
           println()
           pmap(printmain, 1:3)
           println()
           pmap(printlocal, 1:3)              # Note how all workers now have the same x due to printmain
           return nothing
       end;

julia> f()
From worker 3:	0.5299608462035816
From worker 4:	0.3448114078538431
From worker 2:	0.7929854408997841

From worker 3:	0.017534970889111157
From worker 2:	0.017534970889111157
From worker 4:	0.017534970889111157

From worker 4:	0.017534970889111157
From worker 2:	0.017534970889111157
From worker 3:	0.017534970889111157
``````

`levels` should be the same everywhere and so could be in shared memory. In the real code this is data read from disk. It’s about the same size as I used in the MWE, so the overhead of copying it to each address space is relatively small. Perhaps it could be kept in cache for each process, if that’s possible.

FWIW, I’d do something like the following for a nice 40x single-threaded speedup over @lmiq’s code.

However, I am pretty sure that this is not what you want – it makes too much use of structure where you said `this is a stand-in for a larger computation`.

The code below uses the following:

1. `s1`, and therefore `m200p`, is entirely loop-independent, so we can pull it out of everything.
2. Walking over the big `levels` array once per `weights` sample has bad compute/memory density. So we do that for `Chunks = 8` different weights at the same time. We can profitably do that (and SIMD!) because your kernel is branch-free (I use a matrix-vector multiply to compute multiple dot products at the same time).

Now it is unknowable to us whether this is useful for you or a complete waste of time, because we don’t have a good model for your computation.

Really sit down after you propose a model, and try for 15 minutes to “break it”. If somebody here proposes an optimization that is not applicable to your real problem, but is applicable to your model, then this is a learning opportunity for you – how to ask better questions / write better models to stand in for your real problem. Saying “this is bad” is not enough; you need to fix the model.

``````
julia> using LinearAlgebra, StaticArrays, Random

julia> function inner(weights::SMatrix{Chunks, NLevels, T, S}, levels::Vector{SVector{NLevels,T}}, threshold) where {Chunks,NLevels,T,S}
           s = zeros(SVector{Chunks,T})
           for v in levels
               thresholded = (weights*v) .> threshold
               s += thresholded * v[NLevels]
           end
           s
       end

julia> function outer(count::Int, levels::Vector{SVector{NLevels,T}}, threshold, ::Val{Chunks}) where {NLevels,T,Chunks}
           best = T(Inf)
           for _ in 1:count
               weights = rand(SMatrix{Chunks,NLevels,T,Chunks*NLevels})
               r2 = inner(weights, levels, threshold)
               best = min(best, minimum(r2))
           end
           return best
       end

julia> function main(n_runs)
           np = 4
           N = 100_000
           count = 10_000
           threshold = 0.1
           levels = rand(np, N)
           levels2 = ccall(:jl_reshape_array, Array{SVector{4,Float64},1}, (Any, Any, Any), Array{SVector{4,Float64},1}, levels, (N,))
           s = sum(levels)
           results = zeros(n_runs)
           for i in 1:n_runs
               results[i] = outer(Int(count/8), levels2, threshold, Val(8)) / s
           end
           results
       end

julia> @time main(1)
  0.289770 seconds (4 allocations: 3.052 MiB)
1-element Vector{Float64}:
 0.0
``````
1 Like

Given that `levels` should be shared, I created a slightly sanitized version of your code, removing the unnecessary `@everywhere`s. Also, in the interest of time when iterating on the code, I measure at `n_iter = 1000` and compile at `n_iter = 1`, and I rewrote the `main` invocation such that I can control threads vs. procs using the Julia launch configuration. Finally, I’m calling `GC.gc()` before every timing such that they always start from a blank slate (though this probably doesn’t matter at these time scales). This version is my baseline for further improvements. EDIT: I also removed the `RemoteChannel`, as it wasn’t necessary.

sanitized.jl
``````
using Distributed

@everywhere const THRESHOLD = 0.1

@everywhere function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
    for i in 1:size(levels)[2]
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
    # thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD )
end

@everywhere function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
    thresholded = Vector{Bool}(undef, size(levels)[2])
    ensemble_and_threshold!(thresholded, levels, weights)
    s1 = 0.0
    s2 = 0.0
    np = length(weights)
    for p in 1:np
        m200p = @view levels[p, :]
        p200p = levels[p, thresholded]
        s1 += sum(m200p)
        s2 += sum(p200p)
    end
    return s2 / s1
end

@everywhere function outer(count::Int, levels::Matrix{Float64})::Float64
    best = Inf
    for _ in 1:count
        weights = rand(4)
        r = inner(weights, levels)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs, n_iter)
    N = 500000
    np = 4
    levels = rand(np, N)

    doit(_) = outer(n_iter, levels)
    results = pmap(doit, 1:n_runs)

    # Use results so that it is not optimised out
    println(devnull, results[end])
end

function main_threads(n_runs, n_iter)
    N = 500000
    np = 4
    levels = rand(np, N)

    results = Vector{Float64}(undef, n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(n_iter, levels)
    end

    # Use results so that it is not optimised out
    println(devnull, results[end])
end

function main(n_iter)
    f, n_runs_max = if nprocs() == 1
        main_threads, Threads.nthreads()
    elseif Threads.nthreads() == 1
        main_procs, nworkers()
    else
        error("Combined multithreading and multiprocessing not supported")
    end
    f(n_runs_max, 1)  # Compile on all workers
    for n_runs in Iterators.flatten((1:4, 5:5:25))
        (n_runs > n_runs_max) && break
        @everywhere GC.gc()
        @show n_runs
        @time f(n_runs, n_iter)
    end
end

main(1000)
``````

Here are the timings I get on the machine I’m using:

sanitized.jl timings
``````
danielwe@mlhpc2:~/tmp$ julia --threads=25 sanitized.jl
n_runs = 1
18.176020 seconds (11.16 k allocations: 15.348 GiB, 3.16% gc time)
n_runs = 2
25.129455 seconds (22.14 k allocations: 30.695 GiB, 4.06% gc time)
n_runs = 3
25.221908 seconds (33.15 k allocations: 46.042 GiB, 5.13% gc time)
n_runs = 4
27.767858 seconds (44.14 k allocations: 61.414 GiB, 4.53% gc time)
n_runs = 5
30.456231 seconds (55.15 k allocations: 76.746 GiB, 5.11% gc time)
n_runs = 10
43.307271 seconds (110.14 k allocations: 153.480 GiB, 4.83% gc time)
n_runs = 15
51.821543 seconds (165.16 k allocations: 230.176 GiB, 5.15% gc time)
n_runs = 20
60.507017 seconds (220.15 k allocations: 306.965 GiB, 4.81% gc time)
n_runs = 25
63.995065 seconds (275.16 k allocations: 383.647 GiB, 4.92% gc time)
danielwe@mlhpc2:~/tmp$ julia --procs=25 sanitized.jl
n_runs = 1
16.986917 seconds (265 allocations: 15.279 MiB)
n_runs = 2
19.430964 seconds (323 allocations: 15.281 MiB)
n_runs = 3
21.465102 seconds (791 allocations: 15.297 MiB)
n_runs = 4
17.095496 seconds (822 allocations: 15.299 MiB)
n_runs = 5
20.871957 seconds (521 allocations: 15.289 MiB)
n_runs = 10
31.219670 seconds (821 allocations: 15.302 MiB)
n_runs = 15
48.659917 seconds (1.14 k allocations: 15.314 MiB)
n_runs = 20
67.285127 seconds (1.44 k allocations: 15.324 MiB)
n_runs = 25
75.453670 seconds (1.75 k allocations: 15.334 MiB)
``````

As you can see, multithreading and multiprocessing scale about the same for me. The thing that stands out is the ~5% GC time. That’s a lot of performance left on the table. We need to fix that whether we’re parallelizing or not.

### General improvements

I’m trying to respect your structure as much as possible, so hopefully, most of these improvements translate to your real code.

1. Remove allocations as much as possible
• Allocate `weights` outside the outer loop and use `rand!(weights)` instead of `weights = rand(4)`.
• Allocate `thresholded` (or something equivalent, see below) outside the outer loop and pass it as an argument to `inner!`.
• Avoid allocating `p200p`. This cannot be done with a view, since `thresholded` does not provide a fixed stride. However, we can avoid instantiating it altogether if we always pass `m200p` and `thresholded` together and operate on them in tandem. In your example, `sum(p200p)` can be replaced with
``````
sum(
    i -> ifelse(thresholded[i], m200p[i], zero(eltype(m200p))),
    eachindex(thresholded, m200p),
)
``````
I’m using `ifelse` instead of `?` here to avoid a branch in the inner loop so that, hopefully, this will SIMD. More generally, if you’re looping over `p200p`, you can replace `for x in p200p` with
``````
for i in eachindex(thresholded, m200p)
    if thresholded[i]
        x = m200p[i]
        ...
``````
I hope this pattern works for your actual use case. Not instantiating `p200p` will save you a lot of allocations.
2. Improve memory access patterns.
• You’re slicing along the second dimension in `m200p = @view levels[p, :]`. Julia arrays are column-major, so if you slice along the first dimension instead, your view will be backed by contiguous memory, which is always preferable. This suggests we should transpose `levels`.
3. Use BLAS.
• `ensemble_and_threshold!` is just a matrix-vector product followed by thresholding. Linear algebra primitives let us leverage optimized BLAS routines.
• BLAS won’t give us `thresholded` directly, it provides `ensemble = levels * weights` such that `thresholded[i] = ensemble[i] > THRESHOLD`. As long as we’re not accessing `thresholded[i]` too many times, we may as well avoid instantiating it and instead replace every occurrence of `thresholded[i]` with `ensemble[i] > THRESHOLD`. (If your real code accesses `thresholded[i]` much more than your MWE, it might be worth seeing if you can leverage Tullio.jl, LoopVectorization.jl, or similar to obtain `thresholded` from `levels` and `weights` in a single BLAS-like pass.)
• To avoid allocations we shouldn’t actually write `ensemble = levels * weights`, but rather `mul!(ensemble, levels, weights)`, and allocate `ensemble` outside the outer loop.

With these changes, the code looks as follows. (Note that I also removed type annotations for readability, and made everything generic with `THRESHOLD` implicitly determining the float type used throughout—these changes don’t matter for performance, but happen somewhat reflexively to me.)

revised.jl
``````
using Distributed

@everywhere using LinearAlgebra
@everywhere using Random

# Use single-threaded BLAS since we're parallelizing at a higher level
@everywhere BLAS.set_num_threads(1)

@everywhere const THRESHOLD = 0.1

@everywhere function inner!(ensemble, levels, weights)
    T = eltype(levels)
    s1 = zero(T)
    s2 = zero(T)
    mul!(ensemble, levels, weights)
    for p in axes(levels, 2)
        m200p = @view levels[:, p]
        s1 += sum(m200p)
        s2 += sum(
            i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], zero(eltype(m200p))),
            eachindex(ensemble, m200p),
        )
    end
    return s2 / s1
end

@everywhere function outer(count, levels)
    T = eltype(levels)
    best = typemax(T)
    weights = Vector{T}(undef, size(levels, 2))
    ensemble = Vector{T}(undef, size(levels, 1))
    for _ in 1:count
        rand!(weights)
        r = inner!(ensemble, levels, weights)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs, n_iter)
    N = 500000
    np = 4

    T = typeof(THRESHOLD)
    levels = rand(T, N, np)

    doit = let n_iter = n_iter, levels = levels
        # I reflexively write closures like this to make sure the captured variables
        # aren't boxed; it probably doesn't matter in this case, however
        doit(_) = outer(n_iter, levels)
    end
    results = pmap(doit, 1:n_runs)

    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main_threads(n_runs, n_iter)
    N = 500000
    np = 4

    T = typeof(THRESHOLD)
    levels = rand(T, N, np)

    results = Vector{T}(undef, n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(n_iter, levels)
    end

    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main(n_iter)
    f, n_runs_max = if nprocs() == 1
        main_threads, Threads.nthreads()
    elseif Threads.nthreads() == 1
        main_procs, nworkers()
    else
        error("Combined multithreading and multiprocessing not supported")
    end
    f(n_runs_max, 1)  # Compile on all workers
    for n_runs in Iterators.flatten((1:4, 5:5:25))
        (n_runs > n_runs_max) && break
        @everywhere GC.gc()
        @show n_runs
        @time f(n_runs, n_iter)
    end
end

main(1000)
``````

Here’s what the timings look like. Notice most importantly that serial performance is improved by about 4x and there’s no longer any measurable GC time. Scaling is also improved in the multithreaded case: The slowdown from 1 to 25 threads is less than 2x instead of more than 3.5x. However, the multiprocessing performance drops off a cliff with more than 10 processes. This is probably related to cache utilization, as @foobar_lv2 talked about earlier: `levels` takes up 16 MB, so 25 copies of it require 400 MB, while this machine has a total of 120 MB CPU cache (4 CPUs with 30 MB each).

revised.jl timings
``````
danielwe@mlhpc2:~/tmp$ julia --threads=25 revised.jl
n_runs = 1
4.460417 seconds (160 allocations: 19.088 MiB)
n_runs = 2
4.448414 seconds (144 allocations: 22.903 MiB)
n_runs = 3
4.512793 seconds (152 allocations: 26.718 MiB)
n_runs = 4
4.721820 seconds (164 allocations: 30.533 MiB)
n_runs = 5
4.551193 seconds (157 allocations: 34.347 MiB)
n_runs = 10
5.664418 seconds (181 allocations: 53.422 MiB)
n_runs = 15
6.958389 seconds (197 allocations: 72.496 MiB)
n_runs = 20
7.912543 seconds (219 allocations: 91.570 MiB)
n_runs = 25
8.733596 seconds (235 allocations: 110.645 MiB)
danielwe@mlhpc2:~/tmp$ julia --procs=25 revised.jl
n_runs = 1
4.339249 seconds (264 allocations: 15.279 MiB)
n_runs = 2
4.422059 seconds (325 allocations: 15.281 MiB)
n_runs = 3
4.474285 seconds (392 allocations: 15.284 MiB)
n_runs = 4
4.391475 seconds (446 allocations: 15.286 MiB)
n_runs = 5
4.705617 seconds (516 allocations: 15.289 MiB)
n_runs = 10
7.042835 seconds (819 allocations: 15.299 MiB)
n_runs = 15
10.150502 seconds (1.14 k allocations: 15.315 MiB)
n_runs = 20
16.250822 seconds (1.47 k allocations: 15.323 MiB)
n_runs = 25
16.513299 seconds (1.75 k allocations: 15.336 MiB)
``````

### Shared arrays

To work around the cache bottleneck, let’s use the `SharedArrays` standard library to share the `levels` array across processes. This is a minor modification of the above code: we allocate the array using `levels = SharedMatrix{T}(N, np)` and access the underlying `Matrix` as `sdata(levels)`. (The latter is not necessary for the code to work, as `SharedArray` supports the `AbstractArray` interface, but it’s required for maximum performance because it ensures that we hit optimized rather than fallback methods for `rand!` and `mul!`.)

The resulting code looks like this:

revised_shared.jl
``````
using Distributed

@everywhere using LinearAlgebra
@everywhere using SharedArrays
@everywhere using Random

# Use single-threaded BLAS since we're parallelizing at a higher level
@everywhere BLAS.set_num_threads(1)

@everywhere const THRESHOLD = 0.1

@everywhere function inner!(ensemble, levels, weights)
    T = eltype(levels)
    s1 = zero(T)
    s2 = zero(T)
    mul!(ensemble, levels, weights)
    for p in axes(levels, 2)
        m200p = @view levels[:, p]
        s1 += sum(m200p)
        s2 += sum(
            i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], zero(eltype(m200p))),
            eachindex(ensemble, m200p),
        )
    end
    return s2 / s1
end

@everywhere function outer(count, levels)
    T = eltype(levels)
    best = typemax(T)
    weights = Vector{T}(undef, size(levels, 2))
    ensemble = Vector{T}(undef, size(levels, 1))
    for _ in 1:count
        rand!(weights)
        r = inner!(ensemble, levels, weights)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs, n_iter)
    N = 500000
    np = 4

    T = typeof(THRESHOLD)
    levels = SharedMatrix{T}(N, np)
    rand!(sdata(levels))

    doit = let n_iter = n_iter, levels = levels
        # I reflexively write closures like this to make sure the captured variables
        # aren't boxed; it probably doesn't matter in this case, however
        doit(_) = outer(n_iter, sdata(levels))
    end
    results = pmap(doit, 1:n_runs)

    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main_threads(n_runs, n_iter)
    N = 500000
    np = 4

    T = typeof(THRESHOLD)
    levels = rand(T, N, np)

    results = Vector{T}(undef, n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(n_iter, levels)
    end

    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main(n_iter)
    f, n_runs_max = if nprocs() == 1
        main_threads, Threads.nthreads()
    elseif Threads.nthreads() == 1
        main_procs, nworkers()
    else
        error("Combined multithreading and multiprocessing not supported")
    end
    f(n_runs_max, 1)  # Compile on all workers
    for n_runs in Iterators.flatten((1:4, 5:5:25))
        (n_runs > n_runs_max) && break
        @everywhere GC.gc()
        @show n_runs
        @time f(n_runs, n_iter)
    end
end

main(1000)
``````

Here are the timings for multiprocessing (the multithreaded code is unchanged from above):

revised_shared.jl timings
``````
danielwe@mlhpc2:~/tmp$ julia --procs=25 revised_shared.jl
n_runs = 1
4.351229 seconds (2.16 k allocations: 94.383 KiB)
n_runs = 2
4.539514 seconds (2.24 k allocations: 97.930 KiB)
n_runs = 3
4.602810 seconds (2.33 k allocations: 105.820 KiB)
n_runs = 4
4.518517 seconds (2.42 k allocations: 110.492 KiB)
n_runs = 5
4.663221 seconds (2.50 k allocations: 115.227 KiB)
n_runs = 10
6.302061 seconds (2.94 k allocations: 137.305 KiB)
n_runs = 15
6.564405 seconds (3.85 k allocations: 176.664 KiB)
n_runs = 20
7.231386 seconds (3.80 k allocations: 186.539 KiB)
n_runs = 25
8.795986 seconds (4.23 k allocations: 209.492 KiB)
``````

These timings are almost identical to the multithreaded timings, accruing a slowdown of about 2x from 1 to 25 processes. Hence, `SharedArrays` worked for eliminating the performance drop-off due to outgrowing the CPU cache. It does not outperform multithreading, however, suggesting that the lack of perfect scaling at this point is not due to single-threaded GC. If you can get your actual code to a similar state where `@time` doesn’t report any GC time and multithreading performs as well as multiprocessing, there’s probably no reason to prefer multiprocessing.

I haven’t looked into how to get closer to perfect scaling from here, but my hunch is that the answer lies in thinking more about hardware resources and topology. Specifically, the machine I’m using has 4 CPUs with 10 physical cores each. Thus, if I’m using more than 10 threads or processes, `levels` is likely shared across multiple CPUs, so it can’t just live in a single CPU’s cache. I have no idea how hardware deals with this, but it sounds like it could be a bottleneck. The next thing I would try is to use ThreadPinning.jl to pin threads to cores and make as many copies of `levels` as there are CPUs, sharing each copy only between the threads pinned to the same CPU. Beyond that, I defer to people more knowledgeable than me.
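For the pinning part, a sketch of what I have in mind (the per-CPU copies of `levels` would be extra bookkeeping on top of this):

``````
using ThreadPinning

# Pin Julia threads to successive physical cores, so threads 1-10 land on the
# first CPU, 11-20 on the second, and so on (the exact mapping is machine-dependent).
pinthreads(:cores)

threadinfo()  # prints the resulting thread-to-core mapping
``````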

There’s also the question of how much more you can optimize your serial code, depending on your actual problem, see @foobar_lv2’s post above.

Hope this helps!

5 Likes

Thermal throttling is a big one. If only one core/CPU is utilized, then you can clock much higher. If that’s the answer, then your scaling might already be perfect.

I’m running on an enterprise compute server. I haven’t checked, but I wouldn’t expect thermal throttling to be a concern on such hardware?

It’s not a concern so much as a very clever system optimization: one core is very busy, 39 are idle, so it is advantageous for the firmware/kernel to reconfigure the system for this clearly single-threaded workload, making use of the extra thermal/energy headroom. This applies not just to crappy laptops with underpowered cooling, but also to bigger machines. That being said, I have no idea how your machine is set up.

This sucks for benchmarking/development, though. I once lucked into a weird laptop with an engineering-sample CPU and quite bespoke firmware. Not very performant, but the timings were absolutely razor sharp. Super small jitter: it runs for 4.57 seconds, that’s 97.2 cycles per loop iteration, let’s crack open that Agner Fog and get counting. Very educational!

2 Likes

Thanks @danielwe and @foobar_lv2, this is really helpful. I can apply most of these ideas directly to the real code and already have a ~3x speedup from applying just a couple of them. I also learned a lot of idiosyncratic Julia too.

1 Like