Understanding "random" serialization when using @spawn in a "parallel region"

I have some Julia (1.10.10) code that I am benchmarking on an HPC cluster node with 76 physical cores, running with 76 threads, and I am seeing HUGE variability between runs with everything equal (same hardware, same code, same libraries, everything).

This is the per-thread performance, averaged over a 30-second period, of a “good” run:

And here is the same, for a “bad” run (which lasts ~5x longer, doing the same things):

In the bad run, mostly only one thread is working.
Notice that, for some reason, all the threads do some work at the beginning and at the end.

The code uses multithreading, with many Threads.@threads invocations, but there is one loop where every iteration has a different computational cost. There, to tackle the load-balancing issue, it was decided to use dynamic scheduling by spawning one task per iteration (2500 tasks in total) inside a @sync block.

Since that is the only place where things are not reproducible, I was led to conclude that the use of @spawn might be the culprit.

The @sync/@spawn block, slightly amended for clarity, reads:


# Create a channel holding the same number of buffers as there are threads:
BufferChannel1 = Channel{BufferType}(Threads.nthreads())
for buff in [BufferType() for _ in 1:Threads.nthreads()]
    put!(BufferChannel1, buff)
end

# same for BufferChannel2
[...]

@sync begin
    for is in 1:50, it in 1:50
        Threads.@spawn begin
            Buffer1 = take!(BufferChannel1)
            Buffer2 = take!(BufferChannel2)

            for w = -lenIntw:lenIntw-1 # Matsubara sum
                f!(Buffer1, w, w + is)
                for iu in iurange
                    g!(X, State, Par, is, it, iu, w, Buffer1)
                    if iu <= it
                        h!(X, State, Par, is, it, iu, w, Buffer1, Buffer2)
                    end
                end
            end

            put!(BufferChannel1, Buffer1)
            put!(BufferChannel2, Buffer2)
        end
    end
end

So, my questions to you are:

  • Is it possible that this code causes the two activity patterns seen in the figures?
  • If so, how? I would naively assume that a thread would start only one task at a time, but I guess that if the current task yields (perhaps at take! or put!?), then the thread could start another task, which might yield at the same point, and so on? Could it be that the locks for the channels and the locks for the tasks somehow interfere, causing serialization?

You’re right that more than nthreads() tasks may be started. With this many tasks it’s even quite likely that some of the take!s will end up in a wait(), at which point some other task is started.

Perhaps it will help to do the put!s in the reverse order of the take!s. In this way there will be a Buffer2 available as soon as the take! of a Buffer1 has succeeded. This is the way to go with multiple locks to avoid deadlocks/messes, and even though no lock is held here, you may experience a similar effect. I’m not sure.

Another thing to try is to use only one channel, fill it with Tuple{Buffertype1, Buffertype2} elements, and use just a single take! and put! in the loop. There will be less complicated lock contention.
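
A minimal sketch of that single-channel variant, with hypothetical placeholder types Buf1/Buf2 standing in for the real buffer types and the real work elided:

using Base.Threads

struct Buf1 end   # hypothetical stand-ins for the real buffer types
struct Buf2 end

pairs = Channel{Tuple{Buf1,Buf2}}(nthreads())
for _ in 1:nthreads()
    put!(pairs, (Buf1(), Buf2()))
end

@sync for is in 1:50, it in 1:50
    @spawn begin
        b1, b2 = take!(pairs)     # a single take! instead of two
        # ... do the Matsubara sum using b1 and b2 ...
        put!(pairs, (b1, b2))     # and a single put! when done
    end
end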

There is also the possibility of using a Base.Semaphore() to control the number of tasks actually running, i.e. something like:

sem = Base.Semaphore(nthreads())
...
@spawn begin
    Base.acquire(sem) do
        take!(...)
        work
        put!(...)
    end
end

Check whether anything in f!, g!, h! uses Threads.@threads :static.

That should, in theory, cause a nested task error: @threads :static cannot be used concurrently or nested. However, even a very short glance reveals a TOCTOU race condition on the check, so :person_shrugging:

Then you should check whether anything else creates sticky tasks. Sticky tasks are the devil. Same for locks.

You can expect up to nthreads()^2 tasks ready to run in the scheduler queue at most times: nthreads() from the outer @sync/@spawn loop, each of which contributes another nthreads() from any inner/nested @threads; and at least nthreads(), so no core should ever go idle.
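
For illustration, here is a toy version of that nesting pattern (nested_demo and the hypot call are stand-ins, not the OP's code): each outer spawned task runs its own @threads loop, so roughly nthreads()^2 tasks can be runnable at once.

using Base.Threads

function nested_demo(n)
    @sync for i in 1:n
        @spawn begin
            # nested @threads inside a spawned task: each outer task contributes
            # another nthreads() inner tasks to the scheduler queue
            @threads for j in 1:1_000
                hypot(i, j)   # trivial stand-in for the real per-iteration work
            end
        end
    end
end

nested_demo(100)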

Thanks! But I have a question about this:

sem = Base.Semaphore(nthreads())
channel = ... # channel with nthreads() things inside
...
@spawn begin
    Base.acquire(sem) do
        thing = take!(channel)
        work(thing)
        put!(channel,thing)
    end
end

An LLM suggested something similar to me, but I wasn’t sure whether it was hallucinating, since I thought that Base.acquire(sem) does something isomorphic to take! and then put!.

But I just realized that while take! could make the task yield, letting the thread running the current task switch to another task, acquire would block the thread so that it could not run another task, right? In this way a single thread would not be able to start all the tasks, leaving all the other threads starved for work.

Is my interpretation correct?

Ehm, I might be slow, but where is the race condition in my code? On which check?

It is not the case, but thank you for the suggestion.

That sounds like a lot of tasks to me. Have you considered using @threads :greedy here, which is designed exactly for this use case? It should spawn far fewer tasks.
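
A minimal sketch of what that could look like, assuming Julia 1.11 or later and a stand-in workload of varying cost; :greedy hands iterations out to a fixed pool of consumer tasks and also accepts arbitrary iterators such as Iterators.product:

Threads.@threads :greedy for idx in Iterators.product(1:50, 1:50)
    is, it = idx
    sum(abs2, randn(10 * (is + it)))   # stand-in for work whose cost varies per iteration
end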

The race condition is in Threads.@threads, i.e. in Base. So not your fault, and if you’re not (transitively) using Threads.@threads :static then it’s also not your problem; and the worst-case effect of the race is turning a clear error into a hard-to-debug slowdown or deadlock.

Look at @macroexpand Threads.@threads :static for i=1:2 1+1 end. You will see a check on ccall(:jl_in_threaded_region, ...). Only afterwards does the code call into Base.Threads.threading_run(var"#2#threadsfor_fun", true), which is what actually increments the corresponding counter via ccall(:jl_enter_threaded_region, Cvoid, ()). Hence, multiple threading_runs can race into the protected region at the same time.
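
Purely as an illustration of that check-then-act window (this is not the actual Base code; it just mimics the shape of the race, with an ordinary atomic counter and the hypothetical function enter_static_region in place of jl_in_threaded_region and threading_run):

const in_region = Threads.Atomic{Int}(0)

function enter_static_region(work)
    # 1. check: two tasks can both observe the counter at 0...
    in_region[] == 0 ||
        error("`@threads :static` cannot be used concurrently or nested")
    # 2. act: ...before either of them has incremented it here
    Threads.atomic_add!(in_region, 1)
    try
        work()
    finally
        Threads.atomic_sub!(in_region, 1)
    end
end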

Not really? Spawning 2500 tasks that do nothing and waiting for them takes about 1-10 ms for me:

julia> function foo(N)
           @sync for i=1:N
               Threads.@spawn 1+1
           end
           nothing
       end;

julia> @btime foo(2500)
  1.686 ms (10017 allocations: 835.41 KiB)

:greedy creates the same number of tasks. Also, the :greedy schedule doesn’t exist yet on Julia 1.10, which the OP is targeting.

I don’t think it does :thinking: Last I checked, it spawns only up to threadpoolsize() tasks, not one per iteration. It’d be bad if it did, since it supports infinite iterators :slight_smile:

Ah, I missed that! Yeah, then rolling it yourself is probably the best you can end up with.


Yes, you’re right and I was wrong. I mistakenly thought it spawned a task per item but limited the number of items that can be in flight at once, semaphore-style. Instead, it uses a Channel with no buffer, one producer task, and multiple consumer tasks.

I would have chosen a buffered channel in order to reduce the amount of task-switching required:

julia> function bar(N)
           Threads.@threads :greedy for i=1:N
               1+1
           end
           nothing
       end;

julia> @btime foo(10_000)
  6.540 ms (40022 allocations: 3.24 MiB)

julia> @btime bar(10_000)
  41.667 ms (19603 allocations: 312.88 KiB)

julia> @eval Base.Threads function greedy_func(itr, lidx, lbody)
           quote
               let c = Channel{eltype($itr)}(64,spawn=true) do ch
                   for item in $itr
                       put!(ch, item)
                   end
               end
               function threadsfor_fun(tid)
                   for item in c
                       local $(esc(lidx)) = item
                       $(esc(lbody))
                   end
               end
               end
           end
       end
greedy_func (generic function with 1 method)

julia> function bar(N)
           Threads.@threads :greedy for i=1:N
               1+1
           end
           nothing
       end;

julia> @btime bar(10_000)
  2.550 ms (9608 allocations: 158.19 KiB)

(I chose 64 as the buffer size because beyond that my machine saw diminishing returns: the bottleneck becomes the channel lock instead of the task-switching latency.)

But none of that solves the OP’s mystery :frowning:

My guess is still either a sticky task or some lock, hidden deep inside the code we’re not seeing (which, we’re told, uses @threads as well).