Multithreading, preallocation of objects, threadid() and task migration

Suppose we have a function, do_stuff!(mat, x), that mutates a previously allocated matrix mat and returns some value. We want to run the function for several values of x using multithreading. In order to avoid data races, we could do:

matrices = [zeros(2, 2) for n in 1:Threads.nthreads()]
n = 10
out = zeros(n)
xs = 1:n

Threads.@sync for (i, x) in enumerate(xs)
    Threads.@spawn out[i] = do_stuff!(matrices[Threads.threadid()], x)
end
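(For concreteness, a minimal stand-in for do_stuff!, hypothetical since the post does not define it, could be:)

```julia
# Hypothetical stand-in for do_stuff!: overwrite the preallocated matrix
# in place with values depending on x, then return a scalar.
function do_stuff!(mat, x)
    for i in eachindex(mat)
        mat[i] = x * i
    end
    return sum(mat)
end
```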

It has been pointed out by @tkf that this might no longer work once task migration between threads is allowed.

As far as I understand, task migration will be possible in Julia 1.7. So what will be the alternative to threadid() for 1.7 and above?

I am aware that FLoops.jl provides an @init macro that solves this problem. But since preallocating objects that are to be mutated is such a common pattern, I feel that there should be a solution for this problem in Base. But I am not aware of what that is.
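(For readers landing here, a rough sketch of that FLoops.jl pattern, with a hypothetical do_stuff! stand-in and assuming FLoops.jl is installed:)

```julia
using FLoops  # assumes FLoops.jl is installed

do_stuff!(mat, x) = (fill!(mat, x); sum(mat))  # hypothetical stand-in kernel

n = 10
out = zeros(n)
@floop for (i, x) in collect(enumerate(1:n))
    @init matrix = zeros(2, 2)  # allocated once per underlying task, not per iteration
    out[i] = do_stuff!(matrix, x)
end
```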

Thanks in advance!


To be completely safe, maybe preallocate out for each thread as well.

But if you want the matrices to be dependent on the previous iteration, maybe it is more transparent to use a parallel loop over the number of threads instead.

Thanks for the feedback.

But if you want the matrices to be dependent on the previous iteration, maybe it is more transparent to use a parallel loop over the number of threads instead.

To clarify: the matrices are not dependent on the previous iteration. The matrices are simply a “working space”. In the particular problem I am dealing with (electronic band structure calculation), the do_stuff! function builds a matrix as a function of x (the momentum in my problem). Instead of allocating a new matrix for each x, I want to preallocate the matrices so that do_stuff! only modifies the matrix, instead of having to allocate a new one each time it is called.

My question is how this can be achieved with multithreading in a safe manner once we have task migration between threads.

To be completely safe, maybe preallocate out for each thread as well.

Since each entry of out is guaranteed to only be touched by one thread, I think there is no need to preallocate an out for each thread.
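(Indeed, writing to distinct indices of the same array from different tasks is safe on its own. A minimal sketch:)

```julia
# Each task writes only to its own index of `out`, so no locking and no
# per-thread copy of `out` is needed.
n = 100
out = zeros(n)
@sync for i in 1:n
    Threads.@spawn out[i] = i^2
end
```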


In that case I do not see how task migration can affect the results. But I may be missing something, of course.

To be honest, this is all way over my head. I just refer to the following warning by @tkf:

Furthermore, if julia supports migration of Task across OS threads at some future version, the above scenario can happen even if f never yields to the scheduler. Therefore, reduction or private state handling using threadid is very discouraged.

That can be found in

Maybe it is as you say, and task migration plays no role in the particular problem I am concerned with. But I would like for someone to confirm this.


For example, consider:

function do_stuff!(matrix, x)
    matrix[1, 1] = a = rand()
    yield()  # or `@show`, or `@debug`, or write to a file, ...
    matrix[1, 1] += x
    @assert matrix[1, 1] == a + x  # this *can* fail
end

The Julia runtime may decide to suspend a task executing do_stuff!(matrices[1], x) when it hits yield and starts a new task at thread 1, executing do_stuff!(matrices[1], x) again but with different x. Then, by the time the original task is restarted, matrix[1, 1] has a completely different value.

(For similar discussion, see Is `@async` memory safe? - #8 by tkf. This is about @async but task migration occurs at yield points and so it explains why matrices[Threads.threadid()] style does not work in general.)

So, the only way to use the matrices[Threads.threadid()] style is to make sure that there is no I/O (yield points) in all code reachable from do_stuff!. Sometimes it’s doable, but it does not scale well. In particular, do_stuff! cannot be generic over its argument anymore since you don’t know what methods of the functions are going to be used. This is why I’m against matrices[Threads.threadid()] style.

Yeah, like putting FLoops in stdlib? :troll:


am I missing something or wouldn’t it just be:

Threads.@sync for (i, x) in enumerate(xs)
    Threads.@spawn out[i] = do_stuff!(matrices[i], x)
end

There might be far fewer threads than elements in xs. Hence, having length(xs)-many cache matrices is probably not the goal…

ah, I understand. how about i % nthreads() ?
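(Note that with Julia's 1-based indexing this would need mod1 rather than %, since i % nthreads() can return 0; a quick check:)

```julia
# `mod1` wraps 1, 2, 3, ... onto the range 1:nt, never producing 0,
# so it is safe as an index into a length-`nt` vector of buffers.
nt = 4  # stand-in for Threads.nthreads()
idx = [mod1(i, nt) for i in 1:8]
```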

Though I think both this and the original with threadid() are not memory safe, as the cache will be overwritten when spawning so many tasks (there will be multiple tasks on the same threadid(), right?)

My favorite way to handle this is with Channels. Push tuples of (matrix[i],x) into a channel, and then spawn nthreads() tasks to pull from the channel and do the work.

That also imposes assumptions on how the threads are working. It is very likely that one thread finishes earlier, and then it will corrupt another thread’s data.

right, but that problem also exists in the original code :wink: I think.

Maybe the answer to this question is just the note in the FAQ:

The problem discussed above can also be worked around by, e.g., using Threads.@threads for (since it spawns exactly nthreads() tasks and ensures that each task is scheduled on each OS thread, as of Julia 1.6) and making sure that state is not shared across multiple loops.

Hence, if I understand it correctly:

out = zeros(n)
xs = collect(1:n)
Threads.@threads for i in 1:n
    out[i] = do_stuff!(matrices[Threads.threadid()], xs[i])
end

But I’m not sure and also want to know the correct answer! :slight_smile:

not really, in this case two threads may happen to be working on the same matrix at the same time.

Another problem is that solutions of that type depend on how the iterator and the indexes of the iterator are defined. What if it receives an offset array to iterate over?

Especially when things can take widely different amounts of time (like if do_stuff! is a stochastic simulation), and now that thread migration happens (in 1.7+), the pattern I’d use is:

work = Channel(Threads.nthreads())
res = Channel(Threads.nthreads())

for i in 1:Threads.nthreads()
    Threads.@spawn do_things!(work, res)
end

for i in 1:n
    put!(work, xs[i])  # enqueue the work items
end

while true
    result = take!(res)
    # ... store the result, or break when a special "end" message comes through
end

now do_things! is a function that take!s from the work channel, does the stuff, and put!s the result into the results channel
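(A sketch of such a do_things!, filled in by me rather than taken from the post above, with a hypothetical do_stuff! kernel:)

```julia
do_stuff!(mat, x) = (fill!(mat, x); sum(mat))  # hypothetical stand-in kernel

# Worker: repeatedly take an item from `work`, process it with a
# task-local buffer, and put the (index, result) pair into `res`.
function do_things!(work, res)
    matrix = zeros(2, 2)  # task-local buffer, never shared between tasks
    for (i, x) in work    # iterating a Channel take!s items until it is closed
        put!(res, (i, do_stuff!(matrix, x)))
    end
end
```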


If you are going to use channels, I suggest keeping resources (e.g., pre-allocated matrix) as local variables as I demonstrated here: Pattern for managing thread local storage? - #2 by tkf

For more patterns like this, see Concurrency patterns for controlled parallelisms (discussion)

So, a Base-only pattern to do this may be

ntasks = Threads.nthreads()  # use a larger value if `do_stuff!` contains I/O
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)

    # Do this after the `@spawn`s if computing the work itself is CPU-intensive
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue)  # signal the end of work

    for _ in 1:ntasks
        @spawn begin
            local matrix = zeros(2, 2)  # allocate the buffer, and don't share
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end

  • If do_stuff! contains I/O like writing to a file or fetching something over the internet, it makes sense to make ntasks larger than the number of CPUs. For example, Python’s concurrent.futures.ThreadPoolExecutor creates four more threads than the number of CPUs (and bounds it by 32), assuming that the work is I/O-dominated.
  • If computing an element for workqueue also takes some time, it makes sense to move for (i, x) in enumerate(xs) after for _ in 1:ntasks. However, it then increases contention on workqueue.
  • If each do_stuff!(matrix, x) is fast but xs is long, it’s better to chunk the work and use Channel{Vector{Tuple{Int,Float64}}}.
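(The chunked variant from the last bullet could look roughly like this, again with a hypothetical do_stuff! stand-in and Iterators.partition for the chunking:)

```julia
do_stuff!(mat, x) = (fill!(mat, x); sum(mat))  # hypothetical stand-in kernel

xs = rand(10_000)
out = zeros(length(xs))

# One Channel item per *chunk* of (index, value) pairs instead of per
# element, which reduces contention on the Channel.
workqueue = Channel{Vector{Tuple{Int,Float64}}}(Inf)
for chunk in Iterators.partition(collect(enumerate(xs)), 256)
    put!(workqueue, collect(chunk))
end
close(workqueue)

@sync for _ in 1:Threads.nthreads()
    Threads.@spawn begin
        matrix = zeros(2, 2)  # task-local buffer, one per task
        for chunk in workqueue
            for (i, x) in chunk
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end
```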

Thank you for all the comments.

In my particular case, the do_stuff!(matrix, x) just fills the matrix with values that depend on x and then diagonalizes it. So in this case, the matrices[Threads.threadid()] style should be OK, correct?


I noticed that, but I wondered what would be the way of doing it with @spawn. By the way, does anyone know if the matrices[Threads.threadid()] style will still work, once @threads uses dynamic scheduling, instead of static?

@tkf, thank you for the Base-only solution! One question: in the workqueue you include all the elements of xs. Then, inside the @spawn, in the for (i, x) in workqueue loop, won’t all tasks work on all elements of xs? (sorry if this is a silly question, I don’t have any experience with Channels)

I think that would actually be best! I think having the ability to switch parallel execution, by simply changing an argument is wonderful!

Right, this is a tricky part of Channel. You can iterate over it, but the elements you “consume” are removed from it. So no two tasks see the same element.

Here’s a demo

julia> ch = foldl(push!, 1:1_000_000; init = Channel(Inf)) # fill channel
       t1 = Threads.@spawn collect(ch)  # `collect` loops over `ch`
       t2 = Threads.@spawn collect(ch);

julia> fetch(t1)
863030-element Vector{Any}:

julia> fetch(t2)
136970-element Vector{Any}:

julia> sort!([fetch(t1); fetch(t2)]) == 1:1_000_000
true

As you can see, the output of t1 and t2 contain distinct elements from 1:1_000_000.


Thanks for the detailed response @tkf. I guess I will have to read some more on Channels to properly understand this. In the meantime, I am grateful FLoops.jl exists!
