Garbage collection and threading

I’ve been struggling with performance issues in my code for weeks. I’ve now narrowed the primary problem down to what seems to be a threading bottleneck in the garbage collector. Sorry if this has been discussed already.

The following MWE follows the structure of the code, basically subsetting a matrix and performing a calculation on that. Note that in the MWE I take the sum, but in reality I calculate a histogram, which in turn is further processed.

const THRESHOLD = 0.1

function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
  thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD )
end  

function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
  thresholded = Vector{Bool}(undef, size(levels)[2])
  ensemble_and_threshold!(thresholded, levels, weights)
  s1 = 0.0
  s2 = 0.0
  np = length(weights)
  for p in 1:np
    m200p = levels[p, :]
    p200p = levels[p, thresholded]
   
   # In reality, the next step in the calculation is significantly more involved 
   # than suggested here.
    s1 += sum(m200p)
    s2 += sum(p200p)
  end
  return s2 / s1
end

function outer(count::Int, levels::Matrix{Float64})::Float64
  best = Inf
  for _ in 1:count
    weights = rand(4)
    r = inner(weights, levels)
    best = min(best, r)
  end
  return best
end

function main(n_runs)
  N = 100000
  np = 4
  levels = rand(np, N)

  results = zeros(n_runs)
  Threads.@threads for i in 1:n_runs
    results[i] = outer(10000, levels)
  end

  # Use results so that it is not optimised out
  println(devnull, results)
end

main(1) # Run once for compilation

for n_runs in 1:4
  @time main(n_runs)
end

I need to do this ~1000 times and each iteration is independent of the others, so it should be embarrassingly parallel. However, there is a bottleneck that prevents it scaling beyond about 4-8 processors. This is the output from the MWE:

➜ julia --threads=4  micro.jl                       
 20.640172 seconds (260.03 k allocations: 97.917 GiB, 14.99% gc time)
 27.225064 seconds (520.03 k allocations: 195.826 GiB, 25.38% gc time)
 31.511773 seconds (780.04 k allocations: 293.733 GiB, 30.25% gc time)
 36.147297 seconds (1.04 M allocations: 391.653 GiB, 35.14% gc time)

This was run on an M1 Mac with 4 performance cores. I see the same with Julia on Linux. There is a very slight improvement with 1.10.0-rc2.

I would expect the time to be the same for each run, but the runs are getting longer, nearly twice as long for 4 cores, and the relative amount of time spent in the garbage collector more than doubles. This suggests that the GC is, at least partially, single-threaded.

This problem should be far more scalable than it is. What can I do to improve it?

I would suggest removing excessive allocations, if possible.
The line m200p = levels[p, :] allocates a new vector in every iteration. Try to use a view instead.
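
A minimal sketch of that change inside inner (only the slicing line differs from the MWE):

for p in 1:np
  m200p = @view levels[p, :]   # a view into the row of levels: no copy, so no allocation here
  p200p = levels[p, thresholded]
  s1 += sum(m200p)
  s2 += sum(p200p)
end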

2 Likes

Some quick tests (second run in all cases):

Original code here:

julia> @time main(1)
 15.794551 seconds (260.06 k allocations: 97.913 GiB, 4.07% gc time)

Adding @views in front of function inner...

julia> @time main(1)
 14.663373 seconds (180.06 k allocations: 68.111 GiB, 3.42% gc time)

Removing allocations from the inner function:

function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
  for i in 1:size(levels)[2]
    s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
    thresholded[i] = s > THRESHOLD
  end
  #thresholded .= transpose( sum(levels .* weights, dims=1) .> THRESHOLD )
end  
julia> @time main(1)
  8.650660 seconds (110.06 k allocations: 30.703 GiB, 2.95% gc time)

Making weights a rand(SVector{4}) (from StaticArrays; this requires changing the input signatures to AbstractVector):

julia> @time main(1)
  8.261985 seconds (100.06 k allocations: 30.697 GiB, 3.05% gc time)
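
For reference, the change is roughly the following (a sketch; the complete version is in the code block at the end of this post):

using StaticArrays

# the weights arguments are loosened from Vector{Float64} to AbstractVector{Float64}
function inner(weights::AbstractVector{Float64}, levels::Matrix{Float64})::Float64
...

# and in outer, weights is drawn as a stack-allocated SVector, so drawing new weights no longer allocates
weights = rand(SVector{4})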

Allocate thresholded outside the inner loop:

@views function inner(
  weights::AbstractVector{Float64}, levels::Matrix{Float64};
  thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
...
  thresholded = Vector{Bool}(undef, size(levels)[2])
  for _ in 1:count
    weights = rand(SVector{4})
    r = inner(weights, levels; thresholded = thresholded)

Gives:

julia> @time main(1)
  7.906400 seconds (80.06 k allocations: 29.767 GiB, 2.95% gc time)

The remaining allocations are that of thresholded, which happens once per call to inner, and some in this line:

    p200p = levels[p, thresholded]

which could be avoided, but one would need to know whether it is worth the trouble, depending on the true calculations that follow.

Finally, I changed the main function to make it easier to test the scalability:

using ChunkSplitters
function main2(n_runs; nchunks = Threads.nthreads())
  N = 100000
  np = 4
  levels = rand(np, N)
  results = zeros(n_runs)
  Threads.@threads for (irange, _) in chunks(1:n_runs, nchunks)
    for i in irange
      results[i] = outer(10000, levels)
    end
  end
  # Use results so that it is not optimised out
  println(devnull, results)
end

The number of chunks (as long as it is <= the number of threads) determines how many threads are effectively used. I have 8 cores here; with that, I get:

julia> @time main2(8; nchunks = 1)
 76.328059 seconds (649.46 k allocations: 238.106 GiB, 3.98% gc time, 0.18% compilation time: 10% of which was recompilation)

julia> @time main2(8; nchunks = 4)
 20.942653 seconds (640.09 k allocations: 238.107 GiB, 5.98% gc time)

julia> @time main2(8; nchunks = 8)
 16.748673 seconds (640.09 k allocations: 238.100 GiB, 7.95% gc time)

It doesn’t seem too bad. You will probably have to make the inner functions completely non-allocating to get better scaling.

edit:

Some additional improvements. With the chunked threading, you can easily allocate only one thresholded buffer per chunk:

function outer(count::Int, levels::Matrix{Float64};
  thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
...
  Threads.@threads for (irange, _) in chunks(1:n_runs, nchunks)
    thresholded = Vector{Bool}(undef, size(levels)[2])
    for i in irange
      results[i] = outer(10000, levels; thresholded = thresholded)

With that, and replacing the allocation of the p200p slice with a direct accumulation over thresholded (I’m unsure whether that is actually practical in the real code):

    s2 = 0.0
    for i in eachindex(thresholded)
      if thresholded[i]
        s2 += m200p[i]
      end

you get:

julia> @time main2(8; nchunks = 1)
 36.377214 seconds (84 allocations: 3.156 MiB)

julia> @time main2(8; nchunks = 8)
  5.494856 seconds (98 allocations: 3.824 MiB)

The final serial code is about 3x faster than the original:

julia> @time main2(1; nchunks = 1)
  4.512136 seconds (63 allocations: 3.153 MiB)
Full code
using StaticArrays
using ChunkSplitters
const THRESHOLD = 0.1

function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::AbstractVector{Float64})
  for i in 1:size(levels)[2]
    s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
    thresholded[i] = s > THRESHOLD
  end
  #thresholded .= transpose( sum(levels .* weights, dims=1) .> THRESHOLD )
end  

@views function inner(
  weights::AbstractVector{Float64}, levels::Matrix{Float64};
  thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
  ensemble_and_threshold!(thresholded, levels, weights)
  s1 = 0.0
  s2 = 0.0
  np = length(weights)
  for p in 1:np
    m200p = levels[p, :]
    #p200p = levels[p, thresholded]
   
   # In reality, the next step in the calculation is significantly more involved 
   # than suggested here.
    s1 += sum(m200p)
    s2 = 0.0
    for i in eachindex(thresholded)
      if thresholded[i]
        s2 += m200p[i]
      end
    end
  end
  return s2 / s1
end

function outer(count::Int, levels::Matrix{Float64};
  thresholded = Vector{Bool}(undef, size(levels)[2])
)::Float64
  best = Inf
  for _ in 1:count
    weights = rand(SVector{4})
    r = inner(weights, levels; thresholded = thresholded)
    best = min(best, r)
  end
  return best
end

function main(n_runs)
  N = 100000
  np = 4
  levels = rand(np, N)

  results = zeros(n_runs)
  Threads.@threads for i in 1:n_runs
    results[i] = outer(10000, levels)
  end

  # Use results so that it is not optimised out
  println(devnull, results)
end

#main(1) # Run once for compilation
#
#for n_runs in 1:4
#  @time main(n_runs)
#end

function main2(n_runs; nchunks = Threads.nthreads())
  N = 100000
  np = 4
  levels = rand(np, N)
  results = zeros(n_runs)
  Threads.@threads for (irange, _) in chunks(1:n_runs, nchunks)
    thresholded = Vector{Bool}(undef, size(levels)[2])
    for i in irange
      results[i] = outer(10000, levels; thresholded = thresholded)
    end
  end
  # Use results so that it is not optimised out
  println(devnull, results)
end
11 Likes

To address this directly: Yes, the GC is single-threaded up to and including 1.9. As you’ve observed, this quickly becomes a bottleneck when using multithreading with code that allocates a lot. Some notion of GC multithreading is being introduced in 1.10 and expanded further in 1.11, but I’m not qualified to describe the improvements. You’ll find some discussions about this if you search this forum.
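
If you’re on 1.10, the number of GC threads is exposed through the --gcthreads command-line option; whether it helps for this particular MWE is something you’d have to benchmark, e.g.:

➜ julia --threads=4 --gcthreads=2 micro.jl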

Beyond eliminating allocation as much as you can (see @lmiq’s answer above), the other solution is to replace multithreading with multiprocessing using Distributed.jl (or perhaps Dagger.jl). Since each process has its own GC, this completely removes the scaling bottleneck. There’s a small overhead for process communication, but your problem is embarrassingly parallel and takes several seconds or more even in @lmiq’s optimized version of your MWE, so that shouldn’t be anything to worry about.
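
A minimal sketch of that route for your MWE (the worker count and the @everywhere setup are placeholders; later posts in this thread flesh this out):

using Distributed
addprocs(4)   # each worker process gets its own GC

@everywhere begin
  # define THRESHOLD, ensemble_and_threshold!, inner and outer here, exactly as in the MWE
end

levels = rand(4, 100_000)
# pmap serializes the closure (including levels) to the workers and gathers the results
results = pmap(_ -> outer(10_000, levels), 1:1000)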

2 Likes

Thank you for the detailed analysis and suggestions.

Your version of ensemble_and_threshold! makes the single biggest difference and it’s quite substantial. I was a bit disappointed that my version performs so poorly. Can you explain where the allocations are occurring in my version?

Interestingly, distributed is actually worse than threaded. Why would that be?

Using some of the improvements suggested by @lmiq, I tested with up to 25 threads or processes in a dedicated 50-CPU Linux cgroup (to avoid confounding issues of hyperthreading and interference from other workloads) and plotted the results.

Hard to say without seeing the code you’re running. This has not been my experience. I’m by no means an expert here, but could always take a look at an MWE and point out if there’s anything I would have done differently.

  • levels .* weights allocates a new matrix
  • the intermediate sum(..., dims=1) result and the comparison .> THRESHOLD each allocate another array

There are various tricks for writing contractions like this in an allocation-free way, for example using MappedArrays.jl, but often it’s easiest and most readable to just write out the loop like @lmiq did (though I’d replace 1:size(levels)[2] with axes(levels, 2) for a slightly more compact and generic iteration specification).
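
For concreteness, that kernel with the axes-based iteration (otherwise identical to @lmiq’s version):

function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::AbstractVector{Float64})
  for i in axes(levels, 2)  # iterate over columns without hard-coding size(levels)[2]
    s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
    thresholded[i] = s > THRESHOLD
  end
end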

3 Likes

Your main memory bandwidth consumer is reads from the levels matrix (8 * 4 * 1e5 bytes = 3.2 megabytes). If you have many processes that share the levels matrix by value instead of through shared memory, then you’re thrashing your L3 cache (which is shared between cores). So it very much depends on the details of how you share the levels matrix; you would need to post a runnable MWE for us to reproduce.

Secondly, I suggest you refactor your code here:

The problem with this example dummy code is that s1 = sum(levels) is a constant, i.e. independent of weights, and can be computed in main outside of all loops. Is this true for your real problem? If so, then do that!

I presume this is not the case. Then you must spend at least 10 minutes to come up with a dummy calculation that has the same (and no more!) optimizable structure as your true problem. Otherwise you will send us all on a wild-goose chase.

Another thing you must be careful of: Your example dummy calculation requires no random access to thresholded, i.e. it would be a valid strategy to never allocate thresholded at all, and instead compute thresholded_i = .... Is this true of your real calculation? Your benchmark dummy must very carefully match your true calculation!
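
To illustrate on the dummy calculation only (whether this is legal for the real code is exactly the question): s1 can be hoisted out of all loops and thresholded computed on the fly, never stored:

# outside all loops: independent of weights
s1 = sum(levels)

# inside inner: no thresholded vector is ever allocated
s2 = 0.0
for i in axes(levels, 2)
  above = sum(levels[j, i] * weights[j] for j in eachindex(weights)) > THRESHOLD
  if above
    for p in axes(levels, 1)
      s2 += levels[p, i]
    end
  end
end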

3 Likes

Why is it different for threads vs processes? Perhaps the OS does not invalidate the caches for threads? Is the levels matrix shared in the threads case?

It is not the case. The real code is quite complicated with many levels of indirection. The MWE demonstrates what I suspected was a bottleneck.

Please don’t say that. The suggestions and insights that have been provided here have really helped me understand what is going on. I’m not expecting anyone to write my code for me.

See, another very helpful suggestion. I assumed that broadcast operations would be the most efficient, but it seems that is not the case, judging from the refactoring of the ensemble_and_threshold! function.

Here is the code that compares threads vs procs. I change the assignment to main depending on the benchmark I’m running.

using Distributed

@everywhere const THRESHOLD = 0.1

@everywhere function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
  for i in 1:size(levels)[2]
    s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
    thresholded[i] = s > THRESHOLD
  end
  # thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD )
end  

@everywhere function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
  thresholded = Vector{Bool}(undef, size(levels)[2])
  ensemble_and_threshold!(thresholded, levels, weights)
  s1 = 0.0
  s2 = 0.0
  np = length(weights)
  for p in 1:np
    m200p = @view levels[p, :]
    p200p = levels[p, thresholded]
    s1 += sum(m200p)
    s2 += sum(p200p)
  end
  return s2 / s1
end

@everywhere function outer(count::Int, levels::Matrix{Float64})::Float64
  best = Inf
  for _ in 1:count
    weights = rand(4)
    r = inner(weights, levels)
    best = min(best, r)
  end
  return best
end

function main_procs(n_runs)
  @everywhere N = 500000
  @everywhere np = 4
  @everywhere levels = rand(np, N)
  @everywhere n_iter = 10000

  result_channel = RemoteChannel(()->Channel{Float64}(n_runs));

  doit(_) = put!(result_channel, outer(n_iter, levels)) 
  pmap(doit, 1:n_runs)

  results = map(1:n_runs) do _
    take!(result_channel)
  end

  # Use results so that it is not optimsed out
  println(devnull, results)
end

function main_threads(n_runs)
  N = 500000
  np = 4
  levels = rand(np, N)
  n_iter = 10000

  result_channel = RemoteChannel(()->Channel{Float64}(n_runs));
  doit(_) = put!(result_channel, outer(n_iter, levels)) 

  Threads.@threads for i in 1:n_runs
    doit(i)
  end

  results = map(1:n_runs) do _
    take!(result_channel)
  end

  # Use results so that it is not optimsed out
  println(devnull, results)
end

main = main_procs

main(1) # Run once for compilation


for n_runs in vcat(collect(1:4),collect(5:5:25))
  @show n_runs
  @time main(n_runs)
end

Here is the SLURM script used to run it:

#!/bin/bash

#SBATCH --time=1-00:00:00
#SBATCH --cpus-per-task=50
#SBATCH --mem=40G
#SBATCH --qos=bonus
#SBATCH --output=x-%A.out
#SBATCH --job-name=Neville

hostname

CMD="julia --procs=25 micro.jl"
echo $CMD
srun --unbuffered $CMD

echo exit code: $?

Hi @evan-wehi, I’ve been playing around a bit with this, but I have one question before I write up what I’ve found: When you write @everywhere levels = rand(np, N), do you actually intend each worker to sample its own instance of levels? Because that’s what @everywhere accomplishes, and it’s different from what you’re doing in the multithreaded case, where all threads share the same levels. That said, these independent instances of levels are not actually used in the rest of your code—they are overwritten by the instance of levels captured in the doit closure, which is the one sampled on the main process. So in the end, your multiprocessing and multithreaded codes accomplish the same thing, however, the multiprocessing code does a lot of unnecessary sampling work up front that you could have eliminated by removing all occurrences of @everywhere within main_procs.

So to clarify, should each parallel thread/process share the same levels or use independently sampled instances?

Here's an MWE illustrating what I'm talking about above
julia> using Distributed

julia> function f()
           @everywhere x = rand()             # Samples x independently on each worker
           printlocal(_) = println(@eval(x))  # Prints the worker's own instance of x
           printmain(_) = println(x)          # Copies x over from the main process and prints it
           pmap(printlocal, 1:3)
           println()
           pmap(printmain, 1:3)
           println()
           pmap(printlocal, 1:3)              # Note how all workers now have the same x due to printmain
           return nothing
       end;

julia> f()
      From worker 3:	0.5299608462035816
      From worker 4:	0.3448114078538431
      From worker 2:	0.7929854408997841
      
      From worker 3:	0.017534970889111157
      From worker 2:	0.017534970889111157
      From worker 4:	0.017534970889111157
      
      From worker 4:	0.017534970889111157
      From worker 2:	0.017534970889111157
      From worker 3:	0.017534970889111157

levels should be the same everywhere and so could be in shared memory. In the real code this is data read from disk. It’s about the same size as I used in the MWE, so the overhead of copying it to each address space is relatively small. Perhaps it could be kept in cache for each process, if that’s possible.

FWIW, I’d do something like the following for a nice 40x speedup single-threaded over @lmiq’s code.

However, I am pretty sure that this is not what you want – this makes too much use of structure where you said this is a standin for a larger computation.

The code below uses the following:

  1. s1 (and the m200p slices it is built from) is entirely loop independent, so we can pull it out of everything.
  2. Walking over the big levels array once per weights vector has bad compute/memory density. So we do that for Chunks=8 different weights at the same time. We can profitably do that (and SIMD!) because your kernel is branch-free (I use a matrix-vector multiply to compute multiple dot products at the same time).

Now it is unknowable to us whether this is useful for you, or a complete waste of time. Because we don’t have a good model for your computation.

Really sit down after you propose a model, and try for 15 minutes to “break it”. If somebody here proposes an optimization that is not applicable to your real problem, but is applicable to your model, then this is a learning opportunity for you – how to ask better questions / write better models to stand-in for your real problem. Saying “this is bad” is not enough, you need to fix the model.

julia> using LinearAlgebra, StaticArrays, Random
julia> function inner(
         weights::SMatrix{Chunks, NLevels, T, S}, levels::Vector{SVector{NLevels,T}}, threshold) where {Chunks,NLevels,T,S}
         s = zeros(SVector{Chunks,T})
         for v in levels
           thresholded = (weights*v) .> threshold
           s += thresholded * v[NLevels]
         end
         s
         end
julia> function outer(count::Int, levels::Vector{SVector{NLevels,T}}, threshold, ::Val{Chunks}) where {NLevels,T,Chunks}
         best = T(Inf)
         for _ in 1:count
           weights = rand(SMatrix{Chunks,NLevels,T,Chunks*NLevels})
           r2 = inner(weights, levels, threshold)
           best = min(best, minimum(r2))
         end
         return best
       end
julia> function main(n_runs)
       np = 4
       N = 100_000
       count = 10_000
       threshold = 0.1
       levels = rand(np,N)
       levels2 = ccall(:jl_reshape_array, Array{SVector{4,Float64},1}, (Any, Any, Any), Array{SVector{4,Float64},1}, levels, (N,))
       s = sum(levels)
       results = zeros(n_runs)
       for i in 1:n_runs
           results[i] = outer(Int(count/8), levels2, threshold, Val(8)) /  s
       end
       results
       end
julia> @time main(1)
  0.289770 seconds (4 allocations: 3.052 MiB)
1-element Vector{Float64}:
 0.0
1 Like

Given that levels should be shared, I created a slightly sanitized version of your code, removing the unnecessary @everywheres. Also, in the interest of time when iterating on the code, I measure at n_iter = 1000 and compile at n_iter = 1, and I rewrote the main invocation such that I can control threads vs. procs using the Julia launch configuration. Finally, I’m calling GC.gc() before every timing such that they always start from a blank slate (though this probably doesn’t matter at these time scales). This version is my baseline for further improvements. EDIT: I also removed the RemoteChannel, as it wasn’t necessary.

sanitized.jl
using Distributed

@everywhere const THRESHOLD = 0.1

@everywhere function ensemble_and_threshold!(thresholded::Vector{Bool}, levels::Matrix{Float64}, weights::Vector{Float64})
  for i in 1:size(levels)[2]
    s = sum(levels[j, i] * weights[j] for j in eachindex(weights))
    thresholded[i] = s > THRESHOLD
  end
  # thresholded .= transpose(sum(levels .* weights, dims=1) .> THRESHOLD )
end  

@everywhere function inner(weights::Vector{Float64}, levels::Matrix{Float64})::Float64
  thresholded = Vector{Bool}(undef, size(levels)[2])
  ensemble_and_threshold!(thresholded, levels, weights)
  s1 = 0.0
  s2 = 0.0
  np = length(weights)
  for p in 1:np
    m200p = @view levels[p, :]
    p200p = levels[p, thresholded]
    s1 += sum(m200p)
    s2 += sum(p200p)
  end
  return s2 / s1
end

@everywhere function outer(count::Int, levels::Matrix{Float64})::Float64
  best = Inf
  for _ in 1:count
    weights = rand(4)
    r = inner(weights, levels)
    best = min(best, r)
  end
  return best
end

function main_procs(n_runs, n_iter)
  N = 500000
  np = 4
  levels = rand(np, N)

  doit(_) = outer(n_iter, levels)
  results = pmap(doit, 1:n_runs)

  # Use results so that it is not optimised out
  println(devnull, results[end])
end

function main_threads(n_runs, n_iter)
  N = 500000
  np = 4
  levels = rand(np, N)

  results = Vector{Float64}(undef, n_runs)
  Threads.@threads for i in 1:n_runs
    results[i] = outer(n_iter, levels)
  end

  # Use results so that it is not optimised out
  println(devnull, results[end])
end

function main(n_iter)
  f, n_runs_max = if nprocs() == 1
    main_threads, Threads.nthreads()
  elseif Threads.nthreads() == 1
    main_procs, nworkers()
  else
    error("Combined multithreading and multiprocessing not supported")
  end
  f(n_runs_max, 1)  # Compile on all workers
  for n_runs in Iterators.flatten((1:4, 5:5:25))
    (n_runs > n_runs_max) && break
    @everywhere GC.gc()
    @show n_runs
    @time f(n_runs, n_iter)
  end
end

main(1000)

Here are the timings I get on the machine I’m using:

sanitized.jl timings
danielwe@mlhpc2:~/tmp$ julia --threads=25 sanitized.jl
n_runs = 1
 18.176020 seconds (11.16 k allocations: 15.348 GiB, 3.16% gc time)
n_runs = 2
 25.129455 seconds (22.14 k allocations: 30.695 GiB, 4.06% gc time)
n_runs = 3
 25.221908 seconds (33.15 k allocations: 46.042 GiB, 5.13% gc time)
n_runs = 4
 27.767858 seconds (44.14 k allocations: 61.414 GiB, 4.53% gc time)
n_runs = 5
 30.456231 seconds (55.15 k allocations: 76.746 GiB, 5.11% gc time)
n_runs = 10
 43.307271 seconds (110.14 k allocations: 153.480 GiB, 4.83% gc time)
n_runs = 15
 51.821543 seconds (165.16 k allocations: 230.176 GiB, 5.15% gc time)
n_runs = 20
 60.507017 seconds (220.15 k allocations: 306.965 GiB, 4.81% gc time)
n_runs = 25
 63.995065 seconds (275.16 k allocations: 383.647 GiB, 4.92% gc time)
danielwe@mlhpc2:~/tmp$ julia --procs=25 sanitized.jl
n_runs = 1
 16.986917 seconds (265 allocations: 15.279 MiB)
n_runs = 2
 19.430964 seconds (323 allocations: 15.281 MiB)
n_runs = 3
 21.465102 seconds (791 allocations: 15.297 MiB)
n_runs = 4
 17.095496 seconds (822 allocations: 15.299 MiB)
n_runs = 5
 20.871957 seconds (521 allocations: 15.289 MiB)
n_runs = 10
 31.219670 seconds (821 allocations: 15.302 MiB)
n_runs = 15
 48.659917 seconds (1.14 k allocations: 15.314 MiB)
n_runs = 20
 67.285127 seconds (1.44 k allocations: 15.324 MiB)
n_runs = 25
 75.453670 seconds (1.75 k allocations: 15.334 MiB)

As you can see, multithreading and multiprocessing scale about the same for me. The thing that stands out is the ~5 % gc time. That’s a lot of performance left on the table. We need to fix that whether we’re parallelizing or not.

General improvements

I’m trying to respect your structure as much as possible, so hopefully, most of these improvements translate to your real code.

  1. Remove allocations as much as possible
    • Allocate weights outside the outer loop and use rand!(weights) instead of weights = rand(4).
    • Allocate thresholded (or something equivalent, see below) outside the outer loop and pass it as an argument to inner!.
    • Avoid allocating p200p. This cannot be done with a view, since thresholded does not provide a fixed stride. However, we can avoid instantiating it altogether if we always pass m200p and thresholded together and operate on them in tandem. In your example, sum(p200p) can be replaced with
      sum(
          i -> ifelse(thresholded[i], m200p[i], zero(eltype(m200p))),
          eachindex(thresholded, m200p),
      )
      
      I’m using ifelse instead of ? here to avoid a branch in the inner loop so that, hopefully, this will SIMD. More generally, if you’re looping over p200p, you can replace for x in p200p with
      for i in eachindex(thresholded, m200p)
          if thresholded[i]
              x = m200p[i]
              ...
      
      I hope this pattern works for your actual use case. Not instantiating p200p will save you a lot of allocations.
  2. Improve memory access patterns.
    • You’re slicing along the second dimension in m200p = @view levels[p, :]. Julia arrays are column-major, so if you slice along the first dimension instead, your view will be backed by contiguous memory, which is always preferable. This suggests we should transpose levels.
  3. Use BLAS.
    • ensemble_and_threshold! is just a matrix-vector product followed by thresholding. Linear algebra primitives let us leverage optimized BLAS routines.
    • BLAS won’t give us thresholded directly, it provides ensemble = levels * weights such that thresholded[i] = ensemble[i] > THRESHOLD. As long as we’re not accessing thresholded[i] too many times, we may as well avoid instantiating it and instead replace every occurrence of thresholded[i] with ensemble[i] > THRESHOLD. (If your real code accesses thresholded[i] much more than your MWE, it might be worth seeing if you can leverage Tullio.jl, LoopVectorization.jl, or similar to obtain thresholded from levels and weights in a single BLAS-like pass.)
    • To avoid allocations we shouldn’t actually write ensemble = levels * weights, but rather mul!(ensemble, levels, weights), and allocate ensemble outside the outer loop.

With these changes, the code looks as follows. (Note that I also removed type annotations for readability, and made everything generic with THRESHOLD implicitly determining the float type used throughout—these changes don’t matter for performance, but happen somewhat reflexively to me.)

revised.jl
using Distributed

@everywhere using LinearAlgebra
@everywhere using Random

# Use single-threaded BLAS since we're parallelizing at a higher level
@everywhere BLAS.set_num_threads(1)

@everywhere const THRESHOLD = 0.1

@everywhere function inner!(ensemble, levels, weights)
  T = eltype(levels)
  s1 = zero(T)
  s2 = zero(T)
  mul!(ensemble, levels, weights)
  for p in axes(levels, 2)
    m200p = @view levels[:, p]
    s1 += sum(m200p)
    s2 += sum(
      i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], zero(eltype(m200p))),
      eachindex(ensemble, m200p),
    )
  end
  return s2 / s1
end

@everywhere function outer(count, levels)
  T = eltype(levels)
  best = typemax(T)
  weights = Vector{T}(undef, size(levels, 2))
  ensemble = Vector{T}(undef, size(levels, 1))
  for _ in 1:count
    rand!(weights)
    r = inner!(ensemble, levels, weights)
    best = min(best, r)
  end
  return best
end

function main_procs(n_runs, n_iter)
  N = 500000
  np = 4

  T = typeof(THRESHOLD)
  levels = rand(T, N, np)

  doit = let n_iter=n_iter, levels=levels
    # I reflexively write closures like this to make sure the captured variables
    # aren't boxed; it probably doesn't matter in this case, however
    doit(_) = outer(n_iter, levels)
  end
  results = pmap(doit, 1:n_runs)

  # Use results so that it is not optimised out
  print(devnull, results[end])
end

function main_threads(n_runs, n_iter)
  N = 500000
  np = 4

  T = typeof(THRESHOLD)
  levels = rand(T, N, np)

  results = Vector{T}(undef, n_runs)
  Threads.@threads for i in 1:n_runs
    results[i] = outer(n_iter, levels)
  end

  # Use results so that it is not optimised out
  print(devnull, results[end])
end

function main(n_iter)
  f, n_runs_max = if nprocs() == 1
    main_threads, Threads.nthreads()
  elseif Threads.nthreads() == 1
    main_procs, nworkers()
  else
    error("Combined multithreading and multiprocessing not supported")
  end
  f(n_runs_max, 1)  # Compile on all workers
  for n_runs in Iterators.flatten((1:4, 5:5:25))
    (n_runs > n_runs_max) && break
    @everywhere GC.gc()
    @show n_runs
    @time f(n_runs, n_iter)
  end
end

main(1000)

Here’s what the timings look like. Notice most importantly that serial performance is improved by about 4x and there’s no longer any measurable GC time. Scaling is also improved in the multithreaded case: The slowdown from 1 to 25 threads is less than 2x instead of more than 3.5x. However, the multiprocessing performance drops off a cliff with more than 10 processes. This is probably related to cache utilization, as @foobar_lv2 talked about earlier: levels takes up 16 MB, so 25 copies of it require 400 MB, while this machine has a total 120 MB CPU cache (4 CPUs with 30 MB each).

revised.jl timings
danielwe@mlhpc2:~/tmp$ julia --threads=25 revised.jl
n_runs = 1
  4.460417 seconds (160 allocations: 19.088 MiB)
n_runs = 2
  4.448414 seconds (144 allocations: 22.903 MiB)
n_runs = 3
  4.512793 seconds (152 allocations: 26.718 MiB)
n_runs = 4
  4.721820 seconds (164 allocations: 30.533 MiB)
n_runs = 5
  4.551193 seconds (157 allocations: 34.347 MiB)
n_runs = 10
  5.664418 seconds (181 allocations: 53.422 MiB)
n_runs = 15
  6.958389 seconds (197 allocations: 72.496 MiB)
n_runs = 20
  7.912543 seconds (219 allocations: 91.570 MiB)
n_runs = 25
  8.733596 seconds (235 allocations: 110.645 MiB)
danielwe@mlhpc2:~/tmp$ julia --procs=25 revised.jl
n_runs = 1
  4.339249 seconds (264 allocations: 15.279 MiB)
n_runs = 2
  4.422059 seconds (325 allocations: 15.281 MiB)
n_runs = 3
  4.474285 seconds (392 allocations: 15.284 MiB)
n_runs = 4
  4.391475 seconds (446 allocations: 15.286 MiB)
n_runs = 5
  4.705617 seconds (516 allocations: 15.289 MiB)
n_runs = 10
  7.042835 seconds (819 allocations: 15.299 MiB)
n_runs = 15
 10.150502 seconds (1.14 k allocations: 15.315 MiB)
n_runs = 20
 16.250822 seconds (1.47 k allocations: 15.323 MiB)
n_runs = 25
 16.513299 seconds (1.75 k allocations: 15.336 MiB)

Shared arrays

To work around the cache bottleneck, let’s use the SharedArrays standard library to share the levels array across processes. This is a minor modification of the above code: we allocate the array using levels = SharedMatrix{T}(N, np) and access the underlying Matrix as sdata(levels). (The latter is not necessary for the code to work, as SharedArray supports the AbstractArray interface, but it’s required for maximum performance because it ensures that we hit optimized rather than fallback methods for rand! and mul!.)

The resulting code looks like this:

revised_shared.jl
using Distributed

@everywhere using LinearAlgebra
@everywhere using SharedArrays
@everywhere using Random

# Use single-threaded BLAS since we're parallelizing at a higher level
@everywhere BLAS.set_num_threads(1)

@everywhere const THRESHOLD = 0.1

@everywhere function inner!(ensemble, levels, weights)
  T = eltype(levels)
  s1 = zero(T)
  s2 = zero(T)
  mul!(ensemble, levels, weights)
  for p in axes(levels, 2)
    m200p = @view levels[:, p]
    s1 += sum(m200p)
    s2 += sum(
      i -> ifelse(ensemble[i] > THRESHOLD, m200p[i], zero(eltype(m200p))),
      eachindex(ensemble, m200p),
    )
  end
  return s2 / s1
end

@everywhere function outer(count, levels)
  T = eltype(levels)
  best = typemax(T)
  weights = Vector{T}(undef, size(levels, 2))
  ensemble = Vector{T}(undef, size(levels, 1))
  for _ in 1:count
    rand!(weights)
    r = inner!(ensemble, levels, weights)
    best = min(best, r)
  end
  return best
end

function main_procs(n_runs, n_iter)
  N = 500000
  np = 4

  T = typeof(THRESHOLD)
  levels = SharedMatrix{T}(N, np)
  rand!(sdata(levels))

  doit = let n_iter=n_iter, levels=levels
    # I reflexively write closures like this to make sure the captured variables
    # aren't boxed; it probably doesn't matter in this case, however
    doit(_) = outer(n_iter, sdata(levels))
  end
  results = pmap(doit, 1:n_runs)

  # Use results so that it is not optimised out
  print(devnull, results[end])
end

function main_threads(n_runs, n_iter)
  N = 500000
  np = 4

  T = typeof(THRESHOLD)
  levels = rand(T, N, np)

  results = Vector{T}(undef, n_runs)
  Threads.@threads for i in 1:n_runs
    results[i] = outer(n_iter, levels)
  end

  # Use results so that it is not optimised out
  print(devnull, results[end])
end

function main(n_iter)
  f, n_runs_max = if nprocs() == 1
    main_threads, Threads.nthreads()
  elseif Threads.nthreads() == 1
    main_procs, nworkers()
  else
    error("Combined multithreading and multiprocessing not supported")
  end
  f(n_runs_max, 1)  # Compile on all workers
  for n_runs in Iterators.flatten((1:4, 5:5:25))
    (n_runs > n_runs_max) && break
    @everywhere GC.gc()
    @show n_runs
    @time f(n_runs, n_iter)
  end
end

main(1000)

Here are the timings for multiprocessing (the multithreaded code is unchanged from above):

revised_shared.jl timings
danielwe@mlhpc2:~/tmp$ julia --procs=25 revised_shared.jl
n_runs = 1
  4.351229 seconds (2.16 k allocations: 94.383 KiB)
n_runs = 2
  4.539514 seconds (2.24 k allocations: 97.930 KiB)
n_runs = 3
  4.602810 seconds (2.33 k allocations: 105.820 KiB)
n_runs = 4
  4.518517 seconds (2.42 k allocations: 110.492 KiB)
n_runs = 5
  4.663221 seconds (2.50 k allocations: 115.227 KiB)
n_runs = 10
  6.302061 seconds (2.94 k allocations: 137.305 KiB)
n_runs = 15
  6.564405 seconds (3.85 k allocations: 176.664 KiB)
n_runs = 20
  7.231386 seconds (3.80 k allocations: 186.539 KiB)
n_runs = 25
  8.795986 seconds (4.23 k allocations: 209.492 KiB)

These timings are almost identical to the multithreaded timings, accruing a slowdown of about 2x from 1 to 25 processes. Hence, SharedArrays worked for eliminating the performance drop-off due to outgrowing the CPU cache. It does not outperform multithreading, however, suggesting that the lack of perfect scaling at this point is not due to single-threaded GC. If you can get your actual code to a similar state where @time doesn’t report any GC time and multithreading performs as well as multiprocessing, there’s probably no reason to prefer multiprocessing.

I haven’t looked into how to get closer to perfect scaling from here, but my hunch is that the answer lies in thinking more about hardware resources and topology. Specifically, the machine I’m using has 4 CPUs with 10 physical cores each. Thus, if I’m using more than 10 threads or processes, levels is likely shared across multiple CPUs, so it can’t just live in a single CPU’s cache. I have no idea how hardware deals with this, but it sounds like it could be a bottleneck. The next thing I would try is to use ThreadPinning.jl to pin threads to cores and make as many copies of levels as there are CPUs, sharing each copy only between the threads pinned to the same CPU. Beyond that, I defer to people more knowledgeable than me.
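
For what it’s worth, the pinning part of that experiment is a one-liner with ThreadPinning.jl (untested here; placement strategies such as :cores, :sockets, and :numa are what the package provides):

using ThreadPinning
pinthreads(:cores)   # pin each Julia thread to its own physical core
# threadinfo()       # optional: print the resulting thread-to-core mapping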

There’s also the question of how much more you can optimize your serial code, depending on your actual problem, see @foobar_lv2’s post above.

Hope this helps!

5 Likes

Thermal throttling is a big one: if only one core/CPU is utilized, then it can clock much higher. If that’s the answer, then your scaling might already be perfect.

I’m running on an enterprise compute server. I haven’t checked, but I didn’t expect thermal throttling to be a concern on such hardware?

It’s not a concern so much as a very clever system optimization: One core is very busy, 39 are idle; so it is advantageous for firmware/kernel to reconfigure the system for this clearly single-threaded workload, making use of the extra thermal/energy headroom. Applies not just to crappy laptops with underpowered cooling, but also to bigger machines. That being said, no idea how your machine is set up.

This sucks for benchmarking/development, though. I once lucked into a weird laptop with an engineering-sample CPU and quite bespoke firmware. Not very performant, but the timings were absolutely razor sharp. Super small jitter: runs for 4.57 seconds, that’s 97.2 cycles per loop iteration, let’s crack open that Agner Fog and get counting. Very educational!

2 Likes

Thanks @danielwe and @foobar_lv2, this is really helpful. I can apply most of these ideas directly to my real code and already have a ~3x speedup from just a couple of them. I also learned a lot of idiosyncratic Julia too :)

1 Like