Multithreading, preallocation of objects, threadid() and task migration

Suppose we have a function, do_stuff!(mat, x), that mutates a previously allocated matrix mat and returns some value. We want to run the function for several values of x using multithreading. In order to avoid data races, we could do:

matrices = [zeros(2, 2) for n in 1:Threads.nthreads()]
n = 10
out = zeros(n)
xs = 1:n

Threads.@sync for (i, x) in enumerate(xs)
    Threads.@spawn out[i] = do_stuff!(matrices[Threads.threadid()], x)
end

It has been pointed out by @tkf (check https://juliafolds.github.io/FLoops.jl/dev/explanation/faq/#faq-state-threadid) that this might no longer work, once task migration between threads is allowed.

As far as I understand, task migration will be possible in Julia 1.7. So what will be the alternative to threadid() for 1.7 and above?

I am aware that FLoops.jl provides an @init macro that solves this problem. But since preallocating objects that are to be mutated is such a common pattern, I feel that there should be a solution for this problem in Base. But I am not aware of what that is.

Thanks in advance!

1 Like

To be completely safe maybe preallocating out for each thread as well.

But if you want the matrices to be dependent on the previous iteration maybe it is more transparent to use a parallel loop in the number of threads instead.

Thanks for the feedback.

But if you want the matrices to be dependent on the previous iteration maybe it is more transparent to use a parallel loop in the number of threads instead.

To clarify: the matrices are not dependent on the previous iteration. The matrices are simply a “working space”. In the particular problem I am dealing (electronic bandstrucutre calculation), the do_stuff function builds a matrix as a function of x (the momentum in my problem). Instead of allocating a new matrix for each x I want to preallocate the matrices such that do_stuff! only modifies the matrix, instead of having to allocate a new one each time it is called.

My question is how this can be achieved with multithreading in a safe matter once we have task migration between threads.

To be completely safe maybe preallocating out for each thread as well.

Since each entry of out is guaranteed to only be touched by one thread, I think there is no need to preallocate an out for each thread.

2 Likes

In that case I do not see how task migration can affect the results. But I may be missing something, of course.

To be honest, this is all way over my head. I just refer to the following warning by @tkf:

Furthermore, if julia supports migration of Task across OS threads at some future version, the above scenario can happen even if f never yields to the scheduler. Therefore, reduction or private state handling using threadid is very discouraged.

That can be found in https://juliafolds.github.io/FLoops.jl/dev/explanation/faq/#faq-state-threadid.

Maybe it is as you say, and task migration plays no role in the particular problem I am concerned with. But I would like for someone to confirm this.

2 Likes

For example, consider:

function do_stuff!(matrix, x)
    matrix[1, 1] = a = rand()
    yield()  # or `@show`, or `@debug`, or write to a file, ...
    matrix[1, 1] += x
    @assert matrix[1, 1] == a + x  # this *can* fail
end

The Julia runtime may decide to suspend a task executing do_stuff!(matrices[1], x) when it hits yield and starts a new task at thread 1, executing do_stuff!(matrices[1], x) again but with different x. Then, by the time the original task is restarted, matrix[1, 1] has a completely different value.

(For similar discussion, see Is `@async` memory safe? - #8 by tkf. This is about @async but task migration occurs at yield points and so it explains why matrices[Threads.threadid()] style does not work in general.)

So, the only way to use the matrices[Threads.threadid()] style is to make sure that there is no I/O (yield points) in all code reachable from do_stuff!. Sometimes it’s doable, but it does not scale well. In particular, do_stuff! cannot be generic over its argument anymore since you don’t know what methods of the functions are going to be used. This is why I’m against matrices[Threads.threadid()] style.

Yeah, like putting FLoops in stdlib? :troll:

3 Likes

am I missing something or wouldn’t it just be:

Threads.@sync for (i, x) in enumerate(xs)
    Threads.@spawn out[i] = do_stuff!(matrices[i], x)
end

There might me much less threads than elements in xs. Hence, having length(xs) many cache matrices is probably not the goal…

ah, I understand. how about i % nthreads() ?

Though I think this and the original with threadid() both seem to be not memory safe, as the cache will be overwritten by spawning so many tasks (there will be multiple tasks on the same threadid() right?)

My favorite way to handle this is with Channels. Push tuples of (matrix[i],x) into a channel, and then spawn nthreads() tasks to pull from the channel and do the work.

That also imposes assumptions on how the threads are working. It is very likely that one thread finishes earlier, and then it will corrupt another thread’s data.

right, but that problem also exists in the original code :wink: I think.

Maybe the answer to this question is just the note in the FAQ:

The problem discussed above can also be worked around by, e.g., using Threads.@threads for (since it spawns exactly nthreads() tasks and ensures that each task is scheduled on each OS thread, as of Julia 1.6) and making sure that state is not shared across multiple loops.

Hence, if I understand it correctly:

out = zeros(n)
xs = collect(1:n)
Threads.@threads for i in 1:n
    out[i] = do_suff!(matrices[Threads.threadid()], xs[i])
end

But I’m not sure and also want to know the correct answer! :slight_smile:

not really, in this case two threads may happen to be working on the same matrix at the same time.

Another problem is that solutions of that type depend on how the iterator and the indexes of the iterator are defined. What if it receives an offset array to iterate over?

Especially when things can take widely different amounts of time (like if do_stuff! is a stochastic simulation) and now that thread migration happens (in 1.7+) the pattern I’d use is:

work = Channel(Threads.nthreads())
res = Channel(Threads.nthreads())

for i in 1:Threads.nthreads()
   Threads.@spawn do_things!(work,res)
end

for i in 1:n
   put!(work,(matrices[i],x[i]))
end

while true
   result = take!(res)
   ... store the result or break when special "end" message comes through
end

now do_things is a function that take! from the work channel, then do the stuff, and put the result to the results channel

2 Likes

If you are going to use channels, I suggest keeping resources (e.g., pre-allocated matrix) as local variables as I demonstrated here: Pattern for managing thread local storage? - #2 by tkf

For more patterns like this, see Concurrency patterns for controlled parallelisms (discussion)

So, a Base-only pattern to do this may be

ntasks = Thraeds.nthraeds()  # use a larger values if `do_stuff!` contains I/O
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)

    # Do this after `@spawn`s if computing work itself is CPU-intensive
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue)  # signal the end of work

    for _ in 1:ntasks
        @spawn begin
            local matrix = zeros(2, 2)  # allocate the buffer, and don't share
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end
  • If do_stuff! contains I/O like writing to a file or fetching something over internet, it makes sense to create ntasks larger than the number of CPUs. For example, Python’s concurrent.futures.ThreadPoolExecutor creates four more threads than the number of CPUs (and bound it by 32) assuming that the work is I/O-dominated.
  • If computing an element for workqueue also takes sometime, it makes sense to move for (i, x) in enumerate(xs) after for _ in 1:ntasks. However, it then increases contention on workqueue.
  • If each do_stuff!(matrix, x) is fast but xs is long, it’s better to chunk the works and use Channel{Vector{Tuple{Int,Float64}}}.
4 Likes

Thank you for all the comments.

In my particular case, the do_stuff!(matrix, x) just fills the matrix with values that depend on x and then diagonalizes it. So in this case, the matrices[Threads.threadid()] style should be OK, correct?

Regarding

I noticed that, but I wondered what would be the way of doing it with @spawn. By the way, does anyone know if the matrices[Threads.threadid()] style will still work, once @threads uses dynamic scheduling, instead of static?

@tkf, thank you for the Base-only solution! One question: in the workqueue you include all the elements in xs. Then, inside the @spawn in the for (i, x) in workqueue loop, won’t all tasks work in all elements of xs? (sorry if this is a silly question, I don’t have any experience with Channel’s)

I think that would actually be best! I think having the ability to switch parallel execution, by simply changing an argument is wonderful!

Right, this is a tricky part of Channel. You can iterate over it but the output you “consumed” will be removed from it. So, no two tasks see the same elements.

Here’s a demo

julia> ch = foldl(push!, 1:1_000_000; init = Channel(Inf)) # fill channel
       close(ch)
       t1 = Threads.@spawn collect(ch)  # `collect` loops over `ch`
       t2 = Threads.@spawn collect(ch);

julia> fetch(t1)
863030-element Vector{Any}:
       1
       2
       3
       4
       ⋮
  999996
  999997
  999998
 1000000

julia> fetch(t2)
136970-element Vector{Any}:
     29
     31
     32
    962
      ⋮
 999973
 999974
 999975
 999999

julia> sort!([fetch(t1); fetch(t2)]) == 1:1_000_000
true

As you can see, the output of t1 and t2 contain distinct elements from 1:1_000_000.

1 Like

Thanks for the detailed response @tkf. I guess I will have to read some more on Channel’s to properly understand this. In the mean time, I am grateful FLoops.jl exists!

1 Like