Is this pattern still thread safe in Julia 1.12?

I adopted this pattern (1) to allow non-default RNGs that may not be thread safe themselves, and (2) to obtain the same result whatever the number of threads used:

import StableRNGs, Random

function goo(outseed)
    rng = StableRNGs.StableRNG(outseed) # in the real app the rng is chosen by the user
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000))) # some RNGs have problems with very small seeds
    rngs = [deepcopy(rng) for i in 1:Threads.nthreads()+1]
    results = Vector{Float64}(undef,n_trees)
    Threads.@threads for i in 1:n_trees
        tsrng = rngs[Threads.threadid()] # Thread safe random number generator
        Random.seed!(tsrng,masterSeed+i*10)
        out = rand(tsrng)
        results[i] = out
    end
    return results
end

I have noticed that in Julia 1.12 I need to specify “Threads.nthreads()+1” (the +1 is new), and I am aware that “buffers should not be managed based on threadid()”.
However, in my case the buffer just hosts an RNG that is re-seeded based on the index of the loop, not on the threadid.

What are your thoughts?

I suspect that’s still problematic. The iterations of @threads aren’t threads; they’re tasks that can be run multi-threaded. You need task-local state, not thread-local state.

Is it truly the RNG itself that is your bottleneck here? Or could you gather all the numbers you need in advance (single-threaded) and then use them in the multi-threaded loop?
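
For instance, a minimal sketch of the “pre-generate, then consume” idea (illustrative only; goo_prefetch is a made-up name, and the rand call stands in for whatever numbers the real code needs):

import StableRNGs

function goo_prefetch(outseed)
    rng = StableRNGs.StableRNG(outseed)
    n_trees = 4
    draws = rand(rng, n_trees)            # all randomness drawn up front, single-threaded
    results = Vector{Float64}(undef, n_trees)
    Threads.@threads for i in 1:n_trees
        results[i] = draws[i]             # stand-in for a long, RNG-free computation
    end
    return results
end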

Thanks @mbauman
My issue with prefetching the random values is that it is not always easy to compute the numbers I need in advance. Also, I often have inner functions that expect an RNG.
I will review this…

If the calculation in the loop is sufficiently long (more than 10-100 ms or so), I probably would not bother too much and would prefer the simple version where you create a new RNG for each iteration:

function goo(outseed)
    rng = StableRNGs.StableRNG(outseed) # in the real app the rng is chosen by the user
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000))) # some RNGs have problems with very small seeds
    results = Vector{Float64}(undef,n_trees)
    Threads.@threads for i in 1:n_trees
        tsrng = StableRNGs.StableRNG(masterSeed+i*10)
        out = rand(tsrng) # assuming this is a stand-in for some long calculation
        results[i] = out
    end
    return results
end

Thanks @Mason for pointing out I forgot to delete an essential line.

You are explicitly using StableRNGs. This is necessary if you want exact reproducibility across Julia versions.

If you don’t need that, Julia’s built-in RNG already does what you want:

import Random, ChunkSplitters
using Base.Threads: @spawn

function goo(outseed, n_trees)
    res = Vector{Float64}(undef, n_trees)
    task = @spawn begin
        Random.seed!(outseed)
        @sync for chunk in ChunkSplitters.index_chunks(1:n_trees; n = 128) # ok for up to 128 cores
            @spawn for idx in chunk
                res[idx] = rand()
            end
        end
    end
    wait(task)
    return res
end

I.e., it is designed to be deterministic unless you write racy code, and to be independent of nthreads unless you use very legacy constructions like @threads that rely on nthreads().

The above construction has an extra level of task nesting to avoid overwriting the caller’s RNG state.

Unfortunately your code was already wrong in 1.10 and possibly in 1.9; start julia with julia -t 4,4 to see the issue. Conversely, you can start julia with e.g. julia -t 4,0 to avoid the +1 thing.

The issue is that Julia has multiple thread pools, for interactive and batch-processing threads. Interactive threads come first in the numbering. Before Julia 1.12 the default size of the interactive pool was zero; now the default is one interactive thread, but command-line arguments or environment variables could already change that.
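
For reference, a few ways to inspect this from within a session (Julia ≥ 1.9; the commented values assume a session started with julia -t 4,1):

Threads.nthreads(:default)      # 4 -- threads in the default pool
Threads.nthreads(:interactive)  # 1 -- threads in the interactive pool
Threads.threadpool()            # pool of the thread running the current task
Threads.maxthreadid()           # >= 5 -- upper bound on threadid(), hence a safe length for threadid()-indexed buffers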

Isn’t this scenario what Random123 was designed to address?

Is the number of thread pools something that can change? Currently there is a function Threads.nthreadpools() that returns 2 (for the default and interactive pools), but the very existence of this function makes me think that this is also something that could change.

I guess it’s not configurable by the user (e.g. you cannot set it to 3).

The related PR was

That isn’t thread safe.

Edit: Fixed now

So, something like this (to allow the user to choose the RNG)?

function goo3(rng::Random.AbstractRNG = Random.GLOBAL_RNG)
    rng_type = typeof(rng) 
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000))) # some RNGs have problems with very small seeds
    results = Vector{Float64}(undef,n_trees)
    Threads.@threads for i in 1:n_trees
        tsrng = rng_type(masterSeed+i*10)
        out = rand(tsrng) # assuming this is a stand-in for some long calculation - yes :-)
        results[i] = out
    end
    return results
end

It is probably a bit more general to turn “recreate this RNG with a new seed” into a function of its own, in case some special RNGs require a different construction:

function copy_and_reseed(rng, seed) # maybe find a better name
    typeof(rng)(seed)
end

function goo4(rng::Random.AbstractRNG = Random.GLOBAL_RNG)
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000))) # some RNGs have problems with very small seeds
    results = Vector{Float64}(undef,n_trees)
    Threads.@threads for i in 1:n_trees
        tsrng = copy_and_reseed(rng, masterSeed+i*10)
        out = rand(tsrng) # assuming this is a stand-in for some long calculation - yes :-)
        results[i] = out
    end
    return results
end

This way you don’t require the RNG’s type to have a constructor that takes a single seed. This should not show any performance difference and is just more flexible.
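
For example (just one possible specialization, not something the generic fallback requires): Random.default_rng() returns a TaskLocalRNG, which has no single-seed constructor, so one could map it to a freshly seeded Xoshiro instead:

import Random

# TaskLocalRNG has no TaskLocalRNG(seed) constructor, so build a seeded Xoshiro instead
copy_and_reseed(::Random.TaskLocalRNG, seed) = Random.Xoshiro(seed)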

In 1.12, can’t you do

rngs = OncePerTask{StableRNG}(() -> StableRNG(outseed))

and then do

tsrng = rngs()

in the loop?
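
Spelled out, a sketch of that could look like the following (requires Julia ≥ 1.12 for OncePerTask; goo_onceper is only an illustrative name, and the OP’s per-iteration re-seeding is kept so results stay independent of the task/thread count):

import StableRNGs, Random

function goo_onceper(outseed)
    rng = StableRNGs.StableRNG(outseed)
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000)))
    rngs = OncePerTask{StableRNGs.StableRNG}(() -> StableRNGs.StableRNG(outseed))
    results = Vector{Float64}(undef, n_trees)
    Threads.@threads for i in 1:n_trees
        tsrng = rngs()                           # created lazily, once per task
        Random.seed!(tsrng, masterSeed + i*10)   # re-seeded per iteration, as in the OP
        results[i] = rand(tsrng)                 # stand-in for the long calculation
    end
    return results
end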

I guess, but this seems pretty brittle for a generic method. I don’t think you’re really guaranteed that typeof(rng)(seed) is the right way to construct the RNG object in general.

I saw that you used deepcopy before, but I think it should instead be a regular copy, since deepcopy is really dangerous. I’d suggest something like

#------------------------------------------------v use default_rng instead of GLOBAL_RNG
function goo4(rng::Random.AbstractRNG = Random.default_rng())
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000))) # some RNGs have problems with very small seeds
    results = Vector{Float64}(undef,n_trees)
    Threads.@threads for i in 1:n_trees
#------------------v copy the rng and seed it instead of applying its type to the seed
        tsrng = copy(rng)
        Random.seed!(tsrng, masterSeed+i*10)
        out = rand(tsrng) # assuming this is a stand-in for some long calculation - yes :-)
        results[i] = out
    end
    return results
end

If you want to avoid so much copying / reconstruction of the RNG, you could do some manual @spawn-ing like so:

import Random
function goo4(rng::Random.AbstractRNG = Random.default_rng(); ntasks=Threads.nthreads())
    n_trees = 4
    masterSeed = rand(rng, 100:Int(floor(typemax(Int64)/10000))) # some RNGs have problems with very small seeds
    results = Vector{Float64}(undef,n_trees)
    # Split 1:n_trees up into a number of chunks equal to ntasks
    tasks = map(Iterators.partition(1:n_trees, cld(n_trees, ntasks))) do chunk
        # Spawn a task that does one unit of parallel work
        Threads.@spawn begin
            # Seed the rng *before* looping so it only happens once per task
            tsrng = copy(rng)
            Random.seed!(tsrng, masterSeed+first(chunk)*10)
            for i in chunk
                out = rand(tsrng) # assuming this is a stand-in for some long calculation - yes :-)
                results[i] = out
            end
        end
    end
    foreach(wait, tasks) # wait for all the tasks to finish
    return results
end

This gives you a bit more control over what’s going on, and it makes it so you only copy the RNG and re-seed it once per task. ChunkSplitters.jl also provides a better alternative to Iterators.partition if you’re okay with adding a dependency.
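
For illustration, index_chunks (the same function used earlier in the thread) always yields exactly n (nearly) even index ranges, so the Iterators.partition(1:n_trees, cld(n_trees, ntasks)) call above could simply become index_chunks(1:n_trees; n = ntasks):

using ChunkSplitters: index_chunks

collect(index_chunks(1:11; n = 3))   # e.g. [1:4, 5:8, 9:11] -- exactly 3 chunks, as even as possible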

This has the issue that all tasks will get the same seed → collisions.

You really need some deterministic name / number for each task that you mix into the seed.

The default RNG in Julia uses the structure of tasks: all tasks except the root Task have a parent, and all children of a Task are created in a sequence, so each Task has a “pedigree” like “[5, 1, 3]”, which means “the third child of the first child of the fifth child of the root task”; and that is what is used to seed it.
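
A small demo of that guarantee (pedigree_demo and the seed value are just for illustration): with a fixed seed and a fixed spawn structure, the drawn values should depend neither on scheduling nor on how many threads Julia was started with.

import Random

function pedigree_demo(seed)
    Random.seed!(seed)  # note: this overwrites the calling task's RNG state, as discussed above
    return fetch.([Threads.@spawn rand() for _ in 1:4])
end

pedigree_demo(42)  # should give the same four values whether the session was started with -t 1 or -t 8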

The OP is resetting the seed to a different (deterministic) value for each loop iteration, so the initial seed of each task should be irrelevant.

My point here is that OncePerTask mimics what seems to be the intended behavior of the OP’s original code, except without the lack of thread safety of using a per-thread array. Whether that intended behavior is a good idea is a separate question.
