Suppose we have a function, do_stuff!(mat, x), that mutates a previously allocated matrix mat and returns some value. We want to run the function for several values of x using multithreading. In order to avoid data races, we could do:
matrices = [zeros(2, 2) for _ in 1:Threads.nthreads()]
n = 10
out = zeros(n)
xs = 1:n
@sync for (i, x) in enumerate(xs)
    Threads.@spawn out[i] = do_stuff!(matrices[Threads.threadid()], x)
end
As far as I understand, task migration will be possible in Julia 1.7. So what will be the alternative to threadid() for 1.7 and above?
I am aware that FLoops.jl provides an @init macro that solves this problem. But since preallocating objects that are to be mutated is such a common pattern, I feel that there should be a solution for this problem in Base. But I am not aware of what that is.
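For reference, the FLoops.jl pattern I mean looks roughly like this (a sketch based on its documented @init macro, which gives each underlying task its own private buffer; ThreadedEx is FLoops' threaded executor):

using FLoops

@floop ThreadedEx() for (i, x) in pairs(xs)
    @init matrix = zeros(2, 2) # allocated once per task, never shared
    out[i] = do_stuff!(matrix, x)
end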
To be completely safe, maybe preallocate out for each thread as well.
But if you want the matrices to depend on the previous iteration, it may be more transparent to use a parallel loop over the number of threads instead.
To clarify: the matrices do not depend on the previous iteration. The matrices are simply a “working space”. In the particular problem I am dealing with (electronic band-structure calculation), the do_stuff! function builds a matrix as a function of x (the momentum in my problem). Instead of allocating a new matrix for each x, I want to preallocate the matrices so that do_stuff! only modifies a matrix instead of having to allocate a new one each time it is called.
My question is how this can be achieved with multithreading in a safe manner once we have task migration between threads.
To be completely safe, maybe preallocate out for each thread as well.
Since each entry of out is guaranteed to only be touched by one thread, I think there is no need to preallocate an out for each thread.
To be honest, this is all way over my head. I just refer to the following warning by @tkf:
Furthermore, if julia supports migration of Task across OS threads at some future version, the above scenario can happen even if f never yields to the scheduler. Therefore, reduction or private state handling using threadid is very discouraged.
function do_stuff!(matrix, x)
    matrix[1, 1] = a = rand()
    yield() # or `@show`, or `@debug`, or write to a file, ...
    matrix[1, 1] += x
    @assert matrix[1, 1] == a + x # this *can* fail
end
The Julia runtime may decide to suspend a task executing do_stuff!(matrices[1], x) when it hits yield, and start a new task on thread 1 that executes do_stuff!(matrices[1], x) again but with a different x. Then, by the time the original task is restarted, matrix[1, 1] has a completely different value.
(For similar discussion, see Is `@async` memory safe? - #8 by tkf. This is about @async but task migration occurs at yield points and so it explains why matrices[Threads.threadid()] style does not work in general.)
So, the only way to use the matrices[Threads.threadid()] style is to make sure that there is no I/O (no yield points) in any code reachable from do_stuff!. Sometimes that's doable, but it does not scale well. In particular, do_stuff! cannot be generic over its arguments anymore, since you don't know which methods of the functions are going to be called. This is why I'm against the matrices[Threads.threadid()] style.
Though I think both this and the original threadid() version seem to be memory-unsafe, as the cache will be overwritten by spawning so many tasks (there will be multiple tasks on the same threadid(), right?).
My favorite way to handle this is with Channels. Push tuples of (matrices[i], x) into a channel, and then spawn nthreads() tasks to pull from the channel and do the work.
That also imposes assumptions on how the threads do their work. It is very likely that one thread finishes earlier, and then it will corrupt another thread's data.
Maybe the answer to this question is just the note in the FAQ:
The problem discussed above can also be worked around by, e.g., using Threads.@threads for (since it spawns exactly nthreads() tasks and ensures that each task is scheduled on each OS thread, as of Julia 1.6) and making sure that state is not shared across multiple loops.
Hence, if I understand it correctly:
out = zeros(n)
xs = collect(1:n)
Threads.@threads for i in 1:n
    out[i] = do_stuff!(matrices[Threads.threadid()], xs[i])
end
But I’m not sure and also want to know the correct answer!
Not really; in this case two threads may happen to be working on the same matrix at the same time.
Another problem is that solutions of that type depend on how the iterator and the indexes of the iterator are defined. What if it receives an offset array to iterate over?
Especially when things can take widely different amounts of time (for example, if do_stuff! is a stochastic simulation), and now that thread migration happens (in 1.7+), the pattern I'd use is:
work = Channel(Threads.nthreads())
res = Channel(Inf) # unbounded, so workers never block on put! while we are still filling `work`
for i in 1:Threads.nthreads()
    Threads.@spawn do_things!(work, res)
end
for i in 1:n
    put!(work, (matrices[i], xs[i])) # here `matrices` holds one buffer per work item
end
close(work) # let the workers finish once the queue drains
while true
    result = take!(res)
    # store the result, or break after the n-th result or when a special "end" message comes through
end
Here do_things! is a function that take!s from the work channel, does the work, and put!s the result into the results channel.
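A minimal sketch of such a worker, under the assumption that the work channel is closed after the last item (iterating a Channel take!s elements until the channel is closed and drained):

function do_things!(work, res)
    for (matrix, x) in work # each iteration take!s one item, so no two workers share it
        put!(res, do_stuff!(matrix, x))
    end
end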
using Base.Threads: @spawn

ntasks = Threads.nthreads() # use a larger value if `do_stuff!` contains I/O
@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)
    # Do this after the `@spawn`s if computing the work itself is CPU-intensive
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue) # signal the end of work
    for _ in 1:ntasks
        @spawn begin
            local matrix = zeros(2, 2) # allocate the buffer, and don't share it
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
end
If do_stuff! contains I/O, like writing to a file or fetching something over the internet, it makes sense to make ntasks larger than the number of CPUs. For example, Python's concurrent.futures.ThreadPoolExecutor creates four more threads than the number of CPUs (bounded by 32), assuming that the work is I/O-dominated.
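In Julia, that heuristic might look like the following one-liner (the constants simply mirror the Python default; this is an illustration, not anything provided by Base):

ntasks = min(Sys.CPU_THREADS + 4, 32) # oversubscribe for I/O-bound work, capped at 32 tasks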
If computing an element for workqueue also takes some time, it makes sense to move the for (i, x) in enumerate(xs) loop after the for _ in 1:ntasks loop. However, doing so increases contention on workqueue.
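That reordering would look roughly like this (a sketch of the variant just described; the worker body is unchanged from above):

@sync begin
    workqueue = Channel{Tuple{Int,Float64}}(Inf)
    for _ in 1:ntasks
        @spawn begin
            local matrix = zeros(2, 2)
            for (i, x) in workqueue
                out[i] = do_stuff!(matrix, x)
            end
        end
    end
    # workers are already pulling items while we fill the queue
    for (i, x) in enumerate(xs)
        put!(workqueue, (i, x))
    end
    close(workqueue)
end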
If each do_stuff!(matrix, x) call is fast but xs is long, it's better to chunk the work and use Channel{Vector{Tuple{Int,Float64}}}.
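A hedged sketch of that chunked variant (the chunk size of 64 is an arbitrary choice):

workqueue = Channel{Vector{Tuple{Int,Float64}}}(Inf)
for chunk in Iterators.partition(collect(enumerate(xs)), 64)
    put!(workqueue, collect(chunk)) # one channel operation per 64 items
end
close(workqueue)
# ...and inside each worker, add an inner loop over the chunk:
# for chunk in workqueue, (i, x) in chunk
#     out[i] = do_stuff!(matrix, x)
# end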
In my particular case, do_stuff!(matrix, x) just fills the matrix with values that depend on x and then diagonalizes it. So in this case, the matrices[Threads.threadid()] style should be OK, correct?
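For concreteness, a hypothetical kernel of that shape (the fill rule and the eigvals! call are made-up stand-ins; the point is that it contains no I/O, hence no yield points):

using LinearAlgebra

function do_stuff!(mat, x)
    for j in axes(mat, 2), i in axes(mat, 1)
        mat[i, j] = cos(x * (i - j)) # arbitrary x-dependent, symmetric fill
    end
    return minimum(eigvals!(Symmetric(mat))) # e.g. the lowest eigenvalue
end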
Regarding the FAQ note quoted above: I noticed that, but I wondered what the way of doing it with @spawn would be. By the way, does anyone know whether the matrices[Threads.threadid()] style will still work once @threads uses dynamic scheduling instead of static?
@tkf, thank you for the Base-only solution! One question: in the workqueue you include all the elements of xs. Then, inside the @spawn, in the for (i, x) in workqueue loop, won't all tasks work on all elements of xs? (Sorry if this is a silly question; I don't have any experience with Channels.)
I think that would actually be best! Having the ability to switch parallel execution by simply changing an argument is wonderful!
Right, this is a tricky part of Channel. You can iterate over it, but the elements you “consume” are removed from it. So no two tasks see the same element.
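A tiny demonstration of that consuming behavior (the names are illustrative):

ch = Channel{Int}(Inf)
foreach(i -> put!(ch, i), 1:10)
close(ch)
t1 = Threads.@spawn collect(ch) # each collect take!s items as it iterates
t2 = Threads.@spawn collect(ch)
@show fetch(t1) fetch(t2) # disjoint subsets that together cover 1:10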
Thanks for the detailed response, @tkf. I guess I will have to read some more about Channels to properly understand this. In the meantime, I am grateful that FLoops.jl exists!