Parallelizing Monte Carlo - using buffers, avoiding global variables

Hi,

I am trying to parallelize a Monte Carlo simulation using Distributed and SharedArrays, where I essentially want to assign independent iterations of my simulation to the workers. My simulation setup boils down to a simple for loop where, in each iteration, I

  • draw some data at random,
  • conduct some estimations, and
  • write the estimation output to an array.

Since each estimation involves a substantial amount of linear algebra, I have designed the original simulation with a lot of buffers in order to avoid unnecessary allocations. Overly simplified, it looks like this:

function simple_serial_MC(reps::Int,n::Int,β::Vector,draw_data!,get_estimates!)
    p = length(β)
    simul_output = fill(NaN,reps,p)
    ## DEFINE SOME BUFFERS HERE ##
    X = Matrix{Float64}(undef,n,p)
    y = Vector{Float64}(undef,n)
    X_b = similar(X)
    y_b = similar(y)
    β_b = similar(β) ## AND SO ON ##
    ## RUN SIMULATION ##
    for i in 1:reps
        ## DRAW SOME DATA ##
        Random.seed!(i)
        draw_data!(y,X,β,y_b,X_b,β_b)
        ## DO SOME ESTIMATION ##
        get_estimates!(y,X,β,y_b,X_b,β_b)
        ## WRITE TO OUTPUT ##
        simul_output[i,:] .= β
    end
    return simul_output
end

While this is simple for the serial loop, I am struggling to generalize it to a parallelized loop with @distributed.

I managed to declare the buffers on each process, and I can run each loop iteration using a function inner_loop!(i::Int, simul_output::SharedArray). However, this way my buffers are global variables in the memory of each worker, which, from my understanding, should be avoided for performance reasons. Simply using another method inner_loop!(i::Int, simul_output::SharedArray, buff_one, buff_two, ...) does not work either, as this looks for the buffers on the main process and copies them to the workers in each iteration, which is what I am trying to avoid for performance reasons:

  • Attached at the end of this post is an MWE of a Monte Carlo simulation that repeatedly estimates the parameters of a logistic regression model.
  • logistic_monte_carlo_distr_1 declares all buffers on each worker and looks for these buffers in the global namespace of each worker during a call of inner_loop!(i,outmat).
  • logistic_monte_carlo_distr_2 uses inner_loop!(i,outmat,args...), which copies all buffers to the worker in each iteration.
  • Running the simulation with 1000 repetitions:
using BenchmarkTools, Distributed
addprocs(4)
## INCLUDE SIMUL SCRIPT, WHICH IS GIVEN AT THE END OF THIS POST
n = 200; 
p = 10; 
β = randn(p); 
seed = 0; 
rng = MersenneTwister(123); 
B = 1000; 
@btime logistic_monte_carlo_distr_1($β,$n,$B,$seed,$rng); 
2.558 s (1622 allocations: 274.23 KiB)
@btime logistic_monte_carlo_distr_2($β,$n,$B,$seed,$rng,$draw_X!); 
2.910 s (1872 allocations: 522.78 KiB)

Hence, the setup where each variable is looked up in the global namespace is faster than copying all buffers to the worker in each iteration. This gives me hope that I could speed up the simulation substantially if I were able to call inner_loop!(i,outmat,args...) with args... living on each worker. However, this is exactly what I am unable to do.

What I want to achieve, namely avoiding redundant allocations in each iteration of the simulation, seems fairly common, so it has probably been done before. However, I was not able to find out how this is best achieved, which makes me wonder whether there is something substantially wrong with my approach, and how I would best parallelize my simulation. Thank you for your help.
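For concreteness, one alternative I have been considering is to hand each worker a whole chunk of repetitions, so that all buffers are allocated exactly once per worker and the inner function can take them as arguments. A rough, untested sketch of that idea, reusing inner_loop! and the data-generating functions from the MWE below (run_chunk! and logistic_monte_carlo_chunked are made-up names):

@everywhere function run_chunk!(outmat, idxs, β, n, seed)
    ## allocate all buffers once on this worker, then loop over its block of repetitions
    p = length(β)
    rng = MersenneTwister() # reseeded with i + seed inside inner_loop!
    y = Vector{Float64}(undef, n)
    μ = similar(y); μ_b = similar(y); μ_b2 = similar(y); μ_b3 = similar(y)
    X = Matrix{Float64}(undef, n, p)
    X_b = similar(X)
    β_1 = zeros(p); β_2 = similar(β_1); β_MLE = similar(β_1); β_b = similar(β_1)
    ϵ = n / p
    for i in idxs
        inner_loop!(i, outmat, draw_X!, draw_y!, rng, y, μ, X, μ_b, μ_b2, μ_b3, X_b,
                    β, β_1, β_2, β_MLE, β_b, ϵ, n, p, seed)
    end
    return nothing
end

function logistic_monte_carlo_chunked(β, n, B, seed)
    p = length(β)
    outmat = SharedMatrix{Float64}(fill(NaN, B, p))
    ## one contiguous block of repetitions per worker
    chunks = collect(Iterators.partition(1:B, cld(B, nworkers())))
    @sync for (w, idxs) in zip(workers(), chunks)
        @async remotecall_wait(run_chunk!, w, outmat, idxs, β, n, seed)
    end
    return outmat
end

I have not benchmarked this variant, so I do not know whether it actually beats the versions above; I mainly wonder whether this kind of chunking is the idiomatic way to keep the buffers worker-local.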

Simul functions:

@everywhere using MKL, Random, Optim, LineSearches, NLsolve, LinearAlgebra, SharedArrays
### Monte Carlo Simulation MWE 

## Logistic regression MLE 
@everywhere function MLE_estimation_cascade!(β_MLE::Vector,β_2::Vector,β_1::Vector,β_0::Vector,β_b::Vector, X::Matrix, y::Vector, 
    μ::Vector,μ_b::Vector, μ_b2::Vector,μ_b3::Vector, X_b::Matrix; x_tol::Float64 = 1e-08, f_tol::Float64 = 1e-12, kwargs...) 
    conv = false 
    β_b .= NaN 
    b_val = Inf 
    val = Inf 
    for β ∈ (β_2,β_1,β_0)
        try
            conv, val = get_logistic_MLE3!(β_MLE,X,y,β,μ,μ_b,μ_b3,X_b; x_tol = x_tol, f_tol = f_tol, kwargs... )
            conv && continue
            if val < b_val
                β_b .= β_MLE 
                b_val = val
            end 
        catch 
        end
    end 
    return conv
end 

@everywhere function logistic_fgh!(F,G,H,β,X,y,μ,μ_b,μ_b2,X_b; ϵ = 0.0)
    mul!(μ_b,X,β)
    if G !== nothing
        μ .= 1.0 ./ (1.0 .+ exp.(.-μ_b))
        if ϵ == 0.0
            μ_b2 .= μ .- y 
        else 
            μ_b2 .= (1.0 + ϵ) .* μ .- (1.0 + ϵ/2) .* y
        end 
        mul!(G, X', μ_b2) 
    end 
    if H !== nothing 
        μ_b2 .= μ .* (1.0 .- μ) 
        colmult!(X_b,μ_b2,X)
        mul!(H,X',X_b)
        if ϵ != 0.0 
            H .= (1.0 + ϵ) .* H 
        end 
    end 
    if F !== nothing 
        μ_b2 .= y .* μ_b
        μ_b .= 1.0 .+ exp.(μ_b) 
        μ_b .= log.(μ_b)
        if !(ϵ == 0.0)
            μ_b2 .= (1.0 + ϵ/2) .* μ_b2
            μ_b .= (1.0 + ϵ) .* μ_b 
        end 
        μ_b .= μ_b .- μ_b2 
        return sum(μ_b)
    end 
    return nothing
end

@everywhere function colmult!(A::Matrix{Float64},v::Vector{Float64}, B::Matrix{Float64}) 
    @inbounds for j ∈ axes(A,2) 
        @inbounds @simd for i ∈ axes(A,1) ## Maybe loopvectorization turbo here 
            A[i,j] = v[i] * B[i,j]
        end
    end 
    A
end

@everywhere function get_logistic_MLE3!(β::Vector{Float64},X::Matrix{Float64},y::Vector{Float64},β_init::Vector{Float64},μ::Vector{Float64},μ_b::Vector{Float64},μ_b2::Vector{Float64},X_b::Matrix{Float64}; ϵ::Float64 = 0.0, method=Optim.Newton(linesearch = LineSearches.MoreThuente()), kwargs...) 
    opt = Optim.optimize(Optim.only_fgh!((F,G,H,β) -> logistic_fgh!(F,G,H,β,X,y,μ,μ_b,μ_b2,X_b; ϵ = ϵ)),
                        β_init, method = method; kwargs...)::Optim.MultivariateOptimizationResults
    β .= opt.minimizer::Vector{Float64}
    return Optim.converged(opt) && !any(isnan.(β)) , Optim.minimum(opt)::Float64 
end 

@everywhere function get_starting_values!(β_2::Vector,X::Matrix,y::Vector,β_0::Vector,
    μ::Vector,μ_b::Vector,μ_b2::Vector,X_b::Matrix, ϵ_0::Float64; x_tol = 1e-08, f_tol = 1e-12, kwargs...)

    conv = false 
    for i in 1:10 
        try
            conv, val = get_logistic_MLE3!(β_2,X,y,β_0,μ,μ_b,μ_b2,X_b; ϵ = i*ϵ_0, x_tol = x_tol, f_tol = f_tol, kwargs... )  
            conv && !any(isnan.(β_2)) && continue
        catch 
        end 
    end 
    if conv 
        return 
    else 
        β_2 .= 0.0 
        return 
    end 
end 

## Data generation
@everywhere function draw_X!(X,rng)
    X .= randn(rng, size(X))
    X[:,1] .= 1.0 
end 

@everywhere @inline function logistic_mean!(mus::Vector{Float64},X::Matrix{Float64},beta::Vector{Float64})  
    mul!(mus,X,beta,-1.0,false) 
    mus .= exp.(mus) 
    mus .= 1.0 ./ (1.0 .+ mus )
end 

@everywhere function draw_y!(y,X,β,μ,rng)
    logistic_mean!(μ,X,β)
    y .= rand(rng,length(y)) 
    y .= y .< μ 
end 

## Distributed Monte Carlo Simulation 
@everywhere function pass_to_workers(β,rng,n,p,seed)
    y = Vector{Float64}(undef,n)
    μ = Vector{Float64}(undef,n)  
    X = Matrix{Float64}(undef,n,p)

    μ_b = similar(μ)
    μ_b2 = similar(μ)
    μ_b3 = similar(μ)
    X_b = similar(X) 

    β_1 = zeros(p) 
    β_2 = similar(β_1)
    β_MLE = similar(β_1)
    β_b = similar(β_1)

    ϵ = n/p 

    return rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed
end 

function logistic_monte_carlo_distr_1(β,n,B,seed,rng)
    p = length(β) 
    outmat = SharedMatrix{Float64}(fill(NaN,B,p)) 
    @everywhere (rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed) = pass_to_workers($β,$rng,$n,$p,$seed)
    y = NaN # this works 
    @inbounds @sync @distributed for i ∈ 1:B
        inner_loop!(i,outmat)
    end
    return outmat
end 

function logistic_monte_carlo_distr_2(β,n,B,seed,rng,draw_X!)
    p = length(β) 
    outmat = SharedMatrix{Float64}(fill(NaN,B,p)) 
    @everywhere (rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed) = pass_to_workers($β,$rng,$n,$p,$seed)
    y = NaN ## this makes this setup fail 
    @inbounds @sync @distributed for i ∈ 1:B
        inner_loop!(i, outmat, draw_X!, draw_y!,rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed)
    end
    return outmat
end 

@everywhere function inner_loop!(i,outmat)
    Random.seed!(rng,i+seed)
    draw_X!(X,rng)
    draw_y!(y,X,β,μ,rng)

    for j ∈ 1:6 
        ## β_2 
        get_starting_values!(β_2,X,y,β,μ,μ_b,μ_b2,X_b,ϵ;
            method = Optim.Newton(linesearch = LineSearches.MoreThuente())) 

        ## β_MLE 
        MLE_estimation_cascade!(β_MLE,β_2,β_1,β,β_b,X,y,μ,μ_b,μ_b2,μ_b3,X_b;
            method = Optim.Newton(linesearch = LineSearches.MoreThuente())) 
    end 
    outmat[i,:] .= β_MLE 
end 

@everywhere function inner_loop!(i, outmat, draw_X!, draw_y!,rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed)
    Random.seed!(rng,i+seed)
    draw_X!(X,rng)
    draw_y!(y,X,β,μ,rng)

    for j ∈ 1:6 
        ## β_2 
        get_starting_values!(β_2,X,y,β,μ,μ_b,μ_b2,X_b,ϵ;
            method = Optim.Newton(linesearch = LineSearches.MoreThuente())) 

        ## β_MLE 
        MLE_estimation_cascade!(β_MLE,β_2,β_1,β,β_b,X,y,μ,μ_b,μ_b2,μ_b3,X_b;
            method = Optim.Newton(linesearch = LineSearches.MoreThuente())) 
    end 
    outmat[i,:] .= β_MLE 
end

Update:

I was able to define a local closure function on each worker process using the ParallelDataTransfer package as follows:

@everywhere using ParallelDataTransfer
for worker in workers()
    @defineat worker inner_loop_closure!(i,outmat) = inner_loop!(i, outmat, draw_X!, draw_y!,rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed)
end 

function logistic_monte_carlo_distr_3(β,n,B,seed,rng)
    p = length(β) 
    outmat = SharedMatrix{Float64}(fill(NaN,B,p)) 
    @everywhere (rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed) = pass_to_workers($β,$rng,$n,$p,$seed)
    @inbounds @sync @distributed for i ∈ 1:B
        inner_loop_closure!(i, outmat)
    end
    return outmat
end 

This gives the same results as the other designs; the speedup is not amazing, however:

@assert  logistic_monte_carlo_distr_3(β,n,B,seed,rng) ==  logistic_monte_carlo_distr_1(β,n,B,seed,rng)
@assert  logistic_monte_carlo_distr_3(β,n,B,seed,rng) == logistic_monte_carlo_distr_2(β,n,B,seed,rng,draw_X!)

@btime logistic_monte_carlo_distr_1($β,$n,$B,$seed,$rng);
2.470 s (1621 allocations: 274.20 KiB)
@btime logistic_monte_carlo_distr_2($β,$n,$B,$seed,$rng,$draw_X!);
2.868 s (1897 allocations: 526.52 KiB)
@btime logistic_monte_carlo_distr_3($β,$n,$B,$seed,$rng);
2.299 s (1614 allocations: 273.42 KiB)

Given the negligible speedup, does that mean that either approach is essentially fine, or am I still on the wrong path with this setup?

Any thoughts are much appreciated.

I didn’t go through the code, but if passing data between processes is slowing things down, why not try multithreading with a single process?

I tried multithreading as well. To make things thread-safe, I had to reinitialize all buffers in each loop iteration, so in this respect it would be equivalent to logistic_monte_carlo_distr_1. Or am I missing something here?
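Roughly, the threaded version reallocates the buffers inside the loop body, along these lines (sketch only; logistic_monte_carlo_threads is a made-up name, and it reuses inner_loop! and the data-generating functions from the MWE above):

function logistic_monte_carlo_threads(β, n, B, seed)
    p = length(β)
    outmat = fill(NaN, B, p)
    Threads.@threads for i in 1:B
        ## fresh buffers in every iteration to keep the loop body thread-safe
        rng = MersenneTwister()
        y = Vector{Float64}(undef, n)
        μ = similar(y); μ_b = similar(y); μ_b2 = similar(y); μ_b3 = similar(y)
        X = Matrix{Float64}(undef, n, p)
        X_b = similar(X)
        β_1 = zeros(p); β_2 = similar(β_1); β_MLE = similar(β_1); β_b = similar(β_1)
        inner_loop!(i, outmat, draw_X!, draw_y!, rng, y, μ, X, μ_b, μ_b2, μ_b3, X_b,
                    β, β_1, β_2, β_MLE, β_b, n / p, n, p, seed)
    end
    return outmat
end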

I ran both the multithreaded and the distributed design on an HPC cluster on a single node with 4, 8, 16, 32, and 48 CPUs. While multithreading does better for fewer CPUs, there appears to be no improvement beyond 16 CPUs, whereas the distributed setup scales almost linearly.