How to preallocate for a parallel Monte Carlo simulation?

I am trying to run a series of Monte Carlo simulations. Each simulation produces a matrix as output. I would like to preallocate this matrix for each simulation, and also preallocate some work vectors, since I am running a series of these simulations.

I'm not too familiar with parallel computing, and looking into it I find myself confused about what the best practice is here. Initially I thought about using the thread ID for this, but after some reading that seems to be bad practice(?), and instead I should use channels. However, I find it hard to grasp the idea behind channels.

Anyway, here is "pseudocode" for my problem, just to illustrate what I'm trying to do.

#ILLUSTRATION

n_simulations = 1000
out = [zeros(2, 2) for _ in 1:n_simulations]
work_vector = zeros(2)

for p in parameters
    # I could define f() = monte_carlo_simulation(p) here, but with the preallocation it is clearer not to
    for i in 1:n_simulations
        monte_carlo_simulation!(p, work_vector, out[i]) # in-place changes to out[i]
    end
    save_data(out)
end

And what I think I want to do is something like this

#ILLUSTRATION

n_simulations = 1000
out = [zeros(2, 2) for _ in 1:n_simulations]
work_vector = [zeros(2) for _ in 1:Threads.nthreads()]

for p in parameters
    Threads.@threads for i in 1:n_simulations
        monte_carlo_simulation!(p, work_vector[Threads.threadid()], out[i]) # in-place changes to out[i]
    end
    save_data(out)
end

But I understood that a task can migrate between threads, so the thread ID can change during an iteration, and this might not work.

With channels I came up with the idea of storing the work vectors and the preallocated out matrices in channels, and for each simulation taking them out from the channel, using them, and putting them back.

#ILLUSTRATION

n_simulations = 1000
out = Channel{Matrix{Float64}}(n_simulations)
for _ in 1:n_simulations
    put!(out, zeros(2, 2))
end
work_vectors = Channel{Vector{Float64}}(Threads.nthreads())
for _ in 1:Threads.nthreads()
    put!(work_vectors, zeros(2))
end

for p in parameters
    @sync for i in 1:n_simulations
        Threads.@spawn begin
            current_out = take!(out)
            work_vector = take!(work_vectors)
            monte_carlo_simulation!(p, work_vector, current_out) # in-place changes to current_out
            put!(out, current_out)
            put!(work_vectors, work_vector)
        end
    end
    save_data(out)
end

Is this how it is supposed to be done? I feel stupid taking things out of a channel only to put them back into the same one. Instead of having out as a Channel, should I keep it as a vector and index out[i], and only use channels for the work vectors? I would be very glad if someone could give me some direction with this; I don't feel confident about it at all.


Take a look at this example:

You don’t necessarily need to use the package, but the pattern for chunking is shown there. The docs of the package also discuss other patterns.

PS: if you will store independent matrices for each simulation and you can afford having a copy of all the data for each simulation, you can just use @threads in front of the loop over the simulations. If, however, you want to create only on the order of nthreads copies of the auxiliary buffers, you need to split the workload into chunks, as done there.
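For the first case, a rough sketch could look like this (reusing the names from the question; parameters, monte_carlo_simulation!, and save_data are assumed to exist there):

using Base.Threads
n_simulations = 1000
out = [zeros(2, 2) for _ in 1:n_simulations]
for p in parameters
    @threads for i in 1:n_simulations
        work_vector = zeros(2) # fresh buffer per simulation, so nothing is shared
        monte_carlo_simulation!(p, work_vector, out[i]) # in-place changes to out[i]
    end
    save_data(out)
end

The price is one extra allocation per simulation, which is usually negligible if each simulation is expensive.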

Using the package above, your code could be something like:

using ChunkSplitters
using Base.Threads
n_simulations = 1000
nchunks = Threads.nthreads() # can vary
out = [zeros(2, 2) for _ in 1:n_simulations]
work_vector = [zeros(2) for _ in 1:nchunks]
for p in parameters
    @threads for (i_range, i_chunk) in chunks(1:n_simulations, nchunks)
        for i in i_range
            monte_carlo_simulation!(p, work_vector[i_chunk], out[i]) # in-place changes to out[i]
        end
    end
    save_data(out)
end

This pattern can have the (maybe unimportant) false-sharing issue mentioned below, but that's a performance issue, not a correctness one, and it may be irrelevant for you.

Alternatively, if the output matrices are written to in hot loops inside the simulation (which is what could cause the false-sharing issue), you could create the output matrix for each simulation inside the for i in i_range loop and only replace the corresponding entry of the array of output matrices at the end of each simulation.
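A rough sketch of that alternative, reusing p, work_vector, nchunks, and out from the example above:

for p in parameters
    @threads for (i_range, i_chunk) in chunks(1:n_simulations, nchunks)
        for i in i_range
            local_out = zeros(2, 2) # allocated per simulation, local to the task
            monte_carlo_simulation!(p, work_vector[i_chunk], local_out)
            out[i] = local_out # the shared array is touched only once per simulation
        end
    end
    save_data(out)
end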

This is a working minimal example of the chunked-buffer pattern:

using ChunkSplitters
using Base.Threads

function simulation!(output, input, buffer)
    s = 0.0 # expensive stuff, just to take time
    for i in 2:10^3
        s += log(i)^(1/13)
    end
    buffer[1] = input
    output[1] = s * input * buffer[1]
end

function many_simulations2(n; nchunks = Threads.nthreads())
    inputs = 1:n
    outputs = [zeros(1) for _ in 1:n]
    buffer = [zeros(1) for _ in 1:nchunks]
    @threads for (sim_range, i_chunk) in chunks(1:n, nchunks)
        for isim in sim_range
            simulation!(outputs[isim], inputs[isim], buffer[i_chunk])
        end
    end
    return outputs
end

This is a variation of the code above, using channels to store the buffers:

#
# Using channels to store buffers
#
function many_simulations(n; nchunks = Threads.nthreads())
    inputs = 1:n
    outputs = [zeros(1) for _ in 1:n]
    buffers = Channel{Vector{Float64}}(nchunks)
    for _ in 1:nchunks
        put!(buffers, zeros(1))
    end
    @sync for (sim_range, _) in chunks(1:n, nchunks)
        @spawn begin
            buffer = take!(buffers)
            for isim in sim_range
                simulation!(outputs[isim], inputs[isim], buffer)
            end
            put!(buffers, buffer)
        end
    end
    return outputs
end

I also think that this (simpler) pattern avoids any possible false sharing associated with the buffers: it creates the buffers independently for each chunk of work, locally inside the spawned task.

#
# Allocate the buffer for each chunk: avoids false sharing
#
using ChunkSplitters, Base.Threads
function many_simulations3(n; nchunks = Threads.nthreads())
    inputs = 1:n
    outputs = [zeros(1) for _ in 1:n]
    @sync for (sim_range, _) in chunks(1:n, nchunks)
        @spawn begin
            local buffer = zeros(1) # note: this `local` is redundant
            for isim in sim_range
                simulation!(outputs[isim], inputs[isim], buffer)
            end
        end
    end
    return outputs
end

(I don't see any difference in performance between these functions in the MWE.)
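To check that, one could time the three variants, for instance with BenchmarkTools (an assumption on my part; it was not used above):

using BenchmarkTools
# Compare the three variants on the same problem size; with a per-simulation
# cost as small as in the MWE, they are expected to be within noise of each other.
@btime many_simulations(10_000);
@btime many_simulations2(10_000);
@btime many_simulations3(10_000);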


This is a very good question. The docs probably should contain some more recipes for that.

Supposing that your simulations are relatively expensive, the approach with channels is probably not bad. I took the liberty of filling in some more details and ensuring reproducibility (with respect to random seeds); then it could look like this:

julia> using Random
julia> nsimulations = 1000; dims = 2;
julia> workvectors = Channel{Vector{Float64}}(Threads.nthreads()); for i = 1:Threads.nthreads() put!(workvectors, zeros(dims)) end;
julia> out = [zeros(dims,dims) for i=1:nsimulations];
julia> function monteCarlo!(res, tmp)
       rand!(tmp)
       res .= tmp*tmp'
       nothing
       end
julia> Random.seed!(1235); @sync for i=1:nsimulations
       Threads.@spawn begin
       tmp = take!(workvectors)
       try
       monteCarlo!(out[i], tmp)
       finally put!(workvectors, tmp) end
       end
       end; hash(out)
0xdd530331c140432d
julia> Random.seed!(1235); @sync for i=1:nsimulations
       Threads.@spawn begin
       tmp = take!(workvectors)
       try
       monteCarlo!(out[i],tmp)
       finally put!(workvectors, tmp) end
       end
       end; hash(out)
0xdd530331c140432d
julia> Random.seed!(1234); @sync for i=1:nsimulations
       Threads.@spawn begin
       tmp = take!(workvectors)
       try
       monteCarlo!(out[i],tmp)
       finally put!(workvectors, tmp) end
       end
       end; hash(out)
0x7261a689330440c7

Note that the results are completely reproducible and independent of the number of threads: each spawned task gets its own task-local RNG, seeded deterministically from its parent task, so the random streams do not depend on how the tasks get scheduled across threads.

Regarding the workvectors, there is a potential issue with false sharing (i.e. different workvectors sharing a cache-line). I’m not 100% sure how to solve that reliably; maybe there should be a package. If your workvectors have appreciable size then this is probably a non-issue.

I'm not up-to-date with the detailed Julia allocator behavior; maybe one needs a Threads.@threads in front of the allocation of workvectors in order to place one in each thread's thread-local memory pool. (For large workvectors this is probably a non-issue; small workvectors don't come out of the system allocator, so you have at least a chance without some kind of heap feng shui specific to your user's choice of jemalloc or mimalloc or ptmalloc or whatever.)
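For what it's worth, one commonly suggested mitigation (just a sketch, not tested here, and with no guarantee against a particular allocator, since it controls buffer size but not alignment) is to over-allocate each work vector so that it spans at least one cache line:

# Hypothetical padding sketch: make each buffer at least one cache line long
# (64 bytes is typical) so that buffers written by different threads are less
# likely to share a line. Alignment is not controlled, so this only reduces
# the chance of false sharing. The simulation must then use only the first
# `dims` entries of each buffer, e.g. via view(tmp, 1:dims).
dims = 2
cacheline_bytes = 64
padded_len = max(dims, cld(cacheline_bytes, sizeof(Float64))) # at least 8 Float64s
workvectors = Channel{Vector{Float64}}(Threads.nthreads())
for _ in 1:Threads.nthreads()
    put!(workvectors, zeros(padded_len))
end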

edit: Relatively expensive means probably at least one millisecond per monteCarlo! call, to make all threading/synchronization overheads irrelevant. If it's less than maybe 50 microseconds, then you should probably consider doing something else, e.g. chunking multiple Monte Carlo runs per task, as sketched below. You will need to test that.
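For illustration, batching with plain Base could look roughly like this (the batch size of 100 is an arbitrary placeholder; nsimulations, workvectors, out, and monteCarlo! are the ones from the session above):

# Batch the simulations so each spawned task runs `batch` of them, amortizing
# the per-task scheduling and channel overhead over many cheap simulations.
batch = 100 # arbitrary; tune so each task runs for well over ~50 microseconds
@sync for idxs in Iterators.partition(1:nsimulations, batch)
    Threads.@spawn begin
        tmp = take!(workvectors)
        try
            for i in idxs
                monteCarlo!(out[i], tmp)
            end
        finally
            put!(workvectors, tmp)
        end
    end
end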


I had a possibly similar need before, and ended up using FLoops.jl, which I can recommend. Maybe see also my own question from a while ago.

I use the following pattern; maybe it's useful in your case? I think your work_vector could play the role of varexternal below.

using FLoops

ex = ThreadedEx() # or SequentialEx()

@floop ex for i = 1:nparticles
    @init ve = deepcopy(varexternal)

    # compute something with f, potentially using the external variables ve
    # (each base case has its own ve; ve can be mutated in place)
    out[i] = f(ve)
end
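A minimal, self-contained sketch of how this might map onto the original problem (the monte_carlo_simulation! body here is just a stand-in, and the 2x2 outputs and length-2 work vector are carried over from the question):

using FLoops, Random

# stand-in simulation: fills the work vector and writes a 2x2 result in place
function monte_carlo_simulation!(p, work, out)
    rand!(work)
    out .= p .* (work * work')
    return nothing
end

n_simulations = 1000
p = 1.0
out = [zeros(2, 2) for _ in 1:n_simulations]
varexternal = zeros(2) # the work vector from the question

@floop ThreadedEx() for i in 1:n_simulations
    @init ve = deepcopy(varexternal) # one private copy per base case
    monte_carlo_simulation!(p, ve, out[i])
end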

+1 for ChunkSplitters.jl, very easy to work with.

Thanks for the answers, this has helped me a lot :slight_smile:. I have one more question. Compared to what @Imiq said, if I were to do the parallelisation over processes instead of threads, would the approach still be exactly identical? Or should I, for example, also use a channel for out?