Pattern for managing thread local storage?

Depending on the run-time of the computation, I think it's cleaner to distribute the items in data using a channel:

provider = Channel() do provider
    for x in data
        put!(provider, x)
    end
end

using Base.Threads: @spawn

ntasks = Threads.nthreads()  # or some other positive integer
@sync for _ in 1:ntasks
    buffer = zeros(UInt8, 10^6)
    @spawn for x in provider
        f!(x, buffer)
    end
end

This makes the locality of buffer very apparent.
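For reference, here is a self-contained sketch of the same pattern that you can paste into a REPL. The function names process_with_channel and the body of f!, the results vector, and the small buflen are all made up for illustration; the real f! is whatever your computation is.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical stand-in for f!: uses `buffer` as scratch space and returns a value.
function f!(x, buffer)
    fill!(buffer, x % UInt8)
    return Int(buffer[1]) + Int(buffer[end])
end

function process_with_channel(data; ntasks = nthreads(), buflen = 10^6)
    results = zeros(Int, length(data))
    # Send (index, item) pairs so each result can be stored in its own slot.
    provider = Channel{Tuple{Int,eltype(data)}}() do ch
        for (i, x) in enumerate(data)
            put!(ch, (i, x))
        end
    end
    @sync for _ in 1:ntasks
        buffer = zeros(UInt8, buflen)  # one buffer per task, never shared
        @spawn for (i, x) in provider
            results[i] = f!(x, buffer)  # each index is written by exactly one task
        end
    end
    return results
end

results = process_with_channel(1:100; buflen = 1024)
```

Each buffer is created right next to the one task that uses it, so no locking around the buffers is needed.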

If the provider channel becomes a bottleneck, you can send a chunk rather than an item:

provider = Channel() do provider
    basesize = 1000  # placeholder: some positive number, tune for your workload
    for xs in Iterators.partition(data, basesize)
        put!(provider, xs)
    end
end

with for x in Iterators.flatten(provider) in the child task.
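Put together, the chunked variant might look like the sketch below. Again, sum_in_chunks, the toy f!, and the default basesize and buflen are assumptions chosen only so that the example runs; here each task returns a partial sum that is combined at the end.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical stand-in for f!: touches the scratch buffer and returns a value.
function f!(x, buffer)
    buffer[1] = x % UInt8
    return Int(buffer[1])
end

function sum_in_chunks(data; basesize = 16, ntasks = nthreads(), buflen = 1024)
    provider = Channel() do ch
        for xs in Iterators.partition(data, basesize)
            put!(ch, xs)  # one put! per chunk rather than per item
        end
    end
    tasks = map(1:ntasks) do _
        @spawn begin
            buffer = zeros(UInt8, buflen)  # task-local buffer
            acc = 0
            # flatten takes a chunk from the channel, then walks its items
            for x in Iterators.flatten(provider)
                acc += f!(x, buffer)
            end
            acc
        end
    end
    return sum(fetch, tasks)
end

total = sum_in_chunks(1:100)
```

A larger basesize means fewer channel operations but coarser load balancing, so it is worth trying a few values.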

Alternatively, you can avoid the bottleneck that comes with a channel by using my package FLoops.jl and using an accumulator as a local storage… kind of:

using FLoops

basesize = 1000  # placeholder: some positive number
@floop ThreadedEx(basesize = basesize) for x in data
    y = Some(x)
    @reduce() do (buffer = zeros(UInt8, 10^6); y)
        if y isa Some
            f!(something(y), buffer)
        end
    end
end

It would call zeros(UInt8, 10^6) only up to cld(length(data), basesize) times. But this is super ugly/tricky and I should provide a better syntax…

Let me nitpick that this would delay @spawn of new tasks until available_ids is "refilled". It doesn't matter if the task itself takes a long time compared to the overhead of take! and @spawn, though. Also, I'd put buffers directly in the channel for simplicity, if I were to go with the strategy in the OP.
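To illustrate that last point, here is a rough sketch of what I mean by putting buffers directly in the channel. I'm assuming the OP's strategy is "take something from a channel, then @spawn one task per item"; the names process_with_buffer_pool, pool and nbuffers, and the toy f!, are invented for this example.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical stand-in for f!: uses `buffer` as scratch space and returns a value.
function f!(x, buffer)
    fill!(buffer, x % UInt8)
    return Int(buffer[1]) + Int(buffer[end])
end

function process_with_buffer_pool(data; nbuffers = nthreads(), buflen = 1024)
    # The channel holds the buffers themselves instead of ids into an array.
    pool = Channel{Vector{UInt8}}(nbuffers)
    for _ in 1:nbuffers
        put!(pool, zeros(UInt8, buflen))
    end
    results = zeros(Int, length(data))
    @sync for (i, x) in enumerate(data)
        buffer = take!(pool)  # blocks here until some task returns a buffer
        @spawn try
            results[i] = f!(x, buffer)
        finally
            put!(pool, buffer)  # hand the buffer back even if f! throws
        end
    end
    return results
end

pooled = process_with_buffer_pool(1:100)
```

Note that the take! happens before @spawn, so this still has the delay I nitpicked about above; it only removes the extra indirection through ids.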
