Help with parallelizing indirect inference estimation

I’m trying my hand at estimation via indirect inference and wrote this script to experiment. What is the best way to parallelize the computation? I think the critical parts are the simulate function and the line that computes θ̃ inside the function obj. I don’t have experience with parallel computing, so I am open to any suggestions.

using Optim
using Statistics
using Distributions
using Random

Random.seed!(23578)

# true dgp
# y = exp(β ⋅ x) + u
# u ~ 𝒩(0, σ²)

function model(x, β)
    exp.(x * β)
end

# number of simulations
const M = 10

# size of data
const n = 1_000
const K = 5

# generate data
const σ² = 1.5
const β = rand(K)
const udgp = rand(Normal(0, σ²), n)
const x = hcat(fill(1.0, n), rand(Normal(), (n, K - 1)))
const ydgp = model(x, β) + udgp

# auxiliary model
# y = θ ⋅ x + ε
# ε ~ 𝒩(0, σε²)

# estimator of auxiliary model
function ols(x, y)
    xt = transpose(x)
    inv(xt * x) * xt * y
end

# generate simulations
const u0 = [randn(n) for i in 1:M]

function simulate(x, β; u0=u0, M=M)
    [model(x, β) + u0[i] for i in 1:M]
end

# estimate of auxiliary model
const θ̂ = ols(x, ydgp)

# criterion to be minimized
criterion(θ̃; θ̂=θ̂) = sum((θ̂ - θ̃).^2)

# indirect inference objective function
function obj(β)
    ys = simulate(x, β)
    θ̃ = mean([ols(x, ys[i]) for i in 1:M])
    return criterion(θ̃)
end

# initial values of parameters of true model
β0 = ones(K)

# optimization
opt = optimize(obj, β0, method=ConjugateGradient(), autodiff=:forward)

# indirect inference estimates (of the true model)
β̂ = Optim.minimizer(opt)

Hi!
I think there are a few things to consider before going parallel right away.
As far as I can tell you’re right that the function obj is likely the critical part, but it’s worth pinning down exactly which part you want to run in parallel.
Do you want to speed up the computation inside it, or do you want to run several instances of this simulation in parallel? If the latter, it’s probably easiest to use the pmap function from Distributed.
But actually, at this stage I think your code would likely gain much more from removing memory allocations:

In this code, for instance, you create several large matrices and vectors that you don’t need to.
For performance-critical computations it is usually recommended to allocate matrices and vectors before the computation and reuse them whenever possible, using functions (indicated by a ! after their name) that write directly into them.
For instance, for multiplying matrices you can use mul!(Res, A, B) from LinearAlgebra, which stores the result in the pre-allocated matrix Res. You admittedly lose a bit of the idiomatic feel of your code, but I am sure it will be worth it.
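
For example, something along these lines might work for the OLS step (this is just a sketch: the name ols! and the buffers are made up, cholesky! assumes x'x is positive definite, and it reuses the K, x and ydgp from your script):

using LinearAlgebra

# buffers allocated once, outside any hot loop
xtx = zeros(K, K)
xty = zeros(K)
θbuf = zeros(K)

# in-place OLS: overwrites the pre-allocated buffers instead of
# allocating fresh matrices on every call
function ols!(θ, xtx, xty, x, y)
    mul!(xtx, transpose(x), x)                  # xtx = x'x
    mul!(xty, transpose(x), y)                  # xty = x'y
    ldiv!(θ, cholesky!(Symmetric(xtx)), xty)    # θ = (x'x) \ (x'y)
    return θ
end

ols!(θbuf, xtx, xty, x, ydgp)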

You can check how much your code allocates using BenchmarkTools.@btime. If, after reducing allocations significantly, performance still pushes you towards parallelization, then it might work to parallelize this loop using pmap:

    θ̃ = mean([ols(x, ys[i]) for i in 1:M])
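
With pmap that could look roughly like this (a sketch: the worker count is arbitrary, ols has to be defined on the workers with @everywhere, and wrapping the call in a function matters because pmap only ships local variables captured by the closure, not globals):

using Distributed
addprocs(4)    # number of workers is arbitrary here

# workers need their own copy of the auxiliary estimator
@everywhere function ols(x, y)
    xt = transpose(x)
    inv(xt * x) * xt * y
end

function aux_estimates(x, ys)
    # x is local here, so the closure captures it and ships it to the workers
    pmap(y -> ols(x, y), ys)
end

using Statistics
ys = simulate(x, β0)    # or whatever candidate β you are evaluating
θ̃ = mean(aux_estimates(x, ys))

Note that x and each simulated y get serialized to the workers, so for a problem this small the communication overhead will probably dominate.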

This code looks pretty good to me. Nice work! For thread-based parallelism you need for loops, and you don’t have them here because of the iterator approach. I have done indirect inference using both threads and iterators, and I found that the iterator approach leads to cleaner code and ends up being just about as fast. I suspect that threading suffers from cache contention once the model is complex enough to use a fair amount of memory. I have yet to learn how to parallelize code that uses the iterator approach, so I will be paying attention if solutions are offered.

If you happen to run Monte Carlo simulations, the iterator approach is quite a bit superior, in my experience, because the natural way to parallelize is across the Monte Carlo replications.
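
For what it’s worth, a toy sketch of that pattern in the iterator style, using Folds.map (the dgp and the estimator here are just placeholders, nothing to do with your model):

using Folds, Random, Statistics

# one Monte Carlo replication: simulate a toy sample and return an estimate
function onerep(rep; n = 1_000)
    rng = MersenneTwister(rep)    # per-replication rng keeps replications independent
    y = 2.0 .+ randn(rng, n)
    return mean(y)
end

# the same iterator-style code, parallelized across the replications
estimates = Folds.map(onerep, 1:500)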

The file Econometrics/EstimateSV.jl at main · mcreel/Econometrics · GitHub does method of simulated moments estimation, which is pretty much the same as indirect inference, at least in terms of parallelization opportunities. You can explore the effect of threads-based parallelization by adding Threads.@threads before the for loops in the auxstat function in SVlib.jl. I find that going from no threads to 4 threads I get about a 10% speedup, which, for me, is not worth it. I prefer the iterator-style code.

This resource seems to have information about parallelizing iterator based code: A quick introduction to data parallelism in Julia


Any thoughts on a distributed vs. a threaded approach to parallel computing for this application?

If you just want to do a bunch of least squares fits, then maybe something like:

Threads.@threads for i in 1:n
    # solve your least squares
end

BUT before you do that, note you should NEVER do b = inv(X'X)X'y as you’ve written. For instance, compare the backslash operator \ with your ols function:

using BenchmarkTools
n = 1000
X = randn(n, n);
b = randn(n);
y = X * b + randn(n)

function ols(x, y)
    xt = transpose(x)
    inv(xt * x) * xt * y
end

#check answers
all(X\y .≈ ols(X,  y)) # returns true

@btime X\y         #5.895 ms (4 allocations: 7.64 MiB)
@btime ols($X, $y) #33.106 ms (10 allocations: 23.39 MiB)

The backslash \ is much faster because (1) you never need to explicitly invert x'x to do least squares (I think \ uses QR or Cholesky depending on the structure of x) and (2) the QR or Cholesky factorization that \ calls internally uses BLAS/LAPACK, which is multithreaded.
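
Putting the two together, a rough sketch (the data here are made up just to have something runnable):

using LinearAlgebra

n, K, M = 1_000, 5, 100
X = hcat(ones(n), randn(n, K - 1))
Ys = [X * randn(K) .+ randn(n) for _ in 1:M]    # M fake response vectors

coefs = Vector{Vector{Float64}}(undef, M)
Threads.@threads for i in 1:M
    coefs[i] = X \ Ys[i]    # each iteration writes to its own slot, so no data race
end

One caveat: \ already uses multithreaded BLAS internally, so when combining it with Threads.@threads you may want to experiment with LinearAlgebra.BLAS.set_num_threads to avoid oversubscription.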

It would help if you can let us know what exactly you are doing.


Thank you. That was a simplified example of the general computation I’m trying to parallelize. I don’t use ols in the real application; rather, I maximize a likelihood.

Also, in my real application the function model is very expensive. It operates on a large matrix row by row, where each row holds the parameters of a function that I need to optimize, and this is done with a for loop.

Another bottleneck is the simulate function, where I need to call model around 10 times. I think those are the two main functions I can parallelize.
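
For the simulate part I was thinking of something like spawning the M model calls as tasks, e.g. (just a guess at the pattern, reusing model and u0 from the script above):

function simulate_threaded(x, β, u0, M)
    # run each of the M expensive model evaluations as its own task
    tasks = [Threads.@spawn(model(x, β) + u0[i]) for i in 1:M]
    return fetch.(tasks)
end

but I’m not sure whether that, or something distributed, is the better fit.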

As an aside, is there a reference comparing threads vs. Distributed, SharedArrays vs. DistributedArrays, and the other libraries for parallel computation (FLoops, Folds, and the others in the reference provided by @mcreel)?

FYI, I wrote a quick note here: Frequently asked questions


I played around with this a bit. If I set M=1000, then I get about a 30-40% speedup using 2 threads versus one. Using more threads does not help. My belief is that garbage collection interferes with threaded performance, since, from what I have read, any garbage collection stops all threads. So, with too many threads, the chance that all of them get stopped goes up. I have found in some experimentation that MPI gives good performance, see An embarrassingly parallel problem: threads or MPI?. To limit garbage collection when using threads, trying to avoid allocations is a good strategy, I believe.

The code I used for threading for this problem is

# indirect inference objective function
function iiobj(β, θ, x, u0, M)
    ys = simulate(x, β, u0, M)
    m = zeros(size(x,2))
    Threads.@threads for i in 1:M
    #for i in 1:M
        m .+= ols(x, ys[i])
    end
    m = m ./ M
    return sum((θ - m).^2) 
end

For optimization, I set the criterion using an anonymous function:

obj = β -> iiobj(β, θ, x, u0, M)  

I did this:

using Optim
using Distributions
using Random
using Folds

Random.seed!(23578)

# true dgp
# y = exp(β ⋅ x) + u
# u ~ 𝒩(0, σ²)

function model(x, β)
    exp.(x * β)
end

# number of simulations
const M = 1_000

# size of data
const n = 10_000
const K = 5

# generate data
const σ² = 1.5
const β = rand(K)
const udgp = rand(Normal(0, σ²), n)
const x = hcat(fill(1.0, n), rand(Normal(), (n, K - 1)))
const ydgp = model(x, β) + udgp

# auxiliary model
# y = θ ⋅ x + ε
# ε ~ 𝒩(0, σε²)

# estimator of auxiliary model
function ols(x, y)
    x \ y
end

# generate simulations
const u0 = [randn(n) for i in 1:M]

function simulate(x, β; u0=u0, M=M, env=Folds)
    env.collect(model(x, β) + u0[i] for i in 1:M)
end

# estimate of auxiliary model
const θ̂ = ols(x, ydgp)

# criterion to be minimized
criterion(θ̃; θ̂=θ̂) = sum((θ̂ - θ̃).^2)

# indirect inference objective function
function obj(β; u0=u0, M=M, env=Folds)
    ys = simulate(x, β; u0=u0, M=M, env=env)
    θ̃ = env.sum(ols(x, ys[i]) for i in 1:M)/M
    return criterion(θ̃)
end

# initial values of parameters of true model
β0 = ones(K)

# optimization
opt = optimize(obj, β0, method=ConjugateGradient(), autodiff=:forward)

# indirect inference estimates (of the true model)
β̂ = Optim.minimizer(opt)

I am multi-threading in both the objective function obj and the simulation simulate. I will try turning each of them on and off in turn and see what’s faster. As an aside, using \ for ols is not always faster, but I don’t care much about that because I actually use maximum likelihood to estimate the auxiliary model.
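
One handy thing about passing the module as env is that Base also has collect and sum, so I can get a serial baseline with exactly the same code and compare, e.g.:

using BenchmarkTools
@btime obj($β0; env=Base)     # serial baseline
@btime obj($β0; env=Folds)    # threaded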


One of the coolest things about Julia is that package authors participate in the Discourse community 🙂
Is there a way to add threads to Julia after it is started?

How can I use FLoops to parallelize this:

const x = rand(10)
const y = rand(10)

function plus(x, y)
    x + y
end

function f(; x=x, y=y)
    ret = similar(x)
    for i in 1:size(x, 1)
        ret[i] = plus(x[i], y[i])
    end
    return ret
end

I did this:

function f1(; x=x, y=y)
    ret = similar(x)
    @floop for i in 1:size(x, 1)
        ret[i] = plus(x[i], y[i])
    end
    return ret
end

but it doesn’t speed things up.

m .+= ... causes a data race. Every thread tries to update every element in m.

If you need a parallel sum, check out Folds.sum or FLoops.@reduce.
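
For example, a race-free version of the mean of the auxiliary estimates could look like this (a sketch assuming the x, ys, M, and ols from the code above):

using FLoops

function aux_mean(x, ys, M)
    @floop ThreadedEx() for i in 1:M
        # @reduce declares a reduction: each task accumulates into its own
        # copy and the per-task results are combined at the end, so no race
        @reduce(θsum = zeros(size(x, 2)) + ols(x, ys[i]))
    end
    return θsum / M
end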

This is not possible at the moment. It’d require a rather large surgery to the Julia runtime.

You’d need @floop ThreadedEx() for i in 1:size(x, 1). Similar Q/A Problem with `@reduce` of `FLoops`; data race? - #2 by tkf


I’m using @floop ThreadedEx() for without @reduce and it takes longer than the serial version.

What’s the size of x and y? For a simple function like this, you’d probably need something like 2^20 to 2^30.

I understand, but even in my real application, where the function inside the loop is complicated, it takes longer with @floop ThreadedEx() for than with plain for. I will try to write a minimal example for illustration.

I tried to write an MWE that illustrates my issue, but I couldn’t reproduce the problem from my application. Anyway, let me just ask about speeding up something like this:

using Folds
using FLoops
using Optim
using BenchmarkTools

const n = 10_000

const matkk = 1000 .* rand(n, 2)

function rosenbrock(xx; pp=matkk)
    a = pp[1]
    b = pp[2]
    x = xx[1]
    y = xx[2]
    (a - x)^2 + b*(y - x^2)^2
end

function f1(; matkk=matkk)
    tmat = similar(matkk)
    @floop ThreadedEx() for i in 1:size(tmat, 1)
        ki = matkk[i, :]
        sol = optimize(xx -> rosenbrock(xx; pp=ki), [10.0, 10.5])
        tmat[i, :] = Optim.minimizer(sol)
    end
    return tmat
end

I see some speedup with your example. First, I modified your example a bit so that it’s easy to compare with/without threading:

function f1(; matkk=matkk, ex=ThreadedEx())
    tmat = similar(matkk)
    @floop ex for i in 1:size(tmat, 1)
        ki = matkk[i, :]
        sol = optimize(xx -> rosenbrock(xx; pp=ki), [10.0, 10.5])
        tmat[i, :] = Optim.minimizer(sol)
    end
    return tmat
end

Then, in a session started with julia -t8, I get:

julia> Threads.nthreads()
8

julia> @benchmark f1(ex = SequentialEx())
BenchmarkTools.Trial: 2 samples with 1 evaluation.
 Range (min … max):  3.684 s …   3.698 s  ┊ GC (min … max): 0.75% … 0.67%
 Time  (median):     3.691 s              ┊ GC (median):    0.71%
 Time  (mean ± σ):   3.691 s ± 10.342 ms  ┊ GC (mean ± σ):  0.71% ± 0.06%

  █                                                       █
  █▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
  3.68 s         Histogram: frequency by time         3.7 s <

 Memory estimate: 684.64 MiB, allocs estimate: 41898594.

julia> @benchmark f1(ex = ThreadedEx())
BenchmarkTools.Trial: 6 samples with 1 evaluation.
 Range (min … max):  702.056 ms … 876.160 ms  ┊ GC (min … max): 25.92% … 42.56%
 Time  (median):     867.414 ms               ┊ GC (median):    42.16%
 Time  (mean ± σ):   841.881 ms ±  68.669 ms  ┊ GC (mean ± σ):  39.88% ±  6.63%

  ▁                                                       ▁▁▁ █
  █▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁███▁█ ▁
  702 ms           Histogram: frequency by time          876 ms <

 Memory estimate: 684.64 MiB, allocs estimate: 41898645.