I’m trying to write a producer-consumer threaded program, but I can’t figure out why `yield` seems not to be ceding control to the worker tasks most of the time.
## Simplified Example

I’ve created a much simpler example that replicates the problem. The overall algorithm of this toy problem is:

```julia
update!(A) = A .+= 1

x = zeros(100000000)
for i in 1:10
    update!(x)
end
```
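Shrunk down to a hypothetical smaller size so it runs instantly, the serial version is easy to sanity-check:

```julia
update!(A) = A .+= 1

x = zeros(1_000)   # much smaller than the real 1e8, just for a quick check
for i in 1:10
    update!(x)
end
# every element has now been incremented 10 times
```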
To do this in parallel, I spawn worker tasks that repeatedly pull “chunks” of the array from a shared queue and update them, with the update function putting work into the queue and waiting for the workers to finish.
I created an array type with an extra field to allow this to happen without a race condition:
```julia
mutable struct ThreadVec <: AbstractVector{Float64}
    A::Vector{Float64}
    @atomic remainingwork::Int64
    function ThreadVec(N::Integer)
        return new(zeros(N), 0)
    end
end

Base.size(tv::ThreadVec) = size(tv.A)
Base.getindex(tv::ThreadVec, i) = getindex(tv.A, i)
Base.setindex!(tv::ThreadVec, v, i) = setindex!(tv.A, v, i)
```
The extra field, `remainingwork`, indicates how many “chunks” of the array have yet to be updated during a given iteration: it is incremented when a chunk is put onto the queue, and decremented when a chunk has been pulled off and updated.

The worker tasks wait for work (in the form of a `ThreadVec` and the indices between which to update) to get put onto a `Channel`, increment each index of the `ThreadVec` between those indices, and go back to waiting:
```julia
function spawnworkers()
    # The work queue
    workqueue = Channel{Tuple{ThreadVec,Int64,Int64}}(Inf)
    # Which threads did how much work?
    workdoneperthread = [Threads.Atomic{Int64}(0) for _ in 1:Threads.nthreads()]
    for _ in 1:Threads.nthreads()
        Threads.@spawn begin
            while true
                try
                    # Pull work from the queue
                    tv, start, finish = take!(workqueue)
                    # "Update" this section of tv
                    for i in start:finish
                        tv[i] += 1
                    end
                    # Decrement tv's work counter
                    @atomic tv.remainingwork -= 1
                    # Track which thread this task is executing on
                    workdoneperthread[Threads.threadid()][] += 1
                catch e
                    # Exit gracefully-ish when the channel gets closed
                    e isa InvalidStateException && e.state === :closed && break
                    # Otherwise rethrow
                    rethrow(e)
                end
            end
        end
    end
    return workqueue, workdoneperthread
end
```
In addition to `workqueue`, `spawnworkers` returns `workdoneperthread`, which tracks how many times each thread happened to be running a worker task after a chunk was updated.

The actual update function is pretty simple: it puts work onto the queue and waits (using `yield` to hopefully be productive in the meantime) until all chunks have been updated:
```julia
function update!(tv::ThreadVec, workqueue::Channel{Tuple{ThreadVec,Int64,Int64}})
    dim = length(tv)
    chunksize::Int = ceil(dim / Threads.nthreads() / log(dim))
    for i in 1:chunksize:dim
        @atomic tv.remainingwork += 1
        put!(workqueue, (tv, i, min(i + chunksize - 1, dim)))
    end
    while (@atomic tv.remainingwork) > 0
        yield()
    end
end
```
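As a standalone sanity check of the chunking arithmetic (with a hypothetical small `dim` and a hard-coded stand-in for `Threads.nthreads()`; note the `- 1` on the chunk endpoint, without which adjacent chunks would both touch their shared boundary index):

```julia
dim = 1_000                 # hypothetical small size for a quick check
nthreads = 4                # hard-coded stand-in for Threads.nthreads()
chunksize = Int(ceil(dim / nthreads / log(dim)))
covered = zeros(Int, dim)
for i in 1:chunksize:dim
    for j in i:min(i + chunksize - 1, dim)
        covered[j] += 1
    end
end
# every index falls in exactly one chunk
```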
The `main` I’ve been testing with spawns the workers, creates a `ThreadVec`, and updates it 10 times:
```julia
function main()
    # Create a vector for updating
    tv = ThreadVec(Int(1e8))
    # Spawn workers
    workqueue, workdoneperthread = spawnworkers()
    # Update 10 times
    t1 = time()
    for _ in 1:10
        update!(tv, workqueue)
    end
    t2 = time()
    # Clean up
    close(workqueue) # should trigger the workers to finish
    # Print results
    totalwork = sum(x -> x[], workdoneperthread)
    println("Time elapsed: ", t2 - t1)
    println("Share of actual computation done by each thread:")
    for (tid, work) in enumerate(workdoneperthread)
        println("  Thread $tid: $(100 * work[] / totalwork |> round |> Int)%")
    end
end
```
The results of running `main()` look like:

```
Time elapsed: 0.7464809417724609
Share of actual computation done by each thread:
  Thread 1: 6%
  Thread 2: 29%
  Thread 3: 30%
  Thread 4: 35%
```
## The Problem

Thread 1 rarely does much work. As you might expect, this is particularly problematic with 2 threads. The table below shows the average time (10 runs) it takes to update a 100 million-element `ThreadVec` 10 times with 1, 2, and 4 threads, along with the average share of work done by thread 1:

| Threads | Time (s) | Work by thread 1 |
|---|---|---|
| 1 | 1.84 | 100% |
| 2 | 2.43 | 5.3% |
| 4 | 0.76 | 5.4% |
This agrees with `@profview`, where I consistently see `yield()` taking around 40% of the program time when run with 2 threads. I don’t think this is unique to this particular problem: changing the size of `tv`, changing `chunksize`, and changing the toy algorithm itself have no discernible effect, and the results are similar to those of the non-toy algorithm, which is more computationally intensive. I also don’t think it’s a fluke unique to this computer or this run: the results have been consistent on a laptop, a desktop, and a beefy server, and on different days on the same computer.
How should I fix this performance issue? The easiest way would be to replace the `yield` in the waiting `while` loop in `update!` with a non-blocking `take!`, but unfortunately that doesn’t (yet?) exist. I’m open to other ideas for organizing this, but keep in mind this is just a toy example: I’m looking for some form of producer-consumer, not `Threads.@threads for ...` or similar.
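For concreteness, here’s roughly what I mean (an untested sketch, and not a true non-blocking `take!`: `isready` followed by `take!` can still block if another consumer wins the race). Instead of pure `yield()`-spinning, the update function could try to help drain the queue itself. The `update_helping!` name and the `isready` check are my own additions:

```julia
# (ThreadVec and its methods repeated from above so the sketch is standalone)
mutable struct ThreadVec <: AbstractVector{Float64}
    A::Vector{Float64}
    @atomic remainingwork::Int64
    ThreadVec(N::Integer) = new(zeros(N), 0)
end
Base.size(tv::ThreadVec) = size(tv.A)
Base.getindex(tv::ThreadVec, i) = getindex(tv.A, i)
Base.setindex!(tv::ThreadVec, v, i) = setindex!(tv.A, v, i)

function update_helping!(tv::ThreadVec, workqueue::Channel{Tuple{ThreadVec,Int64,Int64}})
    dim = length(tv)
    chunksize = Int(ceil(dim / Threads.nthreads() / log(dim)))
    for i in 1:chunksize:dim
        @atomic tv.remainingwork += 1
        put!(workqueue, (tv, i, min(i + chunksize - 1, dim)))
    end
    while (@atomic tv.remainingwork) > 0
        if isready(workqueue)
            # Help out: process a chunk ourselves. isready() + take!() can
            # still block briefly if a worker snatches the item first.
            tv2, start, finish = take!(workqueue)
            for i in start:finish
                tv2[i] += 1
            end
            @atomic tv2.remainingwork -= 1
        else
            yield()
        end
    end
end

# Smoke test with no workers spawned: update_helping! drains everything itself.
tv = ThreadVec(10_000)
q = Channel{Tuple{ThreadVec,Int64,Int64}}(Inf)
update_helping!(tv, q)
```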