Is there any simple way to break from a threaded for loop (actually I want an infinite while loop, but that doesn’t work with @threads)? Or would I need to use something like channels for synchronization?
Trivial example:
julia> r = []
       r_lock = ReentrantLock()
       Threads.@threads for _ in 1:10^9
           sleep(0.01)
           lock(r_lock) do
               push!(r, rand())
               length(r) == 100 && break
           end
       end
ERROR: syntax: break or continue outside loop
Stacktrace:
 [1] top-level scope
   @ REPL[85]:3
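The error comes from the `do` block. It is a separate anonymous function passed to `lock`, so the `break` inside it is not inside any loop. As an alternative to the package-based answers below, here is a minimal Base-only sketch. It assumes you just want a fixed number of results collected by a few worker tasks. Each task runs its own `while` loop and polls a shared `Threads.Atomic{Bool}` stop flag, which also gives you the "infinite while loop" shape. `collect_until`, `nworkers`, and `stop` are made-up names for illustration, not from this thread.

```julia
# Sketch (Base Julia only): emulate a parallel "while true ... break" loop.
# Each worker task runs its own `while` loop and polls a shared atomic flag,
# so no `break` has to cross a closure boundary.
function collect_until(n_target::Int; nworkers::Int = Threads.nthreads())
    n_target <= 0 && return Float64[]          # nothing to collect
    r = Float64[]
    r_lock = ReentrantLock()
    stop = Threads.Atomic{Bool}(false)         # set once r is full
    @sync for _ in 1:nworkers
        Threads.@spawn while !stop[]
            sleep(0.01)                        # stand-in for the real work
            x = rand()
            lock(r_lock) do
                # Re-check under the lock: another worker may have just finished.
                if length(r) < n_target
                    push!(r, x)
                    length(r) == n_target && (stop[] = true)
                end
            end
        end
    end
    return r
end

r = collect_until(100)
```

Re-checking `length(r)` while holding the lock keeps the result at exactly `n_target` entries. A worker that wakes up after the flag is set skips the `push!` and then leaves its loop.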
Yeah, Threads.@threads is a bad fit for this. I’d instead use Transducers.jl. It won’t guarantee that it breaks at exactly the loop iteration you ask for (in general, I think such a guarantee is kinda incompatible with the notion of parallelism), but here’s what your example would look like (I’ve stored the threadid() in r instead of a random number so you can see which threads did what):
What Mason wrote is the way to go with vanilla Transducers.jl. You can also express this with break, return, or @goto (with a label outside the loop) inside @floop.
For what it’s worth, I had some trouble getting this to work myself. It seemed to just ignore the break condition, and I didn’t have time to track down what was going on. I can look again in the morning and try to send you an MWE.
Yeah, let me know if you see any peculiar behavior.
FWIW, here’s a demo of using break in a parallel @floop:
julia> using FLoops

julia> function demo(xs = rand(2^20), ex = ThreadedEx(basesize = 2^10))
           xmax = Threads.Atomic{Float64}(0.0)
           imax = Threads.Atomic{Int}(0)
           @floop ex for (i, x) in pairs(xs)
               if x > 0.9999
                   xmax[] = x
                   imax[] = i
                   @reduce() do (j = nothing; i), (y = nothing; x)
                       j = i
                       y = x
                   end
                   break
               end
           end
           (j => y), (imax[] => xmax[])
       end;
julia> for _ in 1:100
           a, b = demo()
           if a != b
               @show a b
               break
           end
       end
a = 11792 => 0.99993600686011
b = 14586 => 0.9999006513025757
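The output suggests that once several tasks can hit `break`, which match gets reported depends on scheduling. The same non-determinism shows up in a Base-only sketch of "find any index where a predicate holds". It uses `Threads.atomic_cas!` so that only the first successful task records its index, and every other chunk stops as soon as it sees that flag. `find_any` is a hypothetical helper, not part of FLoops.jl.

```julia
# Sketch (Base Julia only): find *some* index where `pred` holds, letting every
# chunk stop early once any chunk has found a match. Which match is reported
# can differ between runs when there are several.
function find_any(pred, xs::Vector; basesize::Int = 2^10)
    found = Threads.Atomic{Int}(0)               # 0 = nothing found yet
    @sync for idxs in Iterators.partition(eachindex(xs), basesize)
        Threads.@spawn for i in idxs
            found[] != 0 && break                # another chunk already succeeded
            if pred(xs[i])
                Threads.atomic_cas!(found, 0, i) # keep only the first writer's index
                break
            end
        end
    end
    return found[] == 0 ? nothing : found[]
end
```

With a single match the answer is deterministic. With many matches, any returned `i` satisfies `pred(xs[i])`, but which `i` you get is up to the scheduler.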
So @floop should actually throw an error here, since the accumulator r is accessed in the loop body outside of @reduce. (One way to understand parallel reduction is that the accumulator is “write only.”) But apparently it silently generates nonsensical code instead.
Using break based on a result computed with @reduce is rather tricky, since you have to show idempotency.
But if you accept non-determinism, perhaps the simplest approach that is still “lock-less” is something like this:
julia> using FLoops

julia> xs = rand(2^30)
       basesize = 2^15
       chunks = Iterators.partition(xs, basesize)
       outputs = [Int[] for _ in chunks]  # one output vector per chunk, never shared
       counter = Threads.Atomic{Int}(0)   # hits found so far, across all chunks
       @floop ThreadedEx(basesize = 1) for (output, chunk) in zip(outputs, chunks)
           for x in chunk
               counter[] > 10 && break    # enough hits overall: stop this chunk
               @reduce nitems += 1        # count items actually processed
               if x > 0.9999
                   Threads.atomic_add!(counter, 1)
                   push!(output, Threads.threadid())
               end
           end
       end
julia> nitems, length(xs) # processed items vs input items
(199088, 1073741824)
julia> foldl(append!, outputs; init = Int[])
11-element Vector{Int64}:
1
1
3
3
2
2
2
2
4
1
3
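For comparison, here is a Base-only sketch of the same lock-less idea. It uses `Threads.@threads` over chunk indices instead of `@floop`. Each chunk writes only to its own output vector, and a shared `Threads.Atomic{Int}` counter lets every chunk bail out early. `collect_matches`, `limit`, and `threshold` are hypothetical names. Like the original, the result is non-deterministic and can slightly overshoot `limit + 1`, by at most one extra entry per concurrently running chunk.

```julia
# Sketch (Base Julia only): each chunk appends only to its own output vector,
# and a shared atomic counter tells every chunk when enough matches exist.
function collect_matches(xs::Vector{Float64}; basesize::Int = 2^15, limit::Int = 10,
                         threshold::Float64 = 0.9999)
    chunks = collect(Iterators.partition(xs, basesize))
    outputs = [Int[] for _ in chunks]            # one vector per chunk: never shared
    counter = Threads.Atomic{Int}(0)             # matches found so far, all chunks
    nitems = Threads.Atomic{Int}(0)              # items actually examined
    Threads.@threads for k in eachindex(chunks)
        n = 0                                    # items examined by this chunk
        for x in chunks[k]
            counter[] > limit && break           # some chunk(s) found enough
            n += 1
            if x > threshold
                Threads.atomic_add!(counter, 1)
                push!(outputs[k], Threads.threadid())
            end
        end
        Threads.atomic_add!(nitems, n)           # one atomic update per chunk
    end
    return nitems[], foldl(append!, outputs; init = Int[])  # serial, so a fresh init is safe
end
```

Counting items locally and adding once per chunk avoids hammering a second atomic on every element.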
First, init = Int[] means that this vector is shared across multiple tasks (and threads, if Threads.nthreads() > 1). In other words, the same vector is mutated by multiple tasks that can run on different threads. So this code is not thread-safe (in particular, it can have data races). You need to pass OnInit(() -> Int[]), EmptyVector{Int}(), or something similar instead.
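The aliasing problem can be seen without threads at all. Below is a sketch where `fold_chunks` is a hypothetical helper (not part of Transducers.jl) that folds every chunk starting from `make_init()`. Passing a closure that returns the same vector mimics `init = Int[]`. Passing one that builds a fresh vector mirrors the idea behind `OnInit(() -> Int[])`. It runs serially, so the shared vector is visible without an actual data race. In a threaded fold those same pushes would happen concurrently.

```julia
# Sketch (Base Julia only): fold each chunk starting from `make_init()`.
function fold_chunks(op, xs, make_init; basesize::Int)
    return [foldl(op, chunk; init = make_init()) for chunk in Iterators.partition(xs, basesize)]
end

shared = Int[]
aliased = fold_chunks(push!, 1:30, () -> shared; basesize = 10)  # like init = Int[]
fresh = fold_chunks(push!, 1:30, () -> Int[]; basesize = 10)     # like OnInit(() -> Int[])

all(acc -> acc === shared, aliased)  # true: all three "chunk results" are one vector
length(shared)                       # 30, not 10
length.(fresh)                       # [10, 10, 10]
```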
Second, the binary function passed to foldxt is not “good” in the sense that it probably does not do what you want it to do. The binary function passed to foldxt has to combine two intermediate results, but your function ignores the second argument. That means it only processes the first base case. You can observe this with something like:
julia> using Transducers, BangBang, MicroCollections

julia> foldxt(1:100, init=EmptyVector{Int}(), basesize=10) do r, _
           # sleep(0.01)
           r = append!!(r, SingletonVector(Threads.threadid()))
           if length(r) >= 100
               reduced(r)
           else
               r
           end
       end
10-element Vector{Int64}:
1
1
1
1
1
1
1
1
1
1
You can see that the output has only 10 elements even though the input has 100 elements.
Note that I use init=EmptyVector{Int}(). If you use init=Int[], it may look like it does what you want, since every task then appends to the same shared vector and the output can look complete. However, as I said, that is not a valid Julia program.
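The point about the binary function can be reproduced with a Base-only sketch of a two-phase chunked fold. Phase 1 reduces each chunk on its own task. Phase 2 merges the per-chunk results with `combine(a, b)`. If `combine` ignores its second argument, whole chunks silently disappear. `chunked_fold` and `push_one` are hypothetical names, and this is not how Transducers.jl is implemented internally.

```julia
# Sketch (Base Julia only): phase 1 reduces chunks in parallel, phase 2 combines.
function chunked_fold(rf, combine, xs, make_init; basesize::Int)
    parts = collect(Iterators.partition(xs, basesize))
    results = Vector{Any}(undef, length(parts))
    Threads.@threads for k in eachindex(parts)
        results[k] = foldl(rf, parts[k]; init = make_init())  # fresh init per chunk
    end
    return foldl(combine, results)           # merge intermediate results in order
end

push_one(acc, _) = push!(acc, 1)             # stand-in for pushing threadid()

good = chunked_fold(push_one, append!, 1:100, () -> Int[]; basesize = 10)
bad = chunked_fold(push_one, (a, _) -> a, 1:100, () -> Int[]; basesize = 10)

length(good)  # 100
length(bad)   # 10: the combine step dropped every chunk but the first
```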
Perhaps what you wanted to write was something like this:
1:1000 |>
    Map() do _
        sleep(0.01)
        SingletonVector(Threads.threadid())
    end |>
    foldxt(init = OnInit(() -> Int[]), basesize = 1) do a, b
        r = append!!(a, b)
        if length(r) >= 100
            reduced(r)
        else
            r
        end
    end
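To make the short-circuiting idea concrete without Transducers.jl, here is a Base-only sketch in the spirit of the pipeline above. `Stop` is a hypothetical stand-in for `reduced(x)`. `foldl_stop`, `append_upto`, and `parallel_fold_stop` are also hypothetical helpers. One binary function is used both inside chunks and to combine chunk results, and it uses both arguments. In this sketch, only a chunk that stops on its own makes the remaining chunks skip their work. A stop produced while combining simply ends the final serial combine loop.

```julia
struct Stop{T}            # hypothetical stand-in for Transducers' reduced(x)
    value::T
end
unwrap(x) = x isa Stop ? x.value : x

# Serial fold that returns as soon as `op` returns a `Stop`.
function foldl_stop(op, xs; init)
    acc = init
    for x in xs
        acc = op(acc, x)
        acc isa Stop && return acc
    end
    return acc
end

# Combines two vectors and signals Stop once `limit` items are collected.
append_upto(limit) = (a, b) -> begin
    r = append!(unwrap(a), unwrap(b))
    length(r) >= limit ? Stop(r) : r
end

function parallel_fold_stop(op, f, xs, make_init; basesize::Int)
    parts = collect(Iterators.partition(xs, basesize))
    results = Vector{Any}(undef, length(parts))
    stopped = Threads.Atomic{Bool}(false)
    Threads.@threads for k in eachindex(parts)
        if stopped[]
            results[k] = make_init()          # skipped chunk contributes nothing
        else
            r = foldl_stop(op, (f(x) for x in parts[k]); init = make_init())
            r isa Stop && (stopped[] = true)  # tell other chunks to skip
            results[k] = unwrap(r)
        end
    end
    return unwrap(foldl_stop(op, results; init = make_init()))
end

ids = parallel_fold_stop(append_upto(100), _ -> [Threads.threadid()], 1:1000,
                         () -> Int[]; basesize = 1)
length(ids)   # 100
```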
Well, thanks for asking the question! If you didn’t get it, I’m almost sure almost no one (except me) understands how to use it. Now that I think of it, I don’t think I’ve ever explained short-circuiting reduction clearly in the documentation. I’d better write a tutorial somewhere.
(This protocol was also hard to get right. I know I screwed up the current design. Fixing it for some edge cases has been on my TODO list for a long time…)