Multithreading with shared memory caches

Hi there, I have a loop that uses a cache array to avoid excessive memory allocations. The structure is captured in the following example:

A = rand(10000)
B = similar(A)
cache = zeros(3)

for i in eachindex(A)
    f!(cache, A[i])
    B[i] = g(cache, A[i])
end

Only the in-place nature of f! is relevant and not its details, but for the sake of having a minimal working example, let me define

function f!(cache, a)
    cache[1] = a
    cache[2] = a^2
    cache[3] = a^3
end

function g(cache, a)
    return a*sum(cache.^2) 
end

In my real example, the loop takes a long time, so I would like to parallelize it using multithreading. Initially, I thought the following simple modification where I create a cache for each thread could work:

A = rand(10000)
B = similar(A)
threads_cache = [zeros(3) for i in 1:Threads.nthreads()]

@threads for i in eachindex(A)
    cache = threads_cache[Threads.threadid()]
    f!(cache, A[i])
    B[i] = g(cache, A[i])
end

However, the @threads documentation states that one should only write to locations not shared across iterations and that the value of threadid() may change even within the same iteration. So the example above would be incorrect since each cache is shared across iterations having the same threadid(). Creating a cache for each iteration would defeat the purpose of avoiding excessive memory allocations. So, is there a way to use multithreading while still benefitting from the in-place nature of the computation? Can I prevent different iterations to run concurrently in the same thread? Do I need to introduce locks or atomic operations to this example?

1 Like

One solution is to manually spawn just one task per thread (or, rather, per preallocated cache) and have each task handle as many values of i as necessary. The easiest way to distribute the is to tasks is to take them dynamically from a prepopulated channel, as follows:

is = eachindex(A)
ch = Channel{eltype(is)}(Inf)
foreach(i -> put!(ch, i), is)
close(ch)

@sync for cache in threads_cache
    Threads.@spawn for i in ch
        f!(cache, A[i])
        B[i] = g(cache, A[i])
    end
end

There’s of course a bit of overhead in this 100 % dynamic scheduling since iterating the channel involves locking under the hood, so if your loop body is super cheap you might want to pre-assign is to tasks instead.

2 Likes

You have to use the :static scheduler to make this work:

A = rand(10000)
B = similar(A)
threads_cache = [zeros(3) for i in 1:Threads.nthreads()]

@threads :static for i in eachindex(A)
    cache = threads_cache[Threads.threadid()]
    f!(cache, A[i])
    B[i] = g(cache, A[i])
end

See ??Threads.@threadsin the REPL for the details.

1 Like

Here is IMO the best way to approach this problem if you don’t want to use external libraries:

A = rand(10000)
B = similar(A)

# Break your work into chunks so that there's enough for each thread to get two chunks.
# More chunks per thread has better load balancing but more overhead
chunks_per_thread = 2
chunks = Iterators.partition(eachindex(A), length(A) ÷ (Threads.nthreads() * chunks_per_thread))

# map over the chunks, creating an array of spawned tasks
tasks = map(chunks) do chunk
    # For each chunk we spawn a task that creates some state, and then does your
    # single-threaded algorithm on that state, and the chunk it was given, then 
    # returns the state it created at the end
    Threads.@spawn begin
        cache = zeros(3)
        for i ∈ chunk
            f!(cache, A[i])
            B[i] = g(cache, A[i])
        end
        return cache
    end
end
# Now we fetch all the results from the spawned tasks
caches = fetch.(tasks)

Instead of creating state and then spawning parallel work that observes that state, you should instead think in terms of the parallel workers creating and returning their own state.

8 Likes

While this will work, I recommend against it because the static scheduler is not composable with other multithreaded code. We’re moving away from it for a good reason.

3 Likes

Thanks for this! I wasn’t aware of Iterators.partition.

I suppose if you want the load balancing of multiple chunks per thread without having to spawn more tasks than threads, you could combine our approaches:

chunks_per_thread = 2
chunks = Iterators.partition(eachindex(A), length(A) ÷ (Threads.nthreads() * chunks_per_thread))

ch = Channel{eltype(chunks)}(Inf)
foreach(chunk -> put!(ch, chunk), chunks)
close(ch)

tasks = map(1:Threads.nthreads()) do _
    Threads.@spawn begin
        cache = zeros(3)
        for chunk in ch
            for i in chunk
                f!(cache, A[i])
                B[i] = g(cache, A[i])
            end
        end
        return cache
    end
end

caches = fetch.(tasks)

For long-running compute-bound tasks, it seems plausible that the overhead from the occasional lock when taking the next chunk from the channel would be less than the overhead from tasks stepping on each other due to there being more tasks than threads. Of course, one should profile to know for sure.

1 Like

I think if you’re worried about the overhead of spawning tasks, then you should generally be even more worried about the overhead of using channels. I’m not an experienced user of channels, and I haven’t tried everything, but I’m not sure I’ve ever found a workload in my work that benefited from using channels rather than just spawning more tasks (this is probably because I’m almost always compute limited but I’m not sure).

Aren’t compute-limited tasks exactly the ones for which having more tasks than threads is detrimental? It’s not the spawning of tasks I’m worried about, but the fact that they will be competing for resources during execution. The scheduler will ask the running tasks to yield at regular intervals to distribute processing time fairly, (probably incorrect and not even meaningfully phrased, see below) when they could have been plowing through sequentially and without interruption at 100 % CPU. It’s like having multiple async tasks on a single thread—useful when you’re IO bound, waiting for user input or network resources etc., but not a way to run heavy computations.

The way I used the channel in my first example is absolutely not ideal, as each task has to acquire the lock for every single iteration. However, when combined with your manual chunking to reduce the number of channel accesses down to only a handful per task, almost all the overhead should be gone. Acquiring the lock, say, every few minutes shouldn’t be anything to worry about.

What’s important if you’re concerned about performance is to make sure the channel is typed (Channel{eltype(container)}) and that you either wrap a function around the code or make the channel const, such that the element type can be inferred within the task.

I may be misunderstanding something fundamental about how task scheduling works here, so I’d love to be corrected/informed!

It’s probably just because the MWE is much smaller, but is a shared cache really necessary here? The cache is overwritten every iteration, so if you don’t need to store the data from past iterations, why not stack allocate it per iteration, like (a, a^2, a^3) or immutable StaticArrays? That way you don’t have to worry about multiple threads mutating the same instance or reassigning the same variable. Accessors.jl can do some operations on immutable instances with mutation-like syntax, if that’ll help.

2 Likes

Hm, you may be right here, as I said I’m not really an experienced user of channels, so from my bad experiences with their performance I’ve mostly stayed away, but thinking about it more, it seems that this chunking should more or less help get the best of both.

Preliminary experiments indicate that my mental model was wrong:

$ julia --threads=1
julia> using LinearAlgebra

julia> tasks = map(1:2) do taskid
           Threads.@spawn begin
               A = randn(2000, 2000)
               for i in 1:5
                   eigen(A)  # kill time without implicitly yielding
                   println("Task $taskid iteration $i")
               end
           end
       end
       foreach(wait, tasks)
Task 1 iteration 1
Task 1 iteration 2
Task 1 iteration 3
Task 1 iteration 4
Task 1 iteration 5
Task 2 iteration 1
Task 2 iteration 2
Task 2 iteration 3
Task 2 iteration 4
Task 2 iteration 5

I thought more tasks than threads had essentially the same issues as more threads than cores, but perhaps not, at least not in julia 1.9.1.

Trying to understand the docs and this blogpost further: Announcing composable multi-threaded parallelism in Julia, it seems like my phrasing “ask the running tasks to yield” might not even make sense. A task is only ever switched out when it itself yields due to either calling yield() explicitly or hitting a blocking call that implicitly yields, like sleep, wait, et cetera. Also, a spawned task is only scheduled when a thread becomes available. At least that’s now my understanding. Still eager to hear from any experts on julia’s implementation of multithreading.

If this is correct, your original solution seems like the best, @Mason

2 Likes

This small package provides an easy interface for such task: GitHub - m3g/ChunkSplitters.jl: Simple chunk splitters for parallel loop executions

@joaquinpelle now, this is what your example would look like:

julia> using ChunkSplitters, Base.Threads

julia> A = rand(10000);

julia> A = rand(10000);

julia> nchunks = nthreads() # can be changed for tunning load balancing
12

julia> threads_cache = [zeros(3) for _ in 1:nchunks];

julia> @threads for (i_range, i_chunk) in chunks(eachindex(A), nchunks)
           for i in i_range 
               cache = threads_cache[i_chunk] # i_chunk index, not threadid()
               f!(cache, A[i])
               B[i] = g(cache, A[i])
           end
       end

(of course you should put the loop inside a function, possibly receiveing nchunks as an optional keyword argument set by default to nthreads()).

Also, if your cache is really of arrays of size 3, you should better just use tuples or static arrays, created on the flight by each thread, as mentioned by someone here.

3 Likes

Can somebody explain how the threadid() may change within the same iteration of a @threads for?
If, for example, there were a @spawn within the body of the loop, and the thread that executes this iteration follows the spawn while another thread steals the rest of the iteration?

Is that possible? Is it the only way it can happen so if there is no spawn within the loop body, you can assume the threadid() won’t change?

See here: Announcing composable multi-threaded parallelism in Julia, under the subheading “Task migration across system threads”. (Note that this blog post is 4 years old, so statements like “for now a task must always run on the thread it started running on” may no longer be true.)

If I understand this correctly, the only time a task can switch threads is when yielding/blocking, after which it may be restarted on a different thread. I don’t know what all the yielding/blocking operations are, so while I suppose a task is unlikely to ever migrate if it’s just doing straight number crunching that never waits for data, I’d rather err on the safe side.

One situation I’ve encountered where an otherwise purely computational task blocks is when writing/logging to a file during the computation. It’s essential to protect file access with a lock in order to preserve file integrity, and lock is certainly a blocking operation.

Nope, the tasks spawned by @threads are allowed to yield to other tasks, be paused, and then resume on a completely different thread in the middle of their execution. Even more problematic than the migration is that even for a specific fixed threadid, a task can yield between reading and writing to the cache and then end up overwriting the work another task did, so the code in the original post is incorrect even with a single thread.

In practice the migration or yielding will only happen if a yield point is in the task body, but even something as inoccuous as a hidden @debug statement or a dynamic dispatch or whatever could cause a yield-point, so you have no real control over whether the yielding behaviour can happen, and it’s best to just assume it can happen at any time.

2 Likes

ah, yeah I didn’t realize you had this confusion – there is no real performance problem with spawning too many tasks other than the fixed ~1 microsecond overhead of actually scheduling the task in the first place. So long as that overhead is negliglbe, there’s no risk of oversubscribing the CPU.

2 Likes

Thank you, everyone, for your responses. Thanks specially @Mason and @danielwe for your rich discussion was very instructive and helped me think about my design better, in addition to learning more about multithreading and even some Julia tricks. It seems @Mason’s solution is the most adequate, but I will profile @danielwe’s too.

1 Like

I hadn’t considered this option before. This could actually work because stack allocations are not expensive, right?

AFAIK every instance or reference to one in Julia is allocated on the stack or heap. Stack allocations and frees are deterministic and much cheaper than that for the heap; allocation reports don’t even bother counting it.