Pattern for managing thread local storage?

What’s a good pattern for preallocating buffers in a multithreaded setting? I want to avoid allocating new memory whenever I launch a task on a thread.

So far I’ve been using the following pattern, which maintains a pool of ids which are used to index into a collection of buffers. In retrospect, this would also work for an asynchronous (rather than multithreaded) setting, with t being the maximum number of tasks running at a time. Below, the goal is to compute some function f(x, buffer) that relies on a preallocated buffer.

t = Threads.nthreads()

# local buffers
buffers = [zeros(UInt8, 10^6) for _ in 1:t]

# queue of ids
available_ids = Channel{Int}(t)
for i in 1:t
    put!(available_ids, i)
end

@sync for x in data
    # grab an id from the queue
    # this doesn't need to match the value returned by Threads.threadid()
    id = take!(available_ids)

    Threads.@spawn begin
        try
            # get local buffer
            buffer = buffers[id]

            # compute some complicated function f(x, buffer)
            # that uses a preallocated buffer
            # ...
        finally
            # return id to queue (even if f throws, so the id isn't leaked)
            put!(available_ids, id)
        end
    end
end

Is there a better or more standard way of doing this?


Depending on the run-time of the computation, I think it’s cleaner to distribute the items in data using a channel:

provider = Channel() do provider
    for x in data
        put!(provider, x)
    end
end  # the channel is closed automatically when the feeding task returns

ntasks = Threads.nthreads()  # or some other positive integer
@sync for _ in 1:ntasks
    buffer = zeros(UInt8, 10^6)
    Threads.@spawn for x in provider
        f!(x, buffer)
    end
end

This makes the locality of buffer very apparent.
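For completeness, here is a minimal runnable version of the pattern above (a sketch; `checksum!` and the toy `data` are stand-ins for the real `f!` and input):

```julia
data = 1:100

# hypothetical stand-in for f!: fill the buffer, return a summary
function checksum!(x, buffer)
    fill!(buffer, x % UInt8)
    return Int(sum(buffer))
end

# feed items; the channel closes when the feeding task returns
provider = Channel{Int}() do ch
    for x in data
        put!(ch, x)
    end
end

results = Channel{Int}(length(data))
ntasks = Threads.nthreads()
@sync for _ in 1:ntasks
    buffer = zeros(UInt8, 10^3)   # one buffer per task, allocated once
    Threads.@spawn for x in provider
        put!(results, checksum!(x, buffer))
    end
end
close(results)
total = sum(collect(results))   # 10^3 * sum(data) == 5_050_000
```

Iterating the channel from several tasks is safe: each iteration is a take!, so the items are distributed among the consumers and each loop ends when the channel is closed and drained.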

If the provider channel becomes a bottleneck, you can send a chunk rather than an item:

provider = Channel() do provider
    basesize = 128  # some number; tune to your workload
    for xs in Iterators.partition(data, basesize)
        put!(provider, xs)
    end
end

with for x in Iterators.flatten(provider) in the child task.
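Spelled out, the chunked variant could look like this (a sketch; `process!` and the toy `data` are hypothetical stand-ins for the real work and input):

```julia
data = 1:100
basesize = 10   # chunk size; tune to your workload

# feed chunks instead of single items
provider = Channel{Vector{Int}}() do ch
    for xs in Iterators.partition(data, basesize)
        put!(ch, collect(xs))
    end
end

acc = Threads.Atomic{Int}(0)
process!(x, buffer) = Threads.atomic_add!(acc, x * length(buffer))  # stand-in for f!

@sync for _ in 1:Threads.nthreads()
    buffer = zeros(UInt8, 10^3)
    # each take! hands a whole chunk to a task; flatten yields its items one by one
    Threads.@spawn for x in Iterators.flatten(provider)
        process!(x, buffer)
    end
end
acc[]   # 10^3 * sum(data) == 5_050_000
```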

Alternatively, you can avoid the bottleneck that comes with a channel by using my package FLoops.jl and using an accumulator as a local storage… kind of:

basesize = 128  # some positive number
@floop ThreadedEx(basesize = basesize) for x in data
    y = Some(x)
    @reduce() do (buffer = zeros(UInt8, 10^6); y)
        if y isa Some
            f!(something(y), buffer)
        end
    end
end

It would call zeros(UInt8, 10^6) only up to cld(length(data), basesize) times. But this is super ugly/tricky and I should provide a better syntax…

Let me nitpick that this would delay @spawn of new tasks until available_ids is “refilled”. It doesn’t matter if the task itself takes a long time compared to the overhead of take! and @spawn, though. Also, I’d put buffers directly in the channel for simplicity, if I were to go with the strategy in the OP.
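For reference, the “buffers directly in the channel” variant might look like this (a sketch; the fill/sum body stands in for the real f(x, buffer)):

```julia
data = 1:100
t = Threads.nthreads()

# pool the preallocated buffers themselves instead of ids
buffers = Channel{Vector{UInt8}}(t)
for _ in 1:t
    put!(buffers, zeros(UInt8, 10^3))
end

results = Channel{Int}(length(data))
@sync for x in data
    buffer = take!(buffers)   # blocks until some task returns a buffer
    Threads.@spawn begin
        try
            fill!(buffer, x % UInt8)     # stand-in for f(x, buffer)
            put!(results, Int(sum(buffer)))
        finally
            put!(buffers, buffer)        # return the buffer even on error
        end
    end
end
close(results)
total = sum(collect(results))   # 10^3 * sum(data) == 5_050_000
```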


Depending on the run-time of the computation, I think it’s cleaner to distribute the items in data using a channel

Also, I’d put buffers directly in the channel for simplicity, if I were to go with the strategy in the OP.

Nice approach, I will try this, thanks! For my use case, I’m reading into a thread-local buffer from an IOStream, followed by a slow computation on each buffer (after releasing the IOStream lock so other threads can start) that itself requires another thread-local buffer. I can’t anticipate the total number of reads, but I can estimate the maximum useful number of threads: the buffer processing “slow” step takes about 10X longer than the read step (both scale linearly in chunk length), so it won’t be useful to have much more than 10 threads running.
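A rough sketch of that read-then-process pipeline (all names hypothetical; an IOBuffer stands in for the real IOStream, and summing the bytes stands in for the slow computation):

```julia
bytes = rand(UInt8, 10^4)
io = IOBuffer(bytes)            # stands in for the real IOStream
io_lock = ReentrantLock()
chunklen = 10^3
total = Threads.Atomic{Int}(0)

@sync for _ in 1:Threads.nthreads()
    read_buf = Vector{UInt8}(undef, chunklen)   # task-local read buffer
    work_buf = zeros(UInt8, chunklen)           # task-local compute buffer
    Threads.@spawn while true
        # read under the lock, so other tasks can interleave their reads
        n = lock(io_lock) do
            eof(io) ? 0 : readbytes!(io, read_buf, chunklen)
        end
        n == 0 && break
        # "slow" step outside the lock, using the second buffer
        copyto!(work_buf, 1, read_buf, 1, n)
        Threads.atomic_add!(total, Int(sum(view(work_buf, 1:n))))
    end
end
total[]   # == sum of all bytes read
```

The key point is that only the eof check and the readbytes! happen under the lock; the expensive part runs concurrently on each task's own buffers.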

@floop and @reduce might be a good fit, thanks for the pointer.

I have had a similar problem where most of the CPU time was spent reallocating a large matrix at each call of the function that required it. I opted for a pre-allocated matrix kept in a ‘context’ global structure that is passed around. However, this clearly gets in the way of multithreading the code at some future point.
I wanted to use a similar approach as described, but with the addition of a return channel whereby, after “consumption”, the pre-allocated matrix would be returned to a producer-maintained pool.
My experience in Julia multithreading is exactly === Nothing. Does any of you have somewhere to point me towards for inspiration?

Thanks!

There are a couple of solutions discussed in Thread safe buffer correctness
