Multithreading and random number generators

How one can approach multithreading simulations, which uses random numbers generators? Especially if one wants to use non Base generators, like StableRNGs.jl, RandomNumbers.jl and others.

Here is a simple example of a random walk, which I want to make parallel

using Random
using StableRNGs

function randomwalk!(tbl, i, rng)
    len = size(tbl, 1)
    tbl[1, i] = rand(rng)
    @inbounds for j in 2:len
        tbl[j, i] = (tbl[j - 1, i]) + rand(rng)
    end
end

function randomwalk(nparticles, t, rng)
    tbl = Array{Float64, 2}(undef, t, nparticles)
    map(i -> randomwalk!(tbl, i, rng), 1:nparticles)

    return tbl
end

rng = StableRNG(2020)
randomwalk(10, 1000, rng)

Here is what I want to achieve

  1. I want to be able to use different number generators (MersenneTwister, StableRNG, etc)
  2. I want calculations to be run on multiple threads
  3. Calculations should be random in a sense, that there shouldn’t be a correlation between random numbers on different threads. Well, as much as we can talk of non-correlation of pseudo-random numbers anyway.
  4. It would be good for this algorithm to be not too complicated, so it can be easily used in other similar calculations.
  5. As a bonus it would be good to have these calculations reproducible. But this is not a very strict requirement, i.e. if one thread is running ahead of another and grab RNG first, it’s ok, as long as the random sequence is not correlated with other threads.

I guess I can do some bookkeeping in randomwalk function and generate some amount of RNGs, but which way is more appropriate;

  1. Generate new RNG on-the-fly for each new index i, where RNG seed should be generate from the main generator.
  2. Pregenerate RNGs for all elements, and then feed them to randomwalk! function.
rngs = StableRNG.(rand(rng, 1:typemax(Int), length(nparticles))
map(i -> randomwalk!(tbl, i, rngs[i]), 1:nparticles)

by the way, is it correct to use 1:typemax(Int) or range should be made even larger?
3. Generate only Threads.nthreads() number of new RNGs and determine which one should be used inside randomwalk! function?

In this discussion Reproducible multithreaded monte-carlo / Task local random if I understand it correctly, it is claimed, that MersenneTwister is thread-safe now. Is it possible to somehow “upgrade” other RNG to make them thread-safe too? And on the other hand, wouldn’t thread safety of RNG slow down multithreading calculations?

1 Like

I don’t really have an answer, but just few comments.

That would really depend on the RNG type. Generating a new MersenneTwister on each iteration might be very expensive, whereas StableRNG is very cheap to create.

So that might be the most general solution, as it works also for expensive-to-create RNGs.

What is thread-safe is to use the “global RNG” concurrently, as in fact there a multiple “global” RNGs, one for each thread. But it’s unsafe to use one given RNG concurrently, as is the case for most RNGs.

I think your solution 3. above is the one generally shared on Julia forums for that.

I think that really depends on the RNG type, i.e. how it does initialization. Note also that StableRNG was not really intended for this use case; it currently accepts a seed in 0 <= seed <= typemax(UInt) (but it could be made to accept a seed almost equal to typemax(UInt128)), but no facility is provided to produce two StableRNGs which are as much not correlated as possible. But maybe random seeds would be good enough.

2 Likes

So, what is happening if I am using RNG naively in multithreading context?

Something like

using Random
using StableRNGs

function f(rng)
  res = Vector{Float64}(undef, 100_000)
  Threads.@threads for i in 1:100_000
     res[i] = rand(rng)
  end
  return res
end

f(Random.GLOBAL_RNG)
f(StableRNG(2020))

They are definitely use all cores in both cases and produce some results, but what is going on with RNG at this moment? Are values duplicated somehow or rng is locked between threads or it is something else entirely?

For f(Random.GLOBAL_RNG), something special happens: GLOBAL_RNG is not a MersenneTwister, since Julia 1.3 it’s a special type which was created to not break compatibility (nice trick from Jameson), so that you can still call rand(GLOBAL_RNG) while being thread-safe; but it’s equivalent to calling rand(Random.default_rng()), where default_rng() returns the “thread-local global RNG”. So rand(GLOBAL_RNG) is thread-safe. But rand(::MersenneTwister) is not, and your f(StableRNG(2020)) is not, and will result in corrupted state.

1 Like

So, I shouldn’t use them naively, because state is corrupted and results are unreliable. Good to know, thank you.

One last question, is it a viable approach to extend rand function to accept vector of RNG in this manner

rand(rngs::AbstractVector) = rand(rngs[Threads.threadid()])

# Use it like this.
# single thread
rng = StableRNG(2020)
randomwalk(10, 1000, rng)

# multi-thread (of course `@Threads.threads` or something similar should be used inside `randomwalk` to actually use multithreading
rngs = StableRNG.(2020 .+ collect(1:Threads.nthreads()))
randomwalk(10, 1000, rngs)

Of course, other versions of rand should be extended too if needed, may be something like

# to avoid type piracy
struct VectorRNG{T}
  rngs::Vector{T}
end

rand(rngs::VectorRNG, args...) = rand(rngs.rngs[Threads.threadid()], args...)

I would use your second solution, building on it here is how I would go about it:


struct VectorRNG{T} <: AbstractRNG
    rngs::Vector{T}
end

struct VectorRNGSampler{T,X} <: Random.Sampler{X}
    sp::T

    VectorRNGSampler(sp::T) where T = new{T,Random.gentype(sp)}(sp)
end

for N = (Val{1}, Val{Inf})
    for S = (Any, AbstractFloat)
        @eval Random.Sampler(::Type{VectorRNG{T}}, ::Type{X}, n::$N) where {T,X<:$S} =
            VectorRNGSampler(Random.Sampler(T, X, n))
    end
    for S = (Any, AbstractUnitRange{<:Base.BitInteger64})
        @eval Random.Sampler(::Type{VectorRNG{T}}, x::X, n::$N) where {T,X<:$S} =
            VectorRNGSampler(Random.Sampler(T, x, n))
    end
end

Random.rand(rngs::VectorRNG, t::VectorRNGSampler) = rand(rngs.rngs[Threads.threadid()], t.sp)

There has to be loops to avoid ambiguities, maybe it’s possible to improve Random to get less of these.

[DISCLAIMER: I almost didn’t use multi-threading in Julia!]

2 Likes

I unfortunately missed this discussion, but my JuliaCon talk is very relevant for this and discusses how we solved this problem in MixedModels.jl. There are various things we could do to make it more performant, but we have a strong reproducibility guarantee that is independent of the number of threads and doesn’t require the big allocation I think is inherent in your (2).

I found an other solution that doesn’t use locks, and I can have, depending of the needs, (a) different results, (b) different results but the same “sequence” whenever I reset the random number generator or (c) the same result at each call. However it works only with MersenneTwister. I can’t get it working with StableRNG, that is indeed what I use in the rest of my library (above all, to guarantee the same random number independently than the specific Julia version).

using Statistics, Random,StableRNGs

x = rand(100)

function generateParallelRngs(rng::Union{Random._GLOBAL_RNG,MersenneTwister,StableRNGs.LehmerRNG}, n::Integer)
    step = rand(rng,big(10)^20:big(10)^40) # making the step random too !
    rngs = Vector{Union{MersenneTwister,Random._GLOBAL_RNG,StableRNGs.LehmerRNG}}(undef, n)
    rngs[1] = copy(rng)
    for i = 2:n
        rngs[i] = Future.randjump(rngs[i-1], step)
    end
    return rngs
end

function innerFunction(bootstrappedx; rng=MersenneTwister())
     sum(bootstrappedx .* rand(rng) ./ 0.5)
end

function outerFunction(x;rng = MersenneTwister())
    rngs = generateParallelRngs(rng,Threads.nthreads())
    results = Array{Float64,1}(undef,30)
    Threads.@threads for i in 1:30
        tsrng = rngs[Threads.threadid()] # Thread safe random number generator
        toSample = rand(tsrng, 1:100,100)
        bootstrappedx = x[toSample]
        innerResult = innerFunction(bootstrappedx, rng=tsrng)
        results[i] = innerResult
    end
    overallResult = mean(results)
    return overallResult
end

# Different sequences..
outerFunction(x)
outerFunction(x)
outerFunction(x)

# Different values, but same sequence
mainRng = MersenneTwister(123)
outerFunction(x, rng=mainRng)
outerFunction(x, rng=mainRng)
outerFunction(x, rng=mainRng)

# Same value at each call
outerFunction(x,rng=MersenneTwister(123))
outerFunction(x,rng=MersenneTwister(123))
outerFunction(x,rng=MersenneTwister(123))

If I try outerFunction(x,rng=StableRNG(123)) I have however that randJump is not implemented for StableRNGs. Any alternative ?

1 Like