`break` with a threaded loop?

Is there any simple way to break from a threaded for loop (actually I want an infinite while loop, but that doesn’t work with @threads)? Or would I need to use something like channels for synchronization?

Trivial example:

julia> r = []
       r_lock = ReentrantLock()
       Threads.@threads for _ in 1:10^9
         sleep(0.01)
         lock(r_lock) do
           push!(r, rand())
           length(r) == 100 && break
         end
       end
ERROR: syntax: break or continue outside loop
Stacktrace:
 [1] top-level scope
   @ REPL[85]:3

I think the best you can do is introduce a shared flag (it could be another lock) that flips when one of the threads hits your termination condition.

The other threads could look at that flag (for example with trylock()) and terminate when it is flipped.

This approach would probably require a Channel. Threads.@threads is really only meant for for loops with a known length.
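For illustration, the shared-flag idea might look like this with plain tasks instead of Threads.@threads (a rough sketch using an `Atomic{Bool}` flag; tasks that have already passed the flag check when it flips can push a few extra elements):

```julia
using Base.Threads

results = Float64[]
r_lock = ReentrantLock()
done = Atomic{Bool}(false)              # shared termination flag

@sync for _ in 1:nthreads()
    Threads.@spawn while !done[]        # each task runs its own "infinite" loop
        sleep(0.01)
        lock(r_lock) do
            push!(results, rand())
            # flip the flag once the termination condition is hit
            length(results) >= 100 && (done[] = true)
        end
    end
end

length(results)  # at least 100; may overshoot by up to nthreads() - 1
```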

Yeah, Threads.@threads is a bad fit for this. I’d instead use Transducers.jl. It won’t guarantee that it breaks at exactly the loop iteration you ask for (in general, I think such a thing is kinda incompatible with the notion of parallelism), but here’s what your example would look like (though I’ve stored the threadid() in r instead of a random number, so you can see which threads did what):

julia> using Transducers, BangBang, MicroCollections

julia> foldxt(1:10^9, init=Int[]) do r, _
           sleep(0.01)
           r = append!!(r, SingletonVector(Threads.threadid()))
           if length(r) >= 100
               reduced(r)
           else
               r
           end
       end
103-element Vector{Int64}:
 2
 4
 1
 3
 2
 4
 1
 2
 3
 4
 4
 3
 2
 1
 4
 2
 1
 3
 4
 2
 3
 1
 2
 ⋮
 1
 2
 3
 1
 4
 2
 4
 3
 1
 1
 4
 3
 4
 3
 1
 3
 1
 2
 4
 2
 3
 1

Edit: the above code has correctness problems. @tkf has posted a fixed version here: `break` with a threaded loop? - #10 by tkf

What Mason wrote is the way to go with vanilla Transducers.jl. You can also use break, return, or @goto (with a label outside of the loop) inside of @floop to express this.

Actually, that’s not the case. break can be expressed using a so-called find-first (or find-last) monoid (see Monoid - Wikipedia). It can be computed in parallel, and the result is also deterministic (unless the user explicitly opts out). See also: Generic `Reduced` handling in parallel reduce by tkf · Pull Request #172 · JuliaFolds/Transducers.jl · GitHub
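As a toy illustration (not Transducers.jl’s actual implementation), a find-first monoid is just an associative combine with `nothing` as the identity:

```julia
# The find-first monoid: `nothing` is the identity, and the combine
# operation keeps the leftmost non-`nothing` value.
firstof(a, b) = a === nothing ? b : a

# Because `firstof` is associative, this reduction can be split into
# chunks and combined in any grouping with the same (leftmost) result.
xs = [0.1, 0.95, 0.3, 0.99]
found = reduce(firstof, (x > 0.9 ? x : nothing for x in xs); init = nothing)
# `found` is 0.95: the first element matching the predicate
```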

For what it’s worth, I had some trouble getting this to work personally. It seemed to just ignore the break condition when I was trying it, and I didn’t have time to track down what was going on. I can look again in the morning and try to send you an MWE.

Yeah, let me know if there is some peculiar behavior.

FWIW, here’s a demo of using break in parallel @floop

julia> function demo(xs = rand(2^20), ex = ThreadedEx(basesize = 2^10))
           xmax = Threads.Atomic{Float64}(0.0)
           imax = Threads.Atomic{Int}(0)
           @floop ex for (i, x) in pairs(xs)
               if x > 0.9999
                   xmax[] = x
                   imax[] = i
                   @reduce() do (j = nothing; i), (y = nothing; x)
                       j = i
                       y = x
                   end
                   break
               end
           end
           (j => y), (imax[] => xmax[])
       end;

julia> for _ in 1:100
           a, b = demo()
           if a != b
               @show a b
               break
           end
       end
a = 11792 => 0.99993600686011
b = 14586 => 0.9999006513025757

Here’s an MWE:

julia> using FLoops, MicroCollections, BangBang

julia> @floop for i ∈ 1:100
           sleep(0.001)
           @reduce(r = append!!(EmptyVector(), SingletonVector(Threads.threadid())))
           length(r) >= 10 && break
       end; 

julia> r
10-element Vector{Int64}:
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1

So that didn’t multithread the loop; explicitly providing a threaded executor, I get

julia> @floop ThreadedEx(basesize=1) for i ∈ 1:100
           sleep(0.001)
           @reduce(r = append!!(EmptyVector(), SingletonVector(Threads.threadid())))
           length(r) >= 10 && break
           r
       end

julia> r
100-element Vector{Int64}:
 1
 2
 1
 5
 6
 2
 5
 1
 3
 1
 1
 1
 4
 2
 5
 4
 1
 1
 4
 3
 2
 2
 2
 2
 6
 3
 1
 2
 3
 3
 1
 1
 1
 1
 1
 4
 ⋮
 4
 1
 1
 6
 4
 1
 2
 1
 2
 4
 1
 1
 5
 2
 5
 5
 5
 3
 4
 2
 5
 2
 3
 1
 1
 2
 4
 3
 1
 5
 4
 1
 2
 6
 2

So it never actually breaks out of the loop.

So @floop should actually be throwing an error since the accumulator r is accessed in the loop body outside of @reduce. (One way to understand parallel reduction is that the accumulator is “write only.”) But, apparently, it silently generates nonsensical code.

Using break based on the @reduce result is rather tricky, since you have to show idempotency.

But, if you accept non-determinism, perhaps the simplest but still “lock-less” approach is something like this:

julia> xs = rand(2^30)
       basesize = 2^15
       chunks = Iterators.partition(xs, basesize)
       outputs = [Int[] for _ in chunks]
       counter = Threads.Atomic{Int}(0)
       @floop ThreadedEx(basesize = 1) for (output, chunk) in zip(outputs, chunks)
           for x in chunk
               counter[] > 10 && break
               @reduce nitems += 1
               if x > 0.9999
                   Threads.atomic_add!(counter, 1)
                   push!(output, Threads.threadid())
               end
           end
       end

julia> nitems, length(xs)  # processed items vs input items
(199088, 1073741824)

julia> foldl(append!, outputs; init = Int[])
11-element Vector{Int64}:
 1
 1
 3
 3
 2
 2
 2
 2
 4
 1
 3

Ah I see. This is why I usually end up going back to Transducers.jl. Was the foldxt I wrote above kosher?

There are two problems in this code.

First, init = Int[] means that this vector is shared across multiple tasks (and threads, if Threads.nthreads() > 1). It means that the same vector is mutated by multiple tasks that can run on different threads. So, this code is not thread-safe (in particular, it can have data races). You need to pass OnInit(() -> Int[]) or EmptyVector{Int}(), etc.

Second, the binary function passed to foldxt is not “good,” in the sense that it probably is not doing what you want it to do. The binary function passed to foldxt has to combine two intermediate results, but your function ignores the second argument. It means that it only processes the first base case. This can be observed with something like

julia> foldxt(1:100, init=EmptyVector{Int}(), basesize=10) do r, _
           # sleep(0.01)
           r = append!!(r, SingletonVector(Threads.threadid()))
           if length(r) >= 100
               reduced(r)
           else
               r
           end
       end
10-element Vector{Int64}:
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1

You can see that the output has 10 elements even though the input has 100 elements.

Note that I use init=EmptyVector{Int}(). If you use init=Int[], it may look like doing what you want it to do. However, as I said, it was not a valid Julia program.

Perhaps what you wanted to write was something like this:

1:1000 |>
Map() do _
    sleep(0.01)
    SingletonVector(Threads.threadid())
end |>
foldxt(init = OnInit(() -> Int[]), basesize = 1) do a, b
    r = append!!(a, b)
    if length(r) >= 100
        reduced(r)
    else
        r
    end
end
Ah, thank you that is instructive. Multithreading is hard to do right!

Well, thanks for asking the question! If you didn’t get it, I’m almost sure almost no one (except me) understands how to use it :slight_smile: Now that I think of it, I don’t think I’ve ever explained the short-circuiting reduction clearly in the documentation. I’d better write a tutorial somewhere.

(This protocol was also hard to design right. I know I screwed up the current design. Fixing it for some edge cases has been my TODO for a long time…)
