Task/thread-local caches/buffers

What is currently the best way to handle task/thread-local caches/buffers that need to be accessed efficiently from functions called from a Threads.@threads for loop?

The background is as follows. As described in the recent blog post PSA: Thread-local state is no longer recommended, the following pattern was designed for older versions of Julia but doesn’t work correctly on Julia v1.8 and newer: tasks may now migrate between threads, so threadid() is not guaranteed to stay constant within a task.

# Package PkgA

# Cache of `Vector{Int}` with 64 elements per task/thread
const BUFFER_LENGTH = 64
const CACHE = Vector{Vector{Int}}()

function __init__()
    Threads.resize_nthreads!(CACHE,
                             Vector{Int}(undef, BUFFER_LENGTH))
end

function foo(x)
  # Unsafe on Julia v1.8+: the task may migrate between threads, so
  # `threadid()` can change and two tasks may end up sharing a cache
  cache = CACHE[Threads.threadid()]
  # do something with x and cache
  return some_value
end


# Package PkgB
function bar(values)
  Threads.@threads for i in eachindex(values)
    x = values[i]
    values[i] = foo(x) # Calling PkgA.foo
  end
end

My goal is to have some code in PkgA that

  • is still as efficient as before
  • is correct without requiring PkgB to use Threads.@threads :static
  • allows PkgB to use Threads.@threads and does not require Threads.@spawn
  • is reasonably easy to code and understand

Do you have any recommendations for this?

Maybe this thread can provide some inspiration.

You can actually do this by hand (a sketch of that follows further below), but this is what ChunkSplitters.jl does.

Basically you would write that as:

# Package PkgA
function foo(x; cache)
  # do something with x and cache
  return some_value
end


# Package PkgB
using ChunkSplitters
function bar(values; nchunks = Threads.nthreads())
  # one cache per chunk; each chunk is processed by exactly one task
  caches = [Vector{Int}(undef, PkgA.BUFFER_LENGTH) for _ in 1:nchunks]
  Threads.@threads for (i_range, ichunk) in chunks(values, nchunks)
    for i in i_range
      x = values[i]
      values[i] = foo(x; cache = caches[ichunk]) # Calling PkgA.foo
    end
  end
end
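
For reference, here is the hand-rolled version of the same chunked loop mentioned above. This is only a sketch: it assumes 1-based indexing for values and that the buffer length is accessible as PkgA.BUFFER_LENGTH.

# Package PkgB (by-hand chunking, no package needed)
function bar_byhand(values; nchunks = Threads.nthreads())
  caches = [Vector{Int}(undef, PkgA.BUFFER_LENGTH) for _ in 1:nchunks]
  len, rem = divrem(length(values), nchunks)
  Threads.@threads for ichunk in 1:nchunks
    # chunk `ichunk` gets `len` elements, plus one of the `rem` leftovers
    first_i = (ichunk - 1) * len + min(ichunk - 1, rem) + 1
    last_i = first_i + len - 1 + (ichunk <= rem ? 1 : 0)
    for i in first_i:last_i
      values[i] = foo(values[i]; cache = caches[ichunk]) # Calling PkgA.foo
    end
  end
end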

I don’t know of any solution that avoids passing cache (or at least the ichunk index) to foo as a parameter, though.

ChunkSplitters.jl is amazing, light-weight & useful.

Is there a chunking scheme (and an associated @threads primitive, maybe) such that all threads process the 1st chunk and then move on to the next one? It’s close to :scatter, but I want a “sync” before moving to the next chunk, I guess.

I want this because there’s an expensive I/O cache associated with reading each chunk (our current approach, highlighted in the linked Discourse post, is to have a task-local cache and use the :batch scheme).

I’m not sure if that applies to your case, but you can control the number of threads and the number of chunks independently. If you increase nchunks, each task will be lighter, so by controlling the number of threads running in parallel (via the nthreads() set at Julia startup) it might be possible to tune the total amount of resources used per iteration.
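
For example (a sketch, with made-up numbers):

# start Julia with e.g. `julia -t 4`; then split the work into many light
# chunks, of which only nthreads() are being processed at any moment
bar(values; nchunks = 8 * Threads.nthreads())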

Maybe I can put it a different way: imagine I only want to keep one chunk in RAM (my full target array is read lazily) and have all the threads work on it until it’s done. Is there a way to do that?

Btw, I can’t control the chunk size; that’s determined by the file format (my array is an abstraction over it).

You can imagine I don’t have enough RAM to hold 2 chunks, so if a thread calls getindex() on a different location too early, it would overwhelm the RAM.

Maybe you could use ChunkSplitters to partition the tasks, but parallelize the inner iteration rather than the outer loop, something like:

nchunks = 20
for (task_range, ichunk) in ChunkSplitters.chunks(tasks, nchunks)
    # load buffer buff[ichunk] (sequentially, so only one chunk is in RAM)
    Threads.@threads for task_index in task_range
        # do stuff with this buffer
    end
end

But in this case the possible concurrency issues are in the inner loop of course.

This package only assumes that you can provide the number of chunks; it will then lazily iterate, splitting the indexes of the tasks into that many chunks.

There wasn’t really an answer to your question in the thread you linked, was there?

Thus only inspiration; depending on the problem size, you can use the task_local_storage() approach shown there.

The double-lock pattern also works, btw, where you have a vector of nthreads() locks, and each task takes the lock before working on the corresponding cache index (a sketch follows below).
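
A minimal sketch of that pattern, with hypothetical names CACHES and LOCKS; the lock makes an occasional threadid() collision after task migration harmless, at the price of locking on every call:

# Package PkgA (sketch; in a real package, fill these in __init__)
const BUFFER_LENGTH = 64
const CACHES = [Vector{Int}(undef, BUFFER_LENGTH) for _ in 1:Threads.nthreads()]
const LOCKS = [ReentrantLock() for _ in 1:Threads.nthreads()]

function foo(x)
  i = Threads.threadid() # may collide with another task after migration...
  lock(LOCKS[i]) do      # ...but the lock serializes any such collision
    cache = CACHES[i]
    cache[1] = x         # stand-in for real work using the cache
    return cache[1]
  end
end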

Thanks for all your answers/suggestions/comments so far!

That’s really the tough part for me. The problem is a little bit more complex than I sketched in my initial post. It’s more like: PkgA.foo is called by PkgB.bar, but the buffer/cache is not used directly by PkgA.foo; it is used by other functions defined in PkgA that PkgA.foo calls. Currently, PkgA.foo doesn’t have a user-facing interface providing the option to pass a cache to the functions it calls, and adding one would make the interface extremely ugly.

Indeed, that is harder. I don’t see any way to do that safely without paying the price of a lock somewhere. Using channels is probably a reasonable alternative, unless this is a very, very tight loop.
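
For the record, a sketch of the channel alternative, with CACHE_POOL as a hypothetical name: each call checks a buffer out of a shared pool and puts it back when done, so no buffer is ever used by two tasks at once, at the cost of the Channel’s internal synchronization on every call:

# Package PkgA (sketch of a Channel-of-caches pool)
const BUFFER_LENGTH = 64
const CACHE_POOL = Channel{Vector{Int}}(Inf) # unbounded; filled in __init__

function __init__()
  for _ in 1:Threads.nthreads()
    put!(CACHE_POOL, Vector{Int}(undef, BUFFER_LENGTH))
  end
end

function foo(x)
  cache = take!(CACHE_POOL) # blocks if all buffers are checked out
  try
    cache[1] = x            # stand-in for real work using the cache
    return cache[1]
  finally
    put!(CACHE_POOL, cache) # always return the buffer to the pool
  end
end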

I would try something like

# Package PkgA

# Cache of `Vector{Int}` with 64 elements per task
const BUFFER_LENGTH = 64

function foo(x)
  # Lazily create one buffer per task; task-local storage stays correct
  # under task migration, unlike indexing with threadid()
  cache = get!(() -> Vector{Int}(undef, BUFFER_LENGTH),
               task_local_storage(), :cache)::Vector{Int}
  # do something with x and cache
  return some_value
end


# Package PkgB
function bar(values)
  Threads.@threads for i in eachindex(values)
    x = values[i]
    values[i] = foo(x) # Calling PkgA.foo
  end
end
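
Since task_local_storage() is globally accessible, the same get! lookup can also live in PkgA’s inner functions instead of in foo, so no cache parameter needs to be threaded through the public interface. A sketch, with a hypothetical helper name:

# Hypothetical inner function of PkgA; foo's signature stays unchanged
function inner_kernel!(y)
  cache = get!(() -> Vector{Int}(undef, BUFFER_LENGTH),
               task_local_storage(), :cache)::Vector{Int}
  # do something with y and cache
  return y
end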

I have used this pattern myself, and it seems to work and be efficient, but I’m not an expert on multithreading, so for all I know it could have any number of subtle gotchas. Hopefully someone more knowledgeable can vouch for it or explain its shortcomings.

Thanks! I will try it