Given that levels should be shared, I created a slightly sanitized version of your code, removing the unnecessary @everywhere annotations. In the interest of time when iterating on the code, I measure at n_iter = 1000 and compile at n_iter = 1, and I rewrote the main invocation so that I can control threads vs. procs using the Julia launch configuration. Finally, I call GC.gc() before every timing so that each run starts from a blank slate (though this probably doesn’t matter at these time scales). This version is my baseline for further improvements. EDIT: I also removed the RemoteChannel, as it wasn’t necessary.
sanitized.jl
using Distributed

@everywhere const THRESHOLD = 0.1

@everywhere function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
    for i in 1:size(levels)[2]
        s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
        thresholded[i] = s > THRESHOLD
    end
    # thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD)
end

@everywhere function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
    thresholded = Vector{Bool}(undef, size(levels)[2])
    ensemble_and_threshold!(thresholded, levels, weights)
    s1 = 0.0
    s2 = 0.0
    np = length(weights)
    for p in 1:np
        m200p = @view levels[p, :]
        p200p = levels[p, thresholded]
        s1 += sum(m200p)
        s2 += sum(p200p)
    end
    return s2 / s1
end

@everywhere function outer(count::Int, levels::Matrix{Float64})::Float64
    best = Inf
    for _ in 1:count
        weights = rand(4)
        r = inner(weights, levels)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs, n_iter)
    N = 500000
    np = 4
    levels = rand(np, N)
    doit(_) = outer(n_iter, levels)
    results = pmap(doit, 1:n_runs)
    # Use results so that it is not optimised out
    println(devnull, results[end])
end

function main_threads(n_runs, n_iter)
    N = 500000
    np = 4
    levels = rand(np, N)
    results = Vector{Float64}(undef, n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(n_iter, levels)
    end
    # Use results so that it is not optimised out
    println(devnull, results[end])
end

function main(n_iter)
    f, n_runs_max = if nprocs() == 1
        main_threads, Threads.nthreads()
    elseif Threads.nthreads() == 1
        main_procs, nworkers()
    else
        error("Combined multithreading and multiprocessing not supported")
    end
    f(n_runs_max, 1)  # Compile on all workers
    for n_runs in Iterators.flatten((1:4, 5:5:25))
        (n_runs > n_runs_max) && break
        @everywhere GC.gc()
        @show n_runs
        @time f(n_runs, n_iter)
    end
end

main(1000)
Here are the timings I get on the machine I’m using:
sanitized.jl timings
danielwe@mlhpc2:~/tmp$ julia --threads=25 sanitized.jl
n_runs = 1
18.176020 seconds (11.16 k allocations: 15.348 GiB, 3.16% gc time)
n_runs = 2
25.129455 seconds (22.14 k allocations: 30.695 GiB, 4.06% gc time)
n_runs = 3
25.221908 seconds (33.15 k allocations: 46.042 GiB, 5.13% gc time)
n_runs = 4
27.767858 seconds (44.14 k allocations: 61.414 GiB, 4.53% gc time)
n_runs = 5
30.456231 seconds (55.15 k allocations: 76.746 GiB, 5.11% gc time)
n_runs = 10
43.307271 seconds (110.14 k allocations: 153.480 GiB, 4.83% gc time)
n_runs = 15
51.821543 seconds (165.16 k allocations: 230.176 GiB, 5.15% gc time)
n_runs = 20
60.507017 seconds (220.15 k allocations: 306.965 GiB, 4.81% gc time)
n_runs = 25
63.995065 seconds (275.16 k allocations: 383.647 GiB, 4.92% gc time)
danielwe@mlhpc2:~/tmp$ julia --procs=25 sanitized.jl
n_runs = 1
16.986917 seconds (265 allocations: 15.279 MiB)
n_runs = 2
19.430964 seconds (323 allocations: 15.281 MiB)
n_runs = 3
21.465102 seconds (791 allocations: 15.297 MiB)
n_runs = 4
17.095496 seconds (822 allocations: 15.299 MiB)
n_runs = 5
20.871957 seconds (521 allocations: 15.289 MiB)
n_runs = 10
31.219670 seconds (821 allocations: 15.302 MiB)
n_runs = 15
48.659917 seconds (1.14 k allocations: 15.314 MiB)
n_runs = 20
67.285127 seconds (1.44 k allocations: 15.324 MiB)
n_runs = 25
75.453670 seconds (1.75 k allocations: 15.334 MiB)
As you can see, multithreading and multiprocessing scale about the same for me. The thing that stands out is the ~5 % gc time. That’s a lot of performance left on the table. We need to fix that whether we’re parallelizing or not.
General improvements
I’m trying to respect your structure as much as possible, so hopefully, most of these improvements translate to your real code.
- Remove allocations as much as possible.
  - Allocate weights outside the outer loop and use rand!(weights) instead of weights = rand(4).
  - Allocate thresholded (or something equivalent, see below) outside the outer loop and pass it as an argument to inner!.
  - Avoid allocating p200p. This cannot be done with a view, since thresholded does not provide a fixed stride. However, we can avoid instantiating it altogether if we always pass m200p and thresholded together and operate on them in tandem. In your example, sum(p200p) can be replaced with

        sum(
            i -> ifelse(thresholded[i], m200p[i], zero(eltype(m200p))),
            eachindex(thresholded, m200p),
        )

    I’m using ifelse instead of ? here to avoid a branch in the inner loop so that, hopefully, this will SIMD. More generally, if you’re looping over p200p, you can replace for x in p200p with

        for i in eachindex(thresholded, m200p)
            if thresholded[i]
                x = m200p[i]
                ...

    I hope this pattern works for your actual use case. Not instantiating p200p will save you a lot of allocations.
- Improve memory access patterns.
  - You’re slicing along the second dimension in m200p = @view levels[p, :]. Julia arrays are column-major, so if you slice along the first dimension instead, your view will be backed by contiguous memory, which is always preferable. This suggests we should transpose levels.
- Use BLAS. ensemble_and_threshold! is just a matrix-vector product followed by thresholding. Linear algebra primitives let us leverage optimized BLAS routines.
  - BLAS won’t give us thresholded directly; it provides ensemble = levels * weights such that thresholded[i] = ensemble[i] > THRESHOLD. As long as we’re not accessing thresholded[i] too many times, we may as well avoid instantiating it and instead replace every occurrence of thresholded[i] with ensemble[i] > THRESHOLD. (If your real code accesses thresholded[i] much more than your MWE, it might be worth seeing if you can leverage Tullio.jl, LoopVectorization.jl, or similar to obtain thresholded from levels and weights in a single BLAS-like pass.)
  - To avoid allocations, we shouldn’t actually write ensemble = levels * weights, but rather mul!(ensemble, levels, weights), and allocate ensemble outside the outer loop. (Both this and the transposed layout are illustrated in the short sketch right after this list.)
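To make the transpose and BLAS points concrete, here is a small self-contained sketch of my own (not part of the original code; the variable names are just for illustration) showing the column-contiguous layout together with the mul!-based replacement for ensemble_and_threshold!. revised.jl below integrates the same ideas into the full program.

using LinearAlgebra

const THRESHOLD = 0.1

N, np = 500_000, 4
levels = rand(N, np)                  # transposed layout: each levels[:, p] is a contiguous column
weights = rand(np)
ensemble = Vector{Float64}(undef, N)  # allocate once, outside the hot loop

# One optimized BLAS matrix-vector product replaces the hand-written ensemble_and_threshold! loop
mul!(ensemble, levels, weights)

# thresholded[i] is now simply ensemble[i] > THRESHOLD; use it on the fly instead of materializing it
m200p = @view levels[:, 1]            # contiguous view along the first dimension
s2 = sum(i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], 0.0), eachindex(ensemble, m200p))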
With these changes, the code looks as follows. (Note that I also removed the type annotations for readability and made everything generic, with THRESHOLD implicitly determining the float type used throughout; these changes don’t matter for performance, but I make them somewhat reflexively.)
revised.jl
using Distributed
@everywhere using LinearAlgebra
@everywhere using Random

# Use single-threaded BLAS since we're parallelizing at a higher level
@everywhere BLAS.set_num_threads(1)

@everywhere const THRESHOLD = 0.1

@everywhere function inner!(ensemble, levels, weights)
    T = eltype(levels)
    s1 = zero(T)
    s2 = zero(T)
    mul!(ensemble, levels, weights)
    for p in axes(levels, 2)
        m200p = @view levels[:, p]
        s1 += sum(m200p)
        s2 += sum(
            i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], zero(eltype(m200p))),
            eachindex(ensemble, m200p),
        )
    end
    return s2 / s1
end

@everywhere function outer(count, levels)
    T = eltype(levels)
    best = typemax(T)
    weights = Vector{T}(undef, size(levels, 2))
    ensemble = Vector{T}(undef, size(levels, 1))
    for _ in 1:count
        rand!(weights)
        r = inner!(ensemble, levels, weights)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs, n_iter)
    N = 500000
    np = 4
    T = typeof(THRESHOLD)
    levels = rand(T, N, np)
    doit = let n_iter=n_iter, levels=levels
        # I reflexively write closures like this to make sure the captured variables
        # aren't boxed; it probably doesn't matter in this case, however
        doit(_) = outer(n_iter, levels)
    end
    results = pmap(doit, 1:n_runs)
    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main_threads(n_runs, n_iter)
    N = 500000
    np = 4
    T = typeof(THRESHOLD)
    levels = rand(T, N, np)
    results = Vector{T}(undef, n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(n_iter, levels)
    end
    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main(n_iter)
    f, n_runs_max = if nprocs() == 1
        main_threads, Threads.nthreads()
    elseif Threads.nthreads() == 1
        main_procs, nworkers()
    else
        error("Combined multithreading and multiprocessing not supported")
    end
    f(n_runs_max, 1)  # Compile on all workers
    for n_runs in Iterators.flatten((1:4, 5:5:25))
        (n_runs > n_runs_max) && break
        @everywhere GC.gc()
        @show n_runs
        @time f(n_runs, n_iter)
    end
end

main(1000)
Here’s what the timings look like. Notice most importantly that serial performance is improved by about 4x and there’s no longer any measurable GC time. Scaling is also improved in the multithreaded case: the slowdown from 1 to 25 threads is less than 2x instead of more than 3.5x. However, the multiprocessing performance drops off a cliff with more than 10 processes. This is probably related to cache utilization, as @foobar_lv2 talked about earlier: levels takes up 16 MB, so 25 copies of it require 400 MB, while this machine has a total of 120 MB CPU cache (4 CPUs with 30 MB each).
revised.jl timings
danielwe@mlhpc2:~/tmp$ julia --threads=25 revised.jl
n_runs = 1
4.460417 seconds (160 allocations: 19.088 MiB)
n_runs = 2
4.448414 seconds (144 allocations: 22.903 MiB)
n_runs = 3
4.512793 seconds (152 allocations: 26.718 MiB)
n_runs = 4
4.721820 seconds (164 allocations: 30.533 MiB)
n_runs = 5
4.551193 seconds (157 allocations: 34.347 MiB)
n_runs = 10
5.664418 seconds (181 allocations: 53.422 MiB)
n_runs = 15
6.958389 seconds (197 allocations: 72.496 MiB)
n_runs = 20
7.912543 seconds (219 allocations: 91.570 MiB)
n_runs = 25
8.733596 seconds (235 allocations: 110.645 MiB)
danielwe@mlhpc2:~/tmp$ julia --procs=25 revised.jl
n_runs = 1
4.339249 seconds (264 allocations: 15.279 MiB)
n_runs = 2
4.422059 seconds (325 allocations: 15.281 MiB)
n_runs = 3
4.474285 seconds (392 allocations: 15.284 MiB)
n_runs = 4
4.391475 seconds (446 allocations: 15.286 MiB)
n_runs = 5
4.705617 seconds (516 allocations: 15.289 MiB)
n_runs = 10
7.042835 seconds (819 allocations: 15.299 MiB)
n_runs = 15
10.150502 seconds (1.14 k allocations: 15.315 MiB)
n_runs = 20
16.250822 seconds (1.47 k allocations: 15.323 MiB)
n_runs = 25
16.513299 seconds (1.75 k allocations: 15.336 MiB)
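For reference, the footprint numbers in the cache argument above are easy to check; this is plain arithmetic assuming Float64 entries and the sizes from the MWE:

N, np = 500_000, 4
bytes_per_copy = N * np * sizeof(Float64)  # 16_000_000 bytes, i.e. ~16 MB per copy of levels
bytes_25_copies = 25 * bytes_per_copy      # 400_000_000 bytes, i.e. ~400 MB across 25 worker copies
total_cache_mb = 4 * 30                    # 4 CPUs with 30 MB cache each = 120 MB of total CPU cache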
Shared arrays
To work around the cache bottleneck, let’s use the SharedArrays standard library to share the levels array across processes. This is a minor modification of the above code: we allocate the array using levels = SharedMatrix{T}(N, np) and access the underlying Matrix as sdata(levels). (The latter is not necessary for the code to work, as SharedArray supports the AbstractArray interface, but it’s required for maximum performance because it ensures that we hit optimized rather than fallback methods for rand! and mul!.)
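Isolated from the full file below, the changes relative to revised.jl essentially boil down to the following (a minimal single-process sketch assuming Float64 and the sizes from the MWE; levels_data is just an illustrative name):

using SharedArrays, Random

T, N, np = Float64, 500_000, 4
levels = SharedMatrix{T}(N, np)  # one shared-memory copy, visible to all local workers
rand!(sdata(levels))             # sdata(levels) is the underlying Matrix, so we hit the fast rand! method
levels_data = sdata(levels)      # pass this plain Matrix into the hot loop, e.g. outer(n_iter, levels_data)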
The resulting code looks like this:
revised_shared.jl
using Distributed
@everywhere using LinearAlgebra
@everywhere using SharedArrays
@everywhere using Random

# Use single-threaded BLAS since we're parallelizing at a higher level
@everywhere BLAS.set_num_threads(1)

@everywhere const THRESHOLD = 0.1

@everywhere function inner!(ensemble, levels, weights)
    T = eltype(levels)
    s1 = zero(T)
    s2 = zero(T)
    mul!(ensemble, levels, weights)
    for p in axes(levels, 2)
        m200p = @view levels[:, p]
        s1 += sum(m200p)
        s2 += sum(
            i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], zero(eltype(m200p))),
            eachindex(ensemble, m200p),
        )
    end
    return s2 / s1
end

@everywhere function outer(count, levels)
    T = eltype(levels)
    best = typemax(T)
    weights = Vector{T}(undef, size(levels, 2))
    ensemble = Vector{T}(undef, size(levels, 1))
    for _ in 1:count
        rand!(weights)
        r = inner!(ensemble, levels, weights)
        best = min(best, r)
    end
    return best
end

function main_procs(n_runs, n_iter)
    N = 500000
    np = 4
    T = typeof(THRESHOLD)
    levels = SharedMatrix{T}(N, np)
    rand!(sdata(levels))
    doit = let n_iter=n_iter, levels=levels
        # I reflexively write closures like this to make sure the captured variables
        # aren't boxed; it probably doesn't matter in this case, however
        doit(_) = outer(n_iter, sdata(levels))
    end
    results = pmap(doit, 1:n_runs)
    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main_threads(n_runs, n_iter)
    N = 500000
    np = 4
    T = typeof(THRESHOLD)
    levels = rand(T, N, np)
    results = Vector{T}(undef, n_runs)
    Threads.@threads for i in 1:n_runs
        results[i] = outer(n_iter, levels)
    end
    # Use results so that it is not optimised out
    print(devnull, results[end])
end

function main(n_iter)
    f, n_runs_max = if nprocs() == 1
        main_threads, Threads.nthreads()
    elseif Threads.nthreads() == 1
        main_procs, nworkers()
    else
        error("Combined multithreading and multiprocessing not supported")
    end
    f(n_runs_max, 1)  # Compile on all workers
    for n_runs in Iterators.flatten((1:4, 5:5:25))
        (n_runs > n_runs_max) && break
        @everywhere GC.gc()
        @show n_runs
        @time f(n_runs, n_iter)
    end
end

main(1000)
Here are the timings for multiprocessing (the multithreaded code is unchanged from above):
revised_shared.jl timings
danielwe@mlhpc2:~/tmp$ julia --procs=25 revised_shared.jl
n_runs = 1
4.351229 seconds (2.16 k allocations: 94.383 KiB)
n_runs = 2
4.539514 seconds (2.24 k allocations: 97.930 KiB)
n_runs = 3
4.602810 seconds (2.33 k allocations: 105.820 KiB)
n_runs = 4
4.518517 seconds (2.42 k allocations: 110.492 KiB)
n_runs = 5
4.663221 seconds (2.50 k allocations: 115.227 KiB)
n_runs = 10
6.302061 seconds (2.94 k allocations: 137.305 KiB)
n_runs = 15
6.564405 seconds (3.85 k allocations: 176.664 KiB)
n_runs = 20
7.231386 seconds (3.80 k allocations: 186.539 KiB)
n_runs = 25
8.795986 seconds (4.23 k allocations: 209.492 KiB)
These timings are almost identical to the multithreaded timings, accruing a slowdown of about 2x from 1 to 25 processes. Hence, SharedArrays worked for eliminating the performance drop-off due to outgrowing the CPU cache. It does not outperform multithreading, however, which suggests that the lack of perfect scaling at this point is not due to single-threaded GC. If you can get your actual code to a similar state, where @time doesn’t report any GC time and multithreading performs as well as multiprocessing, there’s probably no reason to prefer multiprocessing.

I haven’t looked into how to get closer to perfect scaling from here, but my hunch is that the answer lies in thinking more about hardware resources and topology. Specifically, the machine I’m using has 4 CPUs with 10 physical cores each, so if I’m using more than 10 threads or processes, levels is likely shared across multiple CPUs and can’t just live in a single CPU’s cache. I have no idea how the hardware deals with this, but it sounds like it could be a bottleneck. The next thing I would try is to use ThreadPinning.jl to pin threads to cores and make as many copies of levels as there are CPUs, sharing each copy only between the threads pinned to the same CPU; a rough, untested sketch of that idea follows below. Beyond that, I defer to people more knowledgeable than me.
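Here is that rough, untested sketch, slotted into main_threads from revised.jl (it reuses levels, results, n_runs, n_iter, and outer from there; the socket count and the assumption that consecutive thread ids land on the same socket after pinning are illustrative, so verify the actual mapping with ThreadPinning.jl’s threadinfo()):

using ThreadPinning

pinthreads(:cores)  # pin Julia threads to physical cores in order

# Illustrative only: one copy of levels per socket, so each copy can stay in that socket's cache
nsockets = 4
threads_per_socket = cld(Threads.nthreads(), nsockets)
levels_copies = [copy(levels) for _ in 1:nsockets]

Threads.@threads for i in 1:n_runs
    socket = (Threads.threadid() - 1) ÷ threads_per_socket + 1
    results[i] = outer(n_iter, levels_copies[socket])
end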
There’s also the question of how much more you can optimize your serial code, depending on your actual problem; see @foobar_lv2’s post above.
Hope this helps!