Hi,
I am trying to parallelize a Monte Carlo simulation using Distributed and SharedArrays
, where I essentially want to assign workers independent iterations of my simulation. My simulation setup boils down to a simple for loop, where, in each iteration, I
- draw some data at random,
- conduct some estimations, and
- write the estimation output to an array.
Since each estimation involves a substantial amount of linear algebra and thus as to avoid unnecessary allocations, I have designed the original simulation with a lot of buffers. I.e. overly simplified:
function simple_serial_MC(reps::Int,n::Int,β::Vector,draw_data!,get_estimates!)
simul_output = fill(NaN,reps,β)
p = length(β)
## DEFINE SOME BUFFERS HERE ##
X = Matrix{Float64}(undef,n,p)
y = Vector{Float64}(undef,n,p)
X_b = similar(X)
y_b = similar(y)
β_b = similar(β) ## AND SO ON ##
## RUN SIMULATION ##
for i in reps
## DRAW SOME DATA ##
Random.seed!(i)
draw_data!(y,X,β,y_b,X_b,β_b)
## DO SOME ESTIMATION ##
get_estimates!(y,X,β,y_b,X_b,β_b)
## WRITE TO OUTPUT ##
simul_output[i] = β
end
return simul_output
end
While this is simple for the serial loop, I am struggling to generalize this for a parallelized loop with @ditributed
.
I managed to declare the buffers for each process, and I can run each loop iteration using a function inner_loop!(i::Int, simul_output::SharedArray)
. However, this way, my buffers are global variables on the memory of each worker, which from my understanding, should be avoided for performance reasons. Simply using another function function inner_loop!(i::Int, simul_output::SharedArray, buff_one, buff_two...)
also does not work as this looks for the buffers in the main process and copies them to the workers in each iteration, which I try to avoid for performance reasons:
- Attached at the end is a MWE of a monte carlo simulation that repeatedly estimates parameters of a logistic reggresion model
-
logistic_monte_carlo_distr_1
declares all buffers on each worker and looks for these buffers in the global namespace of each work during a call ofinner_loop!(i,outmat)
-
logistic_monte_carlo_distr_2
usesinner_loop!(i,outmat,args...)
which copies all buffers to the worker at each iteration - Running the simulation with 1000 repititions:
using BenchmarkTools, Distributed
addprocs(4)
## INCLUDE SIMUL SCRIPT, WHICH IS GIVEN AT THE END OF THIS POST
n = 200;
p = 10;
β = randn(p);
seed = 0;
rng = MersenneTwister(123);
B = 1000;
@btime logistic_monte_carlo_distr_1($β,$n,$B,$seed,$rng);
2.558 s (1622 allocations: 274.23 KiB)
@btime logistic_monte_carlo_distr_2($β,$n,$B,$seed,$rng,$draw_X!);
2.910 s (1872 allocations: 522.78 KiB)
Hence, the setup where each variable is looked for in the global namespace is faster than copying all buffers to the worker in each iteration, which gives me hope that I could substantially speed up the simulation if I was able to call inner_loop!(i,outmat,args...)
with args...
from each worker. However, this I am unable to do.
It seems that what I want to achieve, namely avoid redundant allocations in each iteration of the simulation, is fairly intuitive and thus should have been done before. However I was not able to find how this is best achieved, which makes me wonder if there is something substantially wrong withmy approach, and how I would best parallelize my simulation. Thank you for your help.
Simul functions:
@everywhere using MKL, Random, Optim, NLsolve, LinearAlgebra, SharedArrays
### Monte Carlo Simulation MWE
## Logistic regression MLE
@everywhere function MLE_estimation_cascade!(β_MLE::Vector,β_2::Vector,β_1::Vector,β_0::Vector,β_b::Vector, X::Matrix, y::Vector,
μ::Vector,μ_b::Vector, μ_b2::Vector,μ_b3::Vector, X_b::Matrix; x_tol = 1e-08::Float64, f_tol = 1e-12::Float64, kwargs...)
conv = false
β_b .= NaN
b_val = Inf
val = Inf
for β ∈ (β_2,β_1,β_0)
try
conv, val = get_logistic_MLE3!(β_MLE,X,y,β,μ,μ_b,μ_b3,X_b; x_tol = x_tol, f_tol = f_tol, kwargs... )
conv && continue
if val < b_val
β_b .= β_MLE
b_val = val
end
catch
end
end
return conv
end
@everywhere function logistic_fgh!(F,G,H,β,X,y,μ,μ_b,μ_b2,X_b; ϵ = 0.0)
mul!(μ_b,X,β)
if G !== nothing
μ .= 1.0 ./ (1.0 .+ exp.(.-μ_b))
if ϵ == 0.0
μ_b2 .= μ .- y
else
μ_b2 .= (1.0 + ϵ) .* μ .- (1.0 + ϵ/2) .* y
end
mul!(G, X', μ_b2)
end
if H !== nothing
μ_b2 .= μ .* (1.0 .- μ)
colmult!(X_b,μ_b2,X)
mul!(H,X',X_b)
if ϵ != 0.0
H .= (1.0 + ϵ) .* H
end
end
if F !== nothing
μ_b2 .= y .* μ_b
μ_b .= 1.0 .+ exp.(μ_b)
μ_b .= log.(μ_b)
if !(ϵ == 0.0)
μ_b2 .= (1.0 + ϵ/2) .* μ_b2
μ_b .= (1.0 + ϵ) .* μ_b
end
μ_b .= μ_b .- μ_b2
return sum(μ_b)
end
return nothing
end
@everywhere function colmult!(A::Matrix{Float64},v::Vector{Float64}, B::Matrix{Float64})
@inbounds for j ∈ axes(A,2)
@inbounds @simd for i ∈ axes(A,1) ## Maybe loopvectorization turbo here
A[i,j] = v[i] * B[i,j]
end
end
A
end
@everywhere function get_logistic_MLE3!(β::Vector{Float64},X::Matrix{Float64},y::Vector{Float64},β_init::Vector{Float64},μ::Vector{Float64},μ_b::Vector{Float64},μ_b2::Vector{Float64},X_b::Matrix{Float64}; ϵ::Float64 = 0.0, method=Optim.Newton(linesearch = LineSearches.MoreThuente()), kwargs...)
opt = Optim.optimize(Optim.only_fgh!((F,G,H,β) -> logistic_fgh!(F,G,H,β,X,y,μ,μ_b,μ_b2,X_b; ϵ = ϵ)),
β_init, method = method; kwargs...)::Optim.MultivariateOptimizationResults
β .= opt.minimizer::Vector{Float64}
return Optim.converged(opt) && !any(isnan.(β)) , Optim.minimum(opt)::Float64
end
@everywhere function get_starting_values!(β_2::Vector,X::Matrix,y::Vector,β_0::Vector,
μ::Vector,μ_b::Vector,X_b::Matrix, ϵ_0::Float64; x_tol = 1e-08, f_tol = 1e-12, kwargs...)
conv = false
for i in 1:10
try
conv, val = get_logistic_MLE3!(β_2,X,y,β_0,μ,μ_b,μ_b2,X_b; ϵ = i*ϵ_0, x_tol = x_tol, f_tol = f_tol, kwargs... )
conv && !any(isnan.(β_2)) && continue
catch
end
end
if conv
return
else
β_2 .= 0.0
return
end
end
## Data generation
@everywhere function draw_X!(X,rng)
X .= randn(rng, size(X))
X[:,1] .= 1.0
end
@everywhere @inline function logistic_mean!(mus::Vector{Float64},X::Matrix{Float64},beta::Vector{Float64})
mul!(mus,X,beta,-1.0,false)
mus .= exp.(mus)
mus .= 1.0 ./ (1.0 .+ mus )
end
@everywhere function draw_y!(y,X,β,μ,rng)
logistic_mean!(μ,X,β)
y .= rand(rng,length(y))
y .= y .< μ
end
## Distributed Monte Carlo Simulation
@everywhere function pass_to_workers(β,rng,n,p,seed)
y = Vector{Float64}(undef,n)
μ = Vector{Float64}(undef,n)
X = Matrix{Float64}(undef,n,p)
μ_b = similar(μ)
μ_b2 = similar(μ)
μ_b3 = similar(μ)
X_b = similar(X)
β_1 = zeros(p)
β_2 = similar(β_1)
β_MLE = similar(β_1)
β_b = similar(β_1)
ϵ = n/p
return rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed
end
function logistic_monte_carlo_distr_1(β,n,B,seed,rng)
p = length(β)
outmat = SharedMatrix{Float64}(fill(NaN,B,p))
@everywhere (rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed) = pass_to_workers($β,$rng,$n,$p,$seed)
y = NaN # this works
@inbounds @sync @distributed for i ∈ 1:B
inner_loop!(i,outmat)
end
return outmat
end
function logistic_monte_carlo_distr_2(β,n,B,seed,rng,draw_X!)
p = length(β)
outmat = SharedMatrix{Float64}(fill(NaN,B,p))
@everywhere (rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed) = pass_to_workers($β,$rng,$n,$p,$seed)
y = NaN ## this makes this setup fail
@inbounds @sync @distributed for i ∈ 1:B
inner_loop!(i, outmat, draw_X!, draw_y!,rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed)
end
return outmat
end
@everywhere function inner_loop!(i,outmat)
Random.seed!(rng,i+seed)
draw_X!(X,rng)
draw_y!(y,X,β,μ,rng)
for j ∈ 1:6
## β_2
get_starting_values!(β_2,X,y,β,μ,μ_b,X_b,ϵ;
method = Optim.Newton(linesearch = LineSearches.MoreThuente()))
## β_MLE
MLE_estimation_cascade!(β_MLE,β_2,β_1,β,β_b,X,y,μ,μ_b,μ_b2,μ_b3,X_b;
method = Optim.Newton(linesearch = LineSearches.MoreThuente()))
end
outmat[i,:] .= β_MLE
end
@everywhere function inner_loop!(i, outmat, draw_X!, draw_y!,rng,y,μ,X,μ_b,μ_b2,μ_b3,X_b,β,β_1,β_2,β_MLE,β_b,ϵ,n,p,seed)
Random.seed!(rng,i+seed)
draw_X!(X,rng)
draw_y!(y,X,β,μ,rng)
for j ∈ 1:6
## β_2
get_starting_values!(β_2,X,y,β,μ,μ_b,X_b,ϵ;
method = Optim.Newton(linesearch = LineSearches.MoreThuente()))
## β_MLE
MLE_estimation_cascade!(β_MLE,β_2,β_1,β,β_b,X,y,μ,μ_b,μ_b2,μ_b3,X_b;
method = Optim.Newton(linesearch = LineSearches.MoreThuente()))
end
outmat[i,:] .= β_MLE
end