yield() not behaving as expected

I’m trying to write a producer-consumer threaded program, but I can’t figure out why yield() doesn’t seem to cede control to the worker tasks most of the time.

Simplified Example

I’ve created a much simpler example that replicates the problem. The overall algorithm of this toy problem is:

update!(A) = A .+= 1
x = zeros(100000000)
for i in 1:10
    update!(x)
end

To do this in parallel, I spawn worker tasks that repeatedly pull “chunks” of the array from a shared queue and update them, with the update function putting work into the queue and waiting for the workers to finish.

I created an array type with an extra field to allow this to happen without a race condition:

mutable struct ThreadVec <: AbstractVector{Float64}
    A::Vector{Float64}
    @atomic remainingwork::Int64

    function ThreadVec(N::Integer)
        return new(zeros(N), 0)
    end
end

Base.size(tv::ThreadVec) = size(tv.A)
Base.getindex(tv::ThreadVec, i) = getindex(tv.A, i)
Base.setindex!(tv::ThreadVec, v, i) = setindex!(tv.A, v, i)
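
With those three methods defined, a ThreadVec behaves like an ordinary vector (a quick sanity check of my own, not from the original code):

tv = ThreadVec(3)
tv[2] = 1.5
sum(tv)   # 1.5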

The extra member, remainingwork, indicates how many “chunks” of the array have yet to be updated during a given iteration–it is incremented when a chunk is put onto the queue, and decremented when a chunk has been pulled off and updated.

The worker tasks wait for work (in the form of a ThreadVec and the indices between which to update) to get put onto a Channel, increment each index of the ThreadVec between these indices, and go back to waiting:

function spawnworkers()
    # The work queue
    workqueue = Channel{Tuple{ThreadVec,Int64,Int64}}(Inf)
    # Which threads did how much work?
    workdoneperthread = [Threads.Atomic{Int64}(0) for _ in 1:Threads.nthreads()]
    for _ in 1:Threads.nthreads()
        Threads.@spawn begin
            while true
                try
                    # Pull work from queue
                    tv, start, finish = take!(workqueue)
                    # "Update" this section of tuv
                    for i in start:finish
                        tv[i] += 1
                    end
                    # Decrement tv's work counter
                    @atomic tv.remainingwork -= 1
                    # Track which thread this task is executing on
                    Threads.atomic_add!(workdoneperthread[Threads.threadid()], 1)
                catch e
                    # Exit gracefully-ish when the channel gets closed
                    e isa InvalidStateException && e.state === :closed && break
                    # Otherwise throw
                    rethrow(e)
                end
            end
        end
    end
    return workqueue, workdoneperthread
end

In addition to workqueue, spawnworkers returns workdoneperthread, which tracks how many times each thread happened to be running a worker task after a chunk was updated.
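
Side note: I think the try/catch could be avoided by iterating the channel directly, since a for loop over a Channel exits cleanly once the channel is closed and drained. A sketch of an equivalent worker body:

Threads.@spawn for (tv, start, finish) in workqueue
    # "Update" this section of tv
    for i in start:finish
        tv[i] += 1
    end
    # Decrement tv's work counter
    @atomic tv.remainingwork -= 1
    # Track which thread this task is executing on
    Threads.atomic_add!(workdoneperthread[Threads.threadid()], 1)
end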

The actual update function is pretty simple–it puts work onto the queue and waits (using yield to hopefully be productive in the meantime) until all chunks have been updated:

function update!(tv::ThreadVec, workqueue::Channel{Tuple{ThreadVec,Int64,Int64}})
    dim = length(tv)
    chunksize = ceil(Int, dim / Threads.nthreads() / log(dim))
    for i in 1:chunksize:dim
        @atomic tv.remainingwork += 1
        # Chunks must not overlap, so each one ends at i + chunksize - 1
        put!(workqueue, (tv, i, min(i + chunksize - 1, dim)))
    end
    # Atomic fields have to be read with @atomic
    while (@atomic tv.remainingwork) > 0
        yield()
    end
end

The main function I’ve been testing with spawns the worker tasks, creates a ThreadVec, and updates it 10 times:

function main()
    # Create a vector for updating
    tv = ThreadVec(Int(1e8))

    # Spawn workers
    workqueue, workdoneperthread = spawnworkers()

    # Update 10 times
    t1 = time()
    for _ in 1:10
        update!(tv, workqueue)
    end
    t2 = time()

    # Clean up
    close(workqueue) # should trigger the worker tasks to finish

    # Print results
    totalwork = sum(x->x[], workdoneperthread)
    println("Time elapsed: ", t2 - t1)
    println("Share of actual computation done by each thread:")
    for (tid, work) in enumerate(workdoneperthread)
        println("    Thread $tid: $(100 * work[] / totalwork |> round |> Int)%")
    end
end

The results of running main() look like:

Time elapsed: 0.7464809417724609
Share of actual computation done by each thread:
    Thread 1: 6%
    Thread 2: 29%
    Thread 3: 30%
    Thread 4: 35%

The Problem

Thread 1 rarely does much work. As you might expect, this is particularly problematic with 2 threads. The average time (over 10 runs) to update a 100-million-element ThreadVec 10 times with 1, 2, and 4 threads, along with the average share of the work done by thread 1, is shown in this table:

Threads    Time (s)    Work by thread 1
1          1.84        100%
2          2.43        5.3%
4          0.76        5.4%

This agrees with @profview, where I consistently see yield() taking around 40% of the program time when run with 2 threads. I don’t think this is unique to this particular problem–changing the size of tv, changing chunksize, and changing the toy algorithm itself have no discernible effect, and the results are similar to those of the non-toy algorithm which is more computationally intensive. I also don’t think it’s a fluke unique to this computer or this run–on a laptop, a desktop, and a beefy server, and on different days given the same computer, the results have been consistent.

How should I fix this performance issue? The easiest way would be to replace the yield() in the waiting while loop in update! with a non-blocking take!, but unfortunately that doesn’t (yet?) exist. I’m open to other ideas for organization, but keep in mind this is just a toy example: I’m looking for some form of producer-consumer, not Threads.@threads for ... or similar.
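
One workaround I’ve sketched (untested pseudocode on my part) is to wait on a second, dedicated completion channel instead of spinning on the counter: update! counts how many chunks it enqueues, then take!s that many tokens from a donequeue that each worker put!s into after finishing a chunk. Blocking on take! parks the main task, freeing its thread to run a worker, and the remainingwork counter becomes unnecessary:

function update2!(tv::ThreadVec, workqueue::Channel{Tuple{ThreadVec,Int64,Int64}},
                  donequeue::Channel{Nothing})
    dim = length(tv)
    chunksize = ceil(Int, dim / Threads.nthreads() / log(dim))
    nchunks = 0
    for i in 1:chunksize:dim
        put!(workqueue, (tv, i, min(i + chunksize - 1, dim)))
        nchunks += 1
    end
    # Each worker does put!(donequeue, nothing) after updating a chunk,
    # so taking nchunks tokens means every chunk has been processed
    for _ in 1:nchunks
        take!(donequeue)
    end
end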

Maybe this helps: Concurrency patterns for controlled parallelisms, thanks to the wonderful @tkf

That is an awesome resource, thank you for sharing!

One of the reasons I didn’t use the simple (“worker pool”) approach of spawning a task for each “chunk” (which seems to fit this sort of problem well in terms of ease of programming):

@sync for i in 1:chunksize:dim
    Threads.@spawn dowork!($i, chunksize)
end

…was that I thought each @spawn has some overhead, and that since the pieces of work can be relatively small and the number of time steps large, spawning once and having each worker process multiple units of work would be faster, especially for smaller problem sizes. Maybe this isn’t an issue, though, or at least doesn’t cause more overhead than using a Channel as an intermediary; I’ll have to optimize both versions and see if there’s any difference.
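
For concreteness, a runnable version of that sketch might look like this (update_spawned! is my name for it; the chunk-size formula is just copied from update! above):

function update_spawned!(A::Vector{Float64})
    dim = length(A)
    chunksize = ceil(Int, dim / Threads.nthreads() / log(dim))
    @sync for i in 1:chunksize:dim
        Threads.@spawn begin
            # Each task updates its own non-overlapping slice
            for j in i:min(i + chunksize - 1, dim)
                A[j] += 1
            end
        end
    end
end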

If I understand correctly, the original code uses the mentioned task farm approach; I’m just not sure what is best to do with the main thread while waiting (aside from using a non-blocking take! and doing work manually), since I can’t return from update! until all the chunks have finished updating.
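
One idea for that: have the main task block on a Threads.Condition instead of spinning, so its thread is parked and free to run a worker while update! waits. A sketch under that assumption (WorkCounter and the helper names are mine, just for illustration):

mutable struct WorkCounter
    remaining::Int
    cond::Threads.Condition
    WorkCounter() = new(0, Threads.Condition())
end

# Producer: count a chunk before enqueueing it
function addwork!(wc::WorkCounter)
    lock(wc.cond)
    try
        wc.remaining += 1
    finally
        unlock(wc.cond)
    end
end

# Worker: mark a chunk done and wake the waiter when the last one finishes
function finishwork!(wc::WorkCounter)
    lock(wc.cond)
    try
        wc.remaining -= 1
        wc.remaining == 0 && notify(wc.cond)
    finally
        unlock(wc.cond)
    end
end

# Main task: park until all chunks are done instead of calling yield() in a loop
function waitforwork(wc::WorkCounter)
    lock(wc.cond)
    try
        while wc.remaining > 0
            wait(wc.cond)
        end
    finally
        unlock(wc.cond)
    end
end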