Trying to understand parallel performance in Turing

Hi!

I’ve been learning Turing for the last couple of weeks and while trying to make the most out of the 8 cores in my computer, I realized performance was not scaling as I expected when sampling multiple chains with multiple cores. So I setup the turing coinflip example from the turing intro to run in a single thread, multi-threaded and also distributed. Did the runs using 6 cores just to be sure nothing else in my system is providing a bottleneck. For the single and multi-threaded cases I used a common setup for the model and sampler parameters:

# Using Base modules.
using Random

# Load Turing
using Turing

# Load the distributions library.
using Distributions

#Create model for a set of coin flips
@model function coinflip(y)
    # Our prior belief about the probability of heads in a coin.
    p ~ Beta(1, 1)

    # The number of observations.
    N = length(y)
    for n in 1:N
        # Heads or tails of a coin are drawn from a Bernoulli distribution.
        y[n] ~ Bernoulli(p)
    end
end;

#Create artificial data for coin flips
# Iterate from having seen 0 observations to 100 observations.
Ns = 0:100;
# Set the true probability of heads in a coin.
p_true = 0.5
# Draw data from a Bernoulli distribution, i.e. draw heads or tails.
Random.seed!(12)
data = rand(Bernoulli(p_true), last(Ns));

# Settings of the Hamiltonian Monte Carlo (HMC) sampler.
iterations = 5000000
ϵ = 0.05
τ = 10

and then sampled with one thread (used @time rather than @benchmark but repeating a couple of times numbers were consistent)

#SINGLE THREAD EXAMPLE
function perform_sampling_solo(ϵ,τ,iterations)
    chains = sample(coinflip(data), HMC(ϵ, τ), iterations);
    return chains
end
# Start sampling.
@time chains = mapreduce(c -> perform_sampling_solo(ϵ,τ,iterations), chainscat, 1:6)

and sampled with multiple threads (used JULIA_NUM_THREADS=6)

#MULTI-THREADED EXAMPLE
function perform_sampling(ϵ,τ,iterations)
    chains = sample(coinflip(data), HMC(ϵ, τ), MCMCThreads(), iterations, 6);
    return chains
end
# Start sampling
@time perform_sampling(ϵ,τ,iterations)

To perform the distributed test I did the following

#DISTRIBUTED EXAMPLE
# Load Distributed to add processes and the @everywhere macro.
using Distributed

addprocs(6)

@everywhere begin
   # Using Base modules.
   using Random

   # Load Turing
   using Turing

   # Load the distributions library.
   using Distributions

   using BenchmarkTools

   #Create artificial data for coin flips
   # Iterate from having seen 0 observations to 100 observations.
   Ns = 0:100;
   # Set the true probability of heads in a coin.
   p_true = 0.5
   # Draw data from a Bernoulli distribution, i.e. draw heads or tails.
   Random.seed!(12)
   data = rand(Bernoulli(p_true), last(Ns));

   # Settings of the Hamiltonian Monte Carlo (HMC) sampler.
   iterations = 5000000
   ϵ = 0.05
   τ = 10
end

#Create model for a set of coin flips
@everywhere @model function coinflip(y)
    # Our prior belief about the probability of heads in a coin.
    p ~ Beta(1, 1)

    # The number of observations.
    N = length(y)
    for n in 1:N
        # Heads or tails of a coin are drawn from a Bernoulli distribution.
        y[n] ~ Bernoulli(p)
    end
end;

function perform_sampling(ϵ,τ,iterations)
    chains = sample(coinflip(data), HMC(ϵ, τ), MCMCDistributed(), iterations, 6);
    return chains
end

# Start sampling
@time perform_sampling(ϵ,τ,iterations)

Additional info, I’m using julia 1.7.1 with the following packages in my environment:

  [6e4b80f9] BenchmarkTools v0.7.0
  [31c24e10] Distributions v0.24.18
  [7073ff75] IJulia v1.23.2
  [c7f686f2] MCMCChains v4.14.1
  [c3e4b0f8] Pluto v0.17.5
  [f3b207a7] StatsPlots v0.14.30
  [fce5fe82] Turing v0.15.1

So the results I get from these tests are:

  • single thread = 1590 seconds
  • multi threaded = 1130 seconds
  • distributed = 360 seconds

During both the multi-threaded and the distributed runs I see 6 of my cpu cores running at almost 100%. But still there is a very significant difference in performance. From the single threaded run I get just a 30% increase in performance by using 6 cores with multi-threading. The distributed run performs far better, with a ~440% boost in performance.

Am I missing something fundamental in the implementation of the multi-threaded case here?

Turing doesn’t work on 1.7 yet, see:

https://github.com/TuringLang/Turing.jl/issues/1738

for that reason your Turing version is very old - 0.15 when the current release is 0.19. Not saying this explains what’s happening, but it would probably more useful to check performance for the current release.

Certainly better to do this with an up-to-date version of Turing. Just re-ran the tests using julia 1.6.5 and my environment has

  [31c24e10] Distributions v0.25.40
  [fce5fe82] Turing v0.19.4

what I find with the above tests is (showing full output from @time):

  • single threaded:
    1676.549851 seconds (10.56 G allocations: 713.028 GiB, 54.02% gc time, 0.03% compilation time)
  • multi threaded (6 threads, same as in the original post):
    1478.391515 seconds (10.53 G allocations: 712.670 GiB, 79.76% gc time, 0.17% compilation time
  • distributed (6 processes, same as in the original post):
    527.737194 seconds (46.49 M allocations: 12.626 GiB, 0.84% gc time, 0.61% compilation time)

So results are qualitative similar to what I had with the older Turing version. I guess the gc time on the distributed run is not really a meaningful report, but I’m surprised by the large gc usage in the multi-threaded case (~80%). I’m relatively new to julia though, so not sure what I should normally expect.

And a tangential issue, but is there a way to get more detailed progress reports from parallel runs of Turing? Currently I get a progress bar that only jumps as chains are completed, so it is not the most informative.

I am cross posting my response from Slack:

Hi Pablo,The problem you are running into is precompilation. When you run a julia function for the first time in a new session, there is a compilation cost and that is contaminating your benchmark.When benchmarking a long running function like sample, it is OK to use @ time but you need to run both functions twice. If you are benchmarking fast functions, @btime from BenchmarkTools.jl is recommended. After running each function once, I get the following results on my computer (with 2000 samples)

single thread 0.336741 seconds (2.72 M allocations: 159.888 MiB, 12.41% gc time, 8.44% compilation time)

multi-thread 0.080806 seconds (2.61 M allocations: 152.409 MiB)

The expected speed up was observed.

Also cross posting from slack.

Hi Christopher,

Thanks for the feedback. However, don’t think that is the source of the problem. Precompilation is relevant when doing a small number of iterations, so just as a brute force way I was doing my tests with a large number. To make things a bit clearer I did a more thorough testing, using @benchmark with 30 samples, a varying number of iterations, and 5 different types of sampling:

  • single threaded sampling of 6 chains in a row
  • multi threaded sampling using MCMCThreaded
  • multi threaded sampling by manually setting up multiple threads
  • distributed sampling using MCMCDistributed
  • Distributed sampling by manually setting up the processes

For each of the benchmarks I took the median times. I then plotted the speedup obtained for each of the parallel benchmarks by taking the ratio of the single threaded versus the parallel run. The code to do this is attached (takes about an hour to run for me as it does lots of tests). All of this uses 6 threads or processes in an 8 core machine. The plot that results from running this is below.

parallel_comparison

My naive expectation would be that as the number of iterations is increased (while not running out of memory), all serial parts of the calculation should represent a smaller fraction of the total, and the speedup would be higher. The reduction into a single chain would be the single operation that would scale with number of samples. However, for multi-threaded runs I see the opposite, performance is consistently lowered.

Distributed runs instead show a consistent performance increase against the single-threaded runs, although the speedup I observe is around ~3.5, while I’d naively expect it to be closer to 6 when using 6 cores. However, that might just be a consequence of the system boosting the single-threaded run to a higher clock frequency.

Below I include the entire code (a bit long and hacky) to perform the tests and make the plot above. Ran this as a script, not sure if it works inside a notebook.

using Random
using Turing
using Distributions

#Create model for a set of coin flips
@model function coinflip(y)
    # Our prior belief about the probability of heads in a coin.
    p ~ Beta(1, 1)
    # The number of observations.
    N = length(y)
    for n in 1:N
        # Heads or tails of a coin are drawn from a Bernoulli distribution.
        y[n] ~ Bernoulli(p)
    end
end;

#Create artificial data for coin flips
# Iterate from having seen 0 observations to 100 observations.
Ns = 0:100;
# Set the true probability of heads in a coin.
p_true = 0.5
# Draw data from a Bernoulli distribution, i.e. draw heads or tails.
Random.seed!(12)
data = rand(Bernoulli(p_true), last(Ns));

# Settings of the Hamiltonian Monte Carlo (HMC) sampler.
ϵ = 0.05
τ = 10
model=coinflip(data)

#MULTI_THREADED
function perform_sampling_threaded(ϵ,τ,iterations,model)
    chains = sample(model, HMC(ϵ, τ), MCMCThreads(), iterations, 6, progress=false);
    return chains
end

#MULTI_THREADED ("manually")
function perform_sampling_threaded2(ϵ,τ,iterations,model)
    chains = Vector{Chains}(undef, 6)
    Threads.@threads for i = 1:6
        chains[Threads.threadid()] = sample(model, HMC(ϵ, τ), iterations, progress=false)
    end
    cat_chains = chainscat(chains...)
end

#SINGLE THREADED
function perform_sampling_solo(ϵ,τ,iterations,model)
    chains = mapreduce(c -> sample(model, HMC(ϵ, τ), iterations, progress=false), chainscat, 1:6)
    return chains
end

using BenchmarkTools
iter_nums = [100,300,1_000,3_000,10_000,30_000,100_000,300_000,1_000_000]
iterations = 0
times_solo = Vector{Float64}(undef, length(iter_nums))
for (i, iter_num) in enumerate(iter_nums)
    global iterations = iter_num
    result = @benchmark perform_sampling_solo(ϵ,τ,iterations,model) samples=30
    times_solo[i] = median(result.times/1e9)
end
times_threaded = Vector{Float64}(undef, length(iter_nums))
for (i, iter_num) in enumerate(iter_nums)
    global iterations = iter_num
    result = @benchmark perform_sampling_threaded(ϵ,τ,iterations,model) samples=30
    times_threaded[i] = median(result.times/1e9)
end
times_threaded2 = Vector{Float64}(undef, length(iter_nums))
for (i, iter_num) in enumerate(iter_nums)
    global iterations = iter_num
    result = @benchmark perform_sampling_threaded2(ϵ,τ,iterations,model) samples=30
    times_threaded2[i] = median(result.times/1e9)
end

#do distributed runs
using Distributed
addprocs(6)
@everywhere begin
   using Pkg
   Pkg.activate(".")
   Pkg.instantiate()

   using Turing
   using Random
   using Distributions

   #Create artificial data for coin flips
   # Iterate from having seen 0 observations to 100 observations.
   Ns = 0:100;
   # Set the true probability of heads in a coin.
   p_true = 0.5
   # Draw data from a Bernoulli distribution, i.e. draw heads or tails.
   Random.seed!(12)
   data = rand(Bernoulli(p_true), last(Ns));

   # Settings of the Hamiltonian Monte Carlo (HMC) sampler.
   ϵ = 0.05
   τ = 10
end
@everywhere @model function coinflip(y)
    # Our prior belief about the probability of heads in a coin.
    p ~ Beta(1, 1)

    # The number of observations.
    N = length(y)
    for n in 1:N
        # Heads or tails of a coin are drawn from a Bernoulli distribution.
        y[n] ~ Bernoulli(p)
    end
end;
@everywhere model=coinflip(data)
function perform_sampling_distributed(ϵ,τ,iterations,model)
    chains = sample(model, HMC(ϵ, τ), MCMCDistributed(), iterations, 6, progress=false);
    return chains
end
#call once for pre-compilation
@time perform_sampling_distributed(ϵ,τ,10,model)

#perform sampling
times_distributed = Vector{Float64}(undef, length(iter_nums))
for (i, iter_num) in enumerate(iter_nums)
    global iterations = iter_num
    result = @benchmark perform_sampling_distributed(ϵ,τ,iterations,model) samples=30
    times_distributed[i] = median(result.times/1e9)
end

#Perform distributed sampling manually
function perform_sampling_distributed2(iterations)
    chains = @distributed (chainscat) for i = 1:6
        chain = sample(model, HMC(ϵ, τ), iterations, progress=false);
    end
    return chains
end
times_distributed2 = Vector{Float64}(undef, length(iter_nums))
for (i, iter_num) in enumerate(iter_nums)
    global iterations = iter_num
    result = @benchmark perform_sampling_distributed2(iterations) samples=30
    times_distributed2[i] = median(result.times/1e9)
end

#Plot speedups
using Plots
plot(iter_nums,times_solo./times_threaded, xaxis=:log, label="MCMCThreaded")
plot!(iter_nums,times_solo./times_threaded2, xaxis=:log, label="Manually threaded")
plot!(iter_nums,times_solo./times_distributed, xaxis=:log, label="MCMCDistributed")
plot!(iter_nums,times_solo./times_distributed2, xaxis=:log, label="Manually Distributed")
xlabel!("Iterations")
ylabel!("Speedup")
savefig("parallel_comparison.pdf")
2 Likes

Since more than 50% of the time is spent in GC, one possible reason for the performance difference between Threads and Distributed could be that Distributed allows GC to run in parallel while GC in any thread stops all threads. The memory allocations could also be a limiting factor in the parallelization speedup. This should not be a problem in a bigger model where more time is spent in computation instead of GC.

1 Like

@mikkoku , indeed it seems the GC time is a key thing in here. I repeated the tests above and plotted the fraction of time spent doing garbage collection. This lines up nicely with the speedup plot, GC time is nonexistent below 1000 samples, it plateaus at ~35% between 10^3.5-10^5, and then rises beyond 50% with a million iterations
parallel_comparison2
guess this means one should really opt for distributed runs when doing sampling with lots of iterations. Not sure if there is any way to regulate the frequency of GC to tune its performance hit.

2 Likes