Barrier to synchronize threads

Hello all,
AFAIK there is no built-in way to synchronize Julia threads at a barrier (think OpenMP barrier).
The expected functionality is as follows:

function test_barrier(nt, ns)
    @sync begin
        b = Barrier(nt)
        for rank in 1:nt
            @spawn for i in 1:ns
                println("Task $rank at step $i")
                wait_at_barrier(rank, b)
            end
        end
    end
end

test_barrier(40,4)

This is supposed to spawn 40 tasks that perform work in 4 stages. Each task waits until all the others have completed the current stage before moving on to the next one.

After some back-and-forth with an AI, I arrived at:

using Base.Threads

struct Barrier
    count::Int
    arrived::Threads.Atomic{Int}
    release::Channel{Nothing}
end

Barrier(n::Int) = Barrier(n, Atomic{Int}(0), Channel{Nothing}(n))

function wait_at_barrier(rank, b::Barrier)
    (; release, arrived, count) = b
    if atomic_add!(arrived, 1) == count - 1
        arrived[] = 0 # before releasing other threads
        for _ in 1:count
            put!(release, nothing)
        end
    end
    return take!(release)
end

Unfortunately this hangs…
Does anybody know of a better way?

Thanks
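
As far as I can tell, the version above can hang because tokens from one round leak into the next: the releasing task puts `count` tokens into `release`, and a fast task can take its own token, reach the next barrier, and immediately take a second token that was meant for a slower task. With 2 tasks and 2 steps, for example, task 2 can arrive last, put 2 tokens, take one, move on to step 2, take the other and finish, so task 1 never gets a token. Below is a minimal sketch of one way around this, assuming a lock-protected counter plus a generation number instead of a channel. `CondBarrier`, its fields, and `run_steps` are hypothetical names for illustration, not part of Base.

```julia
using Base.Threads

# Hypothetical reusable barrier: a sketch, not an official API.
mutable struct CondBarrier
    count::Int                # number of participating tasks
    arrived::Int              # tasks that have reached the current round
    generation::Int           # incremented every time the barrier opens
    cond::Threads.Condition   # lock + wait queue protecting the fields above
end

CondBarrier(n::Int) = CondBarrier(n, 0, 0, Threads.Condition())

function wait_at_barrier(b::CondBarrier)
    lock(b.cond)
    try
        gen = b.generation
        b.arrived += 1
        if b.arrived == b.count
            # Last arrival: reset the counter and open the barrier for this round.
            b.arrived = 0
            b.generation += 1
            notify(b.cond)
        else
            # Wait until the round we arrived in is over; a task that races
            # ahead to the next round sees a new generation and blocks again.
            while b.generation == gen
                wait(b.cond)
            end
        end
    finally
        unlock(b.cond)
    end
    return nothing
end

# Small harness: every task records (rank, step) and then waits at the barrier.
function run_steps(nt, ns)
    b = CondBarrier(nt)
    events = Tuple{Int,Int}[]
    events_lock = ReentrantLock()
    @sync for rank in 1:nt
        @spawn for step in 1:ns
            lock(events_lock) do
                push!(events, (rank, step))
            end
            wait_at_barrier(b)
        end
    end
    return events
end
```

Because the counter reset and the generation bump happen under the same lock, a release cannot be consumed twice by the same task. In the returned log, all entries for step 1 should come before any entry for step 2, and so on.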

I think OhMyThreads.jl has a barrier implementation: Internal · OhMyThreads.jl

Base also has semaphores: Tasks · The Julia Language

SimpleBarrier does the job, thanks!
Any chance this becomes part of the official API?

Here is a solution using Channels

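using Base.Threads: @spawn

struct Barrier
    count::Int
    chan::Channel{Channel{Nothing}}   # each waiting task sends its own reply channel
end

function Barrier(n::Int)
    ch = Channel{Channel{Nothing}}()
    # Coordinator task: collect n reply channels, then release all n waiters.
    @spawn begin
        while true
            acks = [take!(ch) for i in 1:n]
            for bc in acks
                put!(bc, nothing)
            end
        end
    end
    Barrier(n, ch)
end

function wait_on_barrier(b::Barrier)
    bc = Channel{Nothing}()   # private reply channel for this call
    put!(b.chan, bc)          # announce arrival
    take!(bc)                 # block until the coordinator releases us
end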

Imho, channels are a very nice and underappreciated synchronization primitive.
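
One detail of the channel-based solution above: its coordinator task loops forever, so it stays parked in `take!` once the work is done. Here is a hedged variant that can be shut down, assuming it is acceptable to close the request channel when the barrier is no longer needed. `ChannelBarrier`, `wait_on`, `shutdown!`, and `run_with_channel_barrier` are hypothetical names for this sketch.

```julia
using Base.Threads: @spawn

# Hypothetical closeable variant of a channel-based barrier.
struct ChannelBarrier
    count::Int
    chan::Channel{Channel{Nothing}}   # waiters send their reply channel here
    coordinator::Task                 # task that releases the waiters
end

function ChannelBarrier(n::Int)
    ch = Channel{Channel{Nothing}}()  # unbuffered: put! blocks until taken
    coordinator = @spawn begin
        acks = Channel{Nothing}[]
        for bc in ch                  # iteration ends once `ch` is closed
            push!(acks, bc)
            if length(acks) == n      # everyone has arrived: release them all
                foreach(c -> put!(c, nothing), acks)
                empty!(acks)
            end
        end
    end
    return ChannelBarrier(n, ch, coordinator)
end

function wait_on(b::ChannelBarrier)
    bc = Channel{Nothing}(1)          # private reply channel for this call
    put!(b.chan, bc)                  # announce arrival
    take!(bc)                         # block until released
    return nothing
end

# Close the request channel and wait for the coordinator to finish.
function shutdown!(b::ChannelBarrier)
    close(b.chan)
    wait(b.coordinator)
    return nothing
end

# Harness: each task logs (rank, step) into a buffered channel, then waits.
function run_with_channel_barrier(nt, ns)
    b = ChannelBarrier(nt)
    events = Channel{Tuple{Int,Int}}(nt * ns)
    @sync for rank in 1:nt
        @spawn for step in 1:ns
            put!(events, (rank, step))
            wait_on(b)
        end
    end
    shutdown!(b)
    close(events)
    return collect(events)            # drains what is still buffered
end
```

The reply channels get a buffer of size 1 here so the coordinator does not have to wait for each task to wake up before releasing the next one; the unbuffered version should behave the same, only with more hand-offs.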
