Thread-safe memoization

I have the following code, using memoization:

# Copyright (c) 2025 Marcus Becker, Uwe Fechner
# SPDX-License-Identifier: BSD-3-Clause

"""
    discretizeRotor(n_rp::Int) -> Tuple{Matrix{Float64}, Vector{Float64}}

Discretizes the rotor into `n_rp` segments using the isocell algorithm.

Memoization: results are cached per thread. Repeated calls with the same `n_rp`
on the same thread reuse the cached arrays (no lock needed). Do not mutate the
returned arrays, as they are shared within the thread.

# Arguments
- `n_rp::Int`: The number of radial points to discretize the rotor into.

# Returns
- `(m_rp, w)` where:
  - `m_rp::Matrix{Float64}`: Size `(nC, 3)`; first column zeros, columns 2–3 are
    normalized coordinates in `[-0.5, 0.5]`.
  - `w::Vector{Float64}`: Weights per cell that sum to approximately 1.

# Notes
- Per-thread cache avoids contention; different threads may compute and hold
  their own cached copies for the same `n_rp`.
- The isocell algorithm may not yield exactly `n_rp` cells but aims for a similar number.
- For details, see: Masset et al. (2009)
  https://orbi.uliege.be/bitstream/2268/91953/1/masset_isocell_orbi.pdf
- The choice `N1 = 3` is used here; values of 4 or 5 are also viable.
"""
function discretizeRotor(n_rp::Int)
  cache = _get_discretizeRotor_thread_cache()
  return get!(cache, n_rp) do
    _compute_discretizeRotor(n_rp)
  end
end

# Lock-free per-thread memoization caches.
# Use a Ref holding a vector of Dicts, initialized to length 0 at load time to
# avoid baking in the precompile-time thread count.
const _discretizeRotor_caches_ref = Base.RefValue{Vector{Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}}}(Vector{Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}}(undef, 0))

@inline function _get_discretizeRotor_thread_cache()
  caches = _discretizeRotor_caches_ref[]
  tid = Base.Threads.threadid()
  if length(caches) < tid
    # Grow to current nthreads()+1; preserve existing dicts
    newlen = Base.Threads.nthreads()+1
    newc = Vector{Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}}(undef, newlen)
    @inbounds for i in 1:newlen
      newc[i] = i <= length(caches) ? caches[i] : Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}()
    end
    _discretizeRotor_caches_ref[] = (caches = newc)
  end
  return caches[tid]
end

@inline function _compute_discretizeRotor(n_rp::Int)
  # DISCRETIZEROTOR discretizes the rotor plane into n_rp segments.

  N1 = 3
  n = round(Int, sqrt(n_rp / N1))

  # Radial thickness of each ring
  dltR = 1 / n
  nC = N1 * n^2

  # m_rp matrix: nC rows, 3 columns (initialize to zeros)
  m_rp = Array{Float64}(undef, nC, 3)
  fill!(m_rp, 0.0)

  @inbounds for i in 1:n
    nR = (2 * i - 1) * N1
    # Sum of first i odd numbers = i^2, scaled by N1
    i_e = N1 * i^2
    i_s = i_e - nR + 1

    radius = 0.5 * dltR * (0.5 + (i - 1))
    step = 2π / nR
    for k in 0:(nR - 1)
      idx = i_s + k
      ang = (k + 1) * step
      m_rp[idx, 2] = radius * cos(ang)
      m_rp[idx, 3] = radius * sin(ang)
    end
  end

  w = Vector{Float64}(undef, nC)
  fill!(w, 1 / nC)

  return m_rp, w
end

It creates one dictionary per thread and works fine.
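For illustration, the kind of concurrent usage in question looks roughly like this (simplified sketch, not the real call site):

using Base.Threads
areas = zeros(100)
@threads for i in 1:100
    m_rp, w = discretizeRotor(200)   # repeated calls are served from the per-thread cache
    areas[i] = sum(w)
end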

But I was told not to think of threads any more, but of tasks instead. So perhaps this implementation is not really thread-safe any more?

Use

using Base.Threads

buf = OncePerTask() do
    Dict()
end
@threads for _ in 1:10
    bufi = buf()
    # ... use bufi as this task's private cache ...
end

Or use a channel if you prefer that mindset.


I suppose it’s not thread-safe, since you could get into the situation where two tasks (calls with some values of n_rp) start concurrently on the same thread, but then migrate to different threads. At that point they run in parallel while using the same cache. If one task then modifies the Dict (rehashing its memory layout) while the other is reading from it (or also modifying it), you will run into issues.

Two tasks will get different caches by definition of OncePerTask, no matter whether they run on the same thread; the issue you’re mentioning should only happen when using OncePerThread.
Even if a task yields and moves to another thread, there is no chance its cache is used by another task.
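A quick illustrative check of that claim:

buf = OncePerTask() do
    Dict{Int,Int}()
end
d1 = fetch(Threads.@spawn buf())
d2 = fetch(Threads.@spawn buf())
@assert d1 !== d2   # each task got its own Dict, whichever threads ran them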

Perhaps I replied to the wrong post on Discourse, but I was referring to @ufechner7’s original code and question on thread-safety. A task-centric approach should definitely be safe, but that didn’t answer the original question.


Oh sorry, I didn’t check; yes, it’s not safe without :static or locks.


Do you agree that your solution is not possible with the current function signature:

function discretizeRotor(n_rp::Int)

?
In other words: why is there no function Base.Threads.taskid()? Are there principled reasons for this, or is it just a feature that has not been implemented yet?

What would you mean by taskid, to be clear? Since we never explained it, this is what we mean by “thinking of tasks”:

function task_based_parallel()
    n = 100
    x = rand(n)
    z = zeros(n)
    it = 1:n
    chunks = Iterators.partition(it, n ÷ 4)
    cache = [zeros(2) for _ in 1:length(chunks)]
    @sync for (id,chunk) in enumerate(chunks)
        Threads.@spawn begin
            cachei = cache[id]
            for i in chunk
                cachei[1] = x[i]^2
                cachei[2] = x[i]^3
                z[i] = x[i] + cachei[1] + cachei[2]
            end
        end
    end
    return z
end

Here each @spawn creates a Task, and the “task id” you want would be the index of the chunk. But there are cases where a task id can mean a lot of different things in practice.
Why is this different from threads? A task started by @spawn can yield, in which case another thread may take over and continue running it.

For simple problems the OncePerTask I gave you can be enough, but for advanced threading (maybe your case) it is still better to think in terms of spawning tasks and chunk your problem yourself, or, if you want a special scheduler etc., just use OhMyThreads.jl.

Again, you do not focus on my main question. My main question is: I have a function, it is fast and, in practice, thread-safe. You say it is no longer thread-safe if I use threads that are not scheduled with :static, so I have to rewrite this function.

My current impression is that with the new scheme I cannot rewrite this function and keep the function signature. Do you agree?

I would write this as

using TaskLocalValues
const _cache = TaskLocalValue{Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}}() do
    Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}()
end
_get_discretizeRotor_thread_cache() = _cache[]

(or replace TaskLocalValue with OncePerTask if you’re only running this on v1.12+).
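With that in place, discretizeRotor keeps its original signature unchanged, since it already goes through _get_discretizeRotor_thread_cache():

function discretizeRotor(n_rp::Int)
    cache = _get_discretizeRotor_thread_cache()   # now a per-task Dict, created lazily
    return get!(cache, n_rp) do
        _compute_discretizeRotor(n_rp)
    end
end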

You could use pointer_from_objref(::Task), but you probably shouldn’t, because there’s an unbounded number of tasks that might exist at any given time, and their memory addresses can be reused if one is garbage collected.

I don’t agree.

No reason you can’t keep the signature, and the truth is it was never thread-safe; it was just lucky not to yield, and yielding will happen more often in 1.12.

What you are doing here looks as if you are sort of implementing channels yourself. You could do something like:

# global
const caches = Channel{TypeOfCache}(Threads.nthreads())
function initialize_caches(caches)
    for _ in 1:Threads.nthreads()
        put!(caches, _initialize_cache())
    end
end
initialize_caches(caches)

# your current function
function discretizeRotor(n_rp::Int)
  cache = take!(caches)
  try
    # memoizes, like your _compute_discretizeRotor version above
    return get!(cache, n_rp) do
      _compute_discretizeRotor(n_rp)
    end
  finally
    put!(caches, cache) # always hand the cache back to the pool
  end
end

No need to index into the caches in any way.

As far as I was aware (though I could definitely be wrong), the yielding behaviour isn’t changing with 1.12. The change is that nthreads() is no longer the maximum threadid() that one might observe, since the interactive threads go first and there’ll be an extra interactive thread spawned by default.
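For reference (assuming Julia ≥ 1.9, where these functions exist), the relevant queries are:

Threads.nthreads()               # threads in the :default pool
Threads.nthreads(:interactive)   # interactive pool; one is started by default on 1.12
Threads.maxthreadid()            # any currently running task has threadid() <= this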

Oh ok, I don’t remember where I saw that this problem with thread-based code becomes (or will become) more and more common in newer versions; I may be wrong then.

There is current_task, exported from Base, and similarly task_local_storage (which can be used instead of Base.OncePerTask).

If you have many more tasks than threads, e.g. if you expect your users to call your function repeatedly, then it may be inefficient that each new task creates a new cache. In that case, you can have a const _discretizeRotor_caches = Channel{CacheType}(Inf) (for whatever CacheType you use for caching - the specific Dict type in your case). Then, when a new task is spawned that calls discretizeRotor, it can (see the sketch after this list):

  1. Lock _discretizeRotor_caches
  2. If there are any caches already in the channel, get a cache from the channel. Else, make a new one
  3. Unlock the channel
  4. Call _compute_discretizeRotor with the cache
  5. When done, lock the channel and put back the cache, before unlocking it.
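A sketch of these steps (illustrative; CacheType stands for the Dict type from your code, and _compute_discretizeRotor is the function from the first post):

const CacheType = Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}
const _discretizeRotor_caches = Channel{CacheType}(Inf)

function discretizeRotor(n_rp::Int)
    # Steps 1-3: under the channel's lock, reuse a pooled cache or make a fresh one
    lock(_discretizeRotor_caches)
    cache = try
        isready(_discretizeRotor_caches) ? take!(_discretizeRotor_caches) : CacheType()
    finally
        unlock(_discretizeRotor_caches)
    end
    try
        # Step 4: memoized computation
        return get!(cache, n_rp) do
            _compute_discretizeRotor(n_rp)
        end
    finally
        # Step 5: hand the cache back to the pool (never blocks, the channel is unbounded)
        put!(_discretizeRotor_caches, cache)
    end
end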

Note that this is no longer lock free, and if some user spawns thousands of tasks that concurrently need the cache, you may want to disable caching so the memory usage doesn’t blow up.

Alternatively, you can cap the size of _discretizeRotor_caches and initialize it with empty dicts. Then, if lots of tasks are spawned, they will wait for access to the limited cache pool.

Here is a simple FIFO cache you can use to keep a limited pool of caches:

struct FIFOCache{T}
    sem::Base.Semaphore
    lock::ReentrantLock
    items::Vector{T}
end

function FIFOCache{T}(initializer, n_caches::Int) where T
    sem = Base.Semaphore(n_caches)
    items = T[]
    for _ in 1:n_caches
        push!(items, initializer()::T)
    end
    FIFOCache{T}(sem, ReentrantLock(), items)
end

function with_cache(f, cache::FIFOCache)
    Base.acquire(cache.sem)
    local item
    @lock cache.lock (item = pop!(cache.items))
    try
        f(item)
    finally
        @lock cache.lock (push!(cache.items, item))
        Base.release(cache.sem)
    end
end
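For example, wiring it into the function from the first post could look like this (the pool size 8 and the name _cache_pool are arbitrary):

const _cache_pool = FIFOCache{Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}}(8) do
    Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}()
end

function discretizeRotor(n_rp::Int)
    with_cache(_cache_pool) do cache
        get!(cache, n_rp) do
            _compute_discretizeRotor(n_rp)
        end
    end
end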

Your function hasn’t been thread-safe since Julia version 1.7, because task migration was introduced in 1.7 (see Multi-Threading · The Julia Language). You might be lucky and your tasks don’t migrate, but that may change without notice, probably even in minor Julia versions, or because of minor changes to the program, or when running on a system with slightly different timing. If your tasks are sticky (e.g. with @threads :static ...), they will remain on the same thread; otherwise they may migrate.

That means you should not use threadid() to index into an array, because your threadid() can change, just like the actual cpu core you’re running on can change.
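A small demo of the migration (illustrative only; it needs more tasks than threads and a yield point):

using Base.Threads

function show_migration(ntasks = 4 * nthreads())
    @sync for _ in 1:ntasks
        @spawn begin
            t1 = threadid()
            sleep(0.01)           # yield point: the scheduler may move this task
            t2 = threadid()
            t1 != t2 && @info "task migrated between threads" t1 t2
        end
    end
end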

If you need a contention-free cache local to your task, you can use task-local storage for this, e.g.

cache = get!(task_local_storage(), :mycache) do
    makeacache()
end::CacheType

However, this cache does not survive the @threads for ... loop (since @threads creates new tasks which disappear when the for loop completes). If you have several @threads for ... loops which would benefit from using the same cache, you need to organize your own cache-pool, either with a lock, or take! and put! them on a Channel. Alternatively, as suggested above, you can make your own task-id and index into a Vector.

caches = makecaches(numtasks)  #Vector{CacheType}
for i in 1:numtasks
    @spawn begin
        mycache = caches[i]        
        compute(mycache,...)
    end
end

If mycache should not be an argument to compute, you can either store it in task local storage, or as a ScopedValue (from Base.ScopedValues), and pick it up in compute:

using Base.ScopedValues
const scopedcache = ScopedValue{CacheType}()
function compute(...)
    mycache = scopedcache[]
    ...
end

function main(...)
    caches = makecaches(numtasks)  #Vector{CacheType}
    for i in 1:numtasks
        @spawn begin
            @with scopedcache => caches[i] compute(...)
        end
    end
end

Yes, I would recommend changing your functions to explicitly take in the cache, the total number of threads, and the “chunk” or “chunkid” (what you were using threadid for in the current version).

There are ways to get around explicitly passing these using ScopedValues and global variables if you want.

Also, the mutation of the global _discretizeRotor_caches_ref seems very fishy in:

@inline function _get_discretizeRotor_thread_cache()
  caches = _discretizeRotor_caches_ref[]
  tid = Base.Threads.threadid()
  if length(caches) < tid
    # Grow to current nthreads()+1; preserve existing dicts
    newlen = Base.Threads.nthreads()+1
    newc = Vector{Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}}(undef, newlen)
    @inbounds for i in 1:newlen
      newc[i] = i <= length(caches) ? caches[i] : Dict{Int, Tuple{Matrix{Float64}, Vector{Float64}}}()
    end
    _discretizeRotor_caches_ref[] = (caches = newc)
  end
  return caches[tid]
end

I don’t think this is thread safe without the _discretizeRotor_caches_ref being marked as atomic.

For example what I do in my own code is something like:

fc = initialize_context(; nthreads=4, user_options1=1234, ...)
for step in 1:N
  @sync for chunk in 1:fc.nthreads
    Threads.@spawn let chunk = $chunk, nthreads = $(fc.nthreads), force_energy = $(fc.per_thread_force_energy[chunk])
      set_zero_force_energy!(force_energy)
      calc_chunk_force_energy!(fc; chunk, nthreads)
    end
  end
  do_something_serially!(fc)
end

Then chunk, nthreads, and the per-chunk caches in fc get passed to all the functions that calc_chunk_force_energy! calls (and the functions those call) which also want to split work among different threads and access caches indexed by chunk in fc.