Hi there, I have a loop that uses a cache array to avoid excessive memory allocations. The structure is captured in the following example:
A = rand(10000)
B = similar(A)
cache = zeros(3)
for i in eachindex(A)
    f!(cache, A[i])
    B[i] = g(cache, A[i])
end
Only the in-place nature of f! is relevant and not its details, but for the sake of having a minimal working example, let me define
function f!(cache, a)
    cache[1] = a
    cache[2] = a^2
    cache[3] = a^3
end
function g(cache, a)
    return a*sum(cache.^2)
end
In my real example, the loop takes a long time, so I would like to parallelize it using multithreading. Initially, I thought the following simple modification where I create a cache for each thread could work:
A = rand(10000)
B = similar(A)
threads_cache = [zeros(3) for i in 1:Threads.nthreads()]
Threads.@threads for i in eachindex(A)
    cache = threads_cache[Threads.threadid()]
    f!(cache, A[i])
    B[i] = g(cache, A[i])
end
However, the @threads documentation states that one should only write to locations not shared across iterations, and that the value of threadid() may change even within the same iteration. So the example above would be incorrect, since each cache is shared across iterations that have the same threadid(). Creating a cache for each iteration would defeat the purpose of avoiding excessive memory allocations. So, is there a way to use multithreading while still benefiting from the in-place nature of the computation? Can I prevent different iterations from running concurrently on the same thread? Do I need to introduce locks or atomic operations in this example?
One solution is to manually spawn just one task per thread (or, rather, per preallocated cache) and have each task handle as many values of i as necessary. The easiest way to distribute the i values to the tasks is to take them dynamically from a prepopulated channel, as follows:
is = eachindex(A)
ch = Channel{eltype(is)}(Inf)
foreach(i -> put!(ch, i), is)
close(ch)
@sync for cache in threads_cache
    Threads.@spawn for i in ch
        f!(cache, A[i])
        B[i] = g(cache, A[i])
    end
end
There's of course a bit of overhead in this 100% dynamic scheduling, since iterating the channel involves locking under the hood, so if your loop body is very cheap you might want to pre-assign the i values to tasks instead.
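For reference, pre-assigning could look something like this (a rough sketch, untested, reusing the threads_cache array from above): split the indices into one contiguous block per cache and spawn one task per block:
blocks = Iterators.partition(eachindex(A), cld(length(A), length(threads_cache)))
@sync for (cache, block) in zip(threads_cache, blocks)
    Threads.@spawn for i in block
        f!(cache, A[i])
        B[i] = g(cache, A[i])
    end
end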
You have to use the :static scheduler to make this work:
A = rand(10000)
B = similar(A)
threads_cache = [zeros(3) for i in 1:Threads.nthreads()]
Threads.@threads :static for i in eachindex(A)
    cache = threads_cache[Threads.threadid()]
    f!(cache, A[i])
    B[i] = g(cache, A[i])
end
See ??Threads.@threads in the REPL for the details.
Here is IMO the best way to approach this problem if you don't want to use external libraries:
A = rand(10000)
B = similar(A)
# Break your work into chunks so that there's enough for each thread to get two chunks.
# More chunks per thread has better load balancing but more overhead
chunks_per_thread = 2
chunks = Iterators.partition(eachindex(A), length(A) ÷ (Threads.nthreads() * chunks_per_thread))
# map over the chunks, creating an array of spawned tasks
tasks = map(chunks) do chunk
    # For each chunk we spawn a task that creates some state, and then does your
    # single-threaded algorithm on that state, and the chunk it was given, then
    # returns the state it created at the end
    Threads.@spawn begin
        cache = zeros(3)
        for i ∈ chunk
            f!(cache, A[i])
            B[i] = g(cache, A[i])
        end
        return cache
    end
end
# Now we fetch all the results from the spawned tasks
caches = fetch.(tasks)
Instead of creating state and then spawning parallel work that observes that state, think in terms of the parallel workers creating and returning their own state.
While this will work, I recommend against it because the static scheduler is not composable with other multithreaded code. We're moving away from it for a good reason.
Thanks for this! I wasn't aware of Iterators.partition.
I suppose if you want the load balancing of multiple chunks per thread without having to spawn more tasks than threads, you could combine our approaches:
chunks_per_thread = 2
chunks = Iterators.partition(eachindex(A), length(A) ÷ (Threads.nthreads() * chunks_per_thread))
ch = Channel{eltype(chunks)}(Inf)
foreach(chunk -> put!(ch, chunk), chunks)
close(ch)
tasks = map(1:Threads.nthreads()) do _
    Threads.@spawn begin
        cache = zeros(3)
        for chunk in ch
            for i in chunk
                f!(cache, A[i])
                B[i] = g(cache, A[i])
            end
        end
        return cache
    end
end
caches = fetch.(tasks)
For long-running compute-bound tasks, it seems plausible that the overhead from the occasional lock when taking the next chunk from the channel would be less than the overhead from tasks stepping on each other due to there being more tasks than threads. Of course, one should profile to know for sure.
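For example, assuming each variant has been wrapped in a function (run_chunked! and run_channel! are just placeholder names for the two versions above), a quick comparison could look like:
using BenchmarkTools
@btime run_chunked!($B, $A)   # more tasks than threads, no channel
@btime run_channel!($B, $A)   # one task per thread, chunks taken from a channel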
I think if you're worried about the overhead of spawning tasks, then you should generally be even more worried about the overhead of using channels. I'm not an experienced user of channels, and I haven't tried everything, but I'm not sure I've ever found a workload in my work that benefited from using channels rather than just spawning more tasks (this is probably because I'm almost always compute limited, but I'm not sure).
Aren't compute-limited tasks exactly the ones for which having more tasks than threads is detrimental? It's not the spawning of tasks I'm worried about, but the fact that they will be competing for resources during execution. The scheduler will ask the running tasks to yield at regular intervals to distribute processing time fairly (probably incorrect and not even meaningfully phrased, see below), when they could have been plowing through sequentially and without interruption at 100% CPU. It's like having multiple async tasks on a single thread: useful when you're IO-bound, waiting for user input or network resources, etc., but not a way to run heavy computations.
The way I used the channel in my first example is absolutely not ideal, as each task has to acquire the lock for every single iteration. However, when combined with your manual chunking to reduce the number of channel accesses down to only a handful per task, almost all the overhead should be gone. Acquiring the lock, say, every few minutes shouldn't be anything to worry about.
What's important if you're concerned about performance is to make sure the channel is typed (Channel{eltype(container)}) and that you either wrap the code in a function or make the channel const, so that the element type can be inferred within the task.
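To illustrate (the names here are just illustrative), either of these lets the element type be inferred inside the tasks:
# Option 1: pass the typed channel through a function barrier, so the loop body
# is compiled for the concrete channel type.
function consume!(B, A, ch::Channel{Int})
    cache = zeros(3)
    for i in ch
        f!(cache, A[i])
        B[i] = g(cache, A[i])
    end
    return cache
end

# Option 2: make the channel a typed global constant.
const CH = Channel{Int}(Inf)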
It's probably just because the MWE is much smaller, but is a shared cache really necessary here? The cache is overwritten every iteration, so if you don't need to store the data from past iterations, why not stack-allocate it per iteration, like (a, a^2, a^3) or an immutable StaticArray? That way you don't have to worry about multiple threads mutating the same instance or reassigning the same variable. Accessors.jl can do some operations on immutable instances with mutation-like syntax, if that'll help.
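As a rough sketch of what that could look like (the helper names are made up; A and B as in the original example):
h(a) = (a, a^2, a^3)                   # plays the role of f!, but returns a fresh stack-allocated tuple
g_tuple(vals, a) = a * sum(abs2, vals) # same computation as g, without the broadcast allocation

Threads.@threads for i in eachindex(A)
    vals = h(A[i])
    B[i] = g_tuple(vals, A[i])
end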
Hm, you may be right here. As I said, I'm not really an experienced user of channels, so after my bad experiences with their performance I've mostly stayed away, but thinking about it more, it seems that this chunking should more or less get the best of both approaches.
Preliminary experiments indicate that my mental model was wrong:
$ julia --threads=1
julia> using LinearAlgebra
julia> tasks = map(1:2) do taskid
           Threads.@spawn begin
               A = randn(2000, 2000)
               for i in 1:5
                   eigen(A) # kill time without implicitly yielding
                   println("Task $taskid iteration $i")
               end
           end
       end

julia> foreach(wait, tasks)
Task 1 iteration 1
Task 1 iteration 2
Task 1 iteration 3
Task 1 iteration 4
Task 1 iteration 5
Task 2 iteration 1
Task 2 iteration 2
Task 2 iteration 3
Task 2 iteration 4
Task 2 iteration 5
I thought more tasks than threads had essentially the same issues as more threads than cores, but perhaps not, at least not in Julia 1.9.1.
Trying to understand the docs and the blog post Announcing composable multi-threaded parallelism in Julia further, it seems like my phrasing "ask the running tasks to yield" might not even make sense. A task is only ever switched out when it itself yields, either by calling yield() explicitly or by hitting a blocking call that implicitly yields, like sleep, wait, et cetera. Also, a spawned task is only scheduled when a thread becomes available. At least that's now my understanding. Still eager to hear from any experts on Julia's implementation of multithreading.
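As a small follow-up sketch (output omitted), adding an explicit yield() to the loop body should make the two tasks interleave even on a single thread, in contrast to the back-to-back ordering above:
tasks = map(1:2) do taskid
    Threads.@spawn for i in 1:5
        sum(abs2, randn(1000)) # some work
        yield()                # explicit yield point
        println("Task $taskid iteration $i")
    end
end
foreach(wait, tasks)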
If this is correct, your original solution seems like the best, @Mason
@joaquinpelle, now this is what your example would look like:
julia> using ChunkSplitters, Base.Threads
julia> A = rand(10000);
julia> B = similar(A);
julia> nchunks = nthreads() # can be changed for tuning load balancing
12
julia> threads_cache = [zeros(3) for _ in 1:nchunks];
julia> @threads for (i_range, i_chunk) in chunks(eachindex(A), nchunks)
           for i in i_range
               cache = threads_cache[i_chunk] # i_chunk index, not threadid()
               f!(cache, A[i])
               B[i] = g(cache, A[i])
           end
       end
(Of course, you should put the loop inside a function, possibly receiving nchunks as an optional keyword argument set by default to nthreads(), as sketched below.)
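Something along these lines, for instance (just a sketch, mirroring the chunks call above; the function name is made up):
using ChunkSplitters, Base.Threads

function compute!(B, A; nchunks = nthreads())
    threads_cache = [zeros(3) for _ in 1:nchunks]
    @threads for (i_range, i_chunk) in chunks(eachindex(A), nchunks)
        for i in i_range
            cache = threads_cache[i_chunk]
            f!(cache, A[i])
            B[i] = g(cache, A[i])
        end
    end
    return B
end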
Also, if your cache really consists of arrays of size 3, you are better off just using tuples or static arrays, created on the fly by each thread, as mentioned by someone here.
Can somebody explain how the threadid() may change within the same iteration of a @threads for?
Could it be, for example, that there is a @spawn within the body of the loop, and the thread that executes this iteration follows the spawned task while another thread steals the rest of the iteration?
Is that possible? And is it the only way this can happen, so that if there is no @spawn within the loop body you can assume the threadid() won't change?
See here: Announcing composable multi-threaded parallelism in Julia, under the subheading "Task migration across system threads". (Note that this blog post is 4 years old, so statements like "for now a task must always run on the thread it started running on" may no longer be true.)
If I understand this correctly, the only time a task can switch threads is when yielding/blocking, after which it may be restarted on a different thread. I don't know what all the yielding/blocking operations are, so while I suppose a task is unlikely to ever migrate if it's just doing straight number crunching that never waits for data, I'd rather err on the safe side.
One situation I've encountered where an otherwise purely computational task blocks is when writing/logging to a file during the computation. It's essential to protect file access with a lock in order to preserve file integrity, and lock is certainly a blocking operation.
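For concreteness, a minimal sketch of what I mean (the lock and file names are arbitrary):
log_lock = ReentrantLock()
open("progress.log", "w") do io
    Threads.@threads for i in eachindex(A)
        # ... heavy computation for index i ...
        lock(log_lock) do
            println(io, "finished index $i") # serialized writes keep the file intact
        end
    end
end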
Nope, the tasks spawned by @threads are allowed to yield to other tasks, be paused, and then resume on a completely different thread in the middle of their execution. Even more problematic than the migration is that even for a specific fixed threadid, a task can yield between reading and writing to the cache and then end up overwriting the work another task did, so the code in the original post is incorrect even with a single thread.
In practice, the migration or yielding will only happen if a yield point is in the task body, but even something as innocuous as a hidden @debug statement or a dynamic dispatch could introduce a yield point, so you have no real control over whether the yielding behaviour can happen, and it's best to just assume it can happen at any time.
Ah, yeah, I didn't realize you had this confusion: there is no real performance problem with spawning too many tasks other than the fixed ~1 microsecond overhead of actually scheduling each task in the first place. So long as that overhead is negligible, there's no risk of oversubscribing the CPU.
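If you want to put a number on that fixed cost on your machine, something like this gives a rough estimate (results will vary):
using BenchmarkTools
@btime fetch(Threads.@spawn 1 + 1) # roughly the per-task scheduling overhead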
Thank you, everyone, for your responses. Thanks especially to @Mason and @danielwe; your rich discussion was very instructive and helped me think about my design better, in addition to teaching me more about multithreading and even some Julia tricks. It seems @Mason's solution is the most adequate, but I will profile @danielwe's too.
AFAIK every instance, or a reference to one, in Julia is allocated on either the stack or the heap. Stack allocations and frees are deterministic and much cheaper than heap allocations; allocation reports don't even bother counting them.
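A quick way to see the difference (the helper names are made up; run each @allocated line twice so compilation doesn't show up in the count):
make_tuple(a) = (a, a^2, a^3) # small tuple, stays on the stack / in registers
make_array(a) = [a, a^2, a^3] # Vector, heap-allocated
@allocated make_tuple(1.0)    # expect 0 bytes
@allocated make_array(1.0)    # expect a small nonzero number of bytes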