I have a function that does a @threads loop. There is a condition that gets evaluated inside the loop. If the condition returns false, there’s no point in continuing any of the work; I’d like to just return false:
function foo(x, y, z)
    nt = nthreads()
    @threads for i in 1:nt
        result = do_conditional(x, y, z)
        if !result # I want to just return false from the function right away, terminating all the other threads
            ...
        end # if
    end # @threads
end # function
If anyone has thought about an API for this, it’s probably @tkf in the context of Transducers.jl. I know he’s implemented early termination for foldl but I’m not sure about multi-threaded functions.
Just to be clear, it’s not possible to terminate @threads for early, at least for now. You need to build @threads-like iteration support on top of @spawn for this.
If you are curious, I think the simplest example code is the implementation of Threads.foreach.
But, if you just want to use something like a for loop without implementing it, you can also just use Transducers.jl. If you have a sequential version
y = nothing
for x in xs
    z = process(x)
    if should_terminate(z)
        y = z
        break
    end
end
then the parallel version is
using Transducers

reduce(Map(identity), xs; init = nothing) do _, x
    y = process(x)
    if should_terminate(y)
        return reduced(y)  # equivalent to `break`
    else
        return nothing
    end
end
using Base.Threads: Atomic, @threads
function want_to_return_early_PLEASE_DONT_USE_THIS(n, m)
    stop = Atomic{Bool}(false)
    result = Atomic{Bool}(false)
    @threads for i in 1:n
        stop[] && return  # another iteration already succeeded
        if i == m
            result[] = true
            stop[] = true
            return
        end
    end
    return result[]
end
However, I don’t recommend this because:
It relies on undefined behavior of @threads; i.e., that return terminates the basecase iteration.
It introduces an atomic read stop[] in every iteration. This may suppress some compiler optimizations (even on x86), though I’m not super sure about this.
A better strategy is to chunk the iteration and check stop[] for each chunk (rather than each iteration).
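To make the chunking idea concrete, here is a minimal sketch built on @spawn instead of @threads, so it does not depend on what return does inside a @threads body. The name any_chunked and the chunksize keyword are made up for this illustration; only Atomic, @spawn, fetch and Iterators.partition come from Base.

```julia
using Base.Threads: Atomic, @spawn

# Hypothetical helper (not part of Base): returns true if `pred(x)` holds
# for some `x` in `xs`, skipping chunks that start after a hit was recorded.
function any_chunked(pred, xs::AbstractVector; chunksize::Integer = 1024)
    stop = Atomic{Bool}(false)
    chunks = Iterators.partition(eachindex(xs), chunksize)
    tasks = map(chunks) do idxs
        @spawn begin
            # One atomic read per chunk instead of one per iteration.
            stop[] && return false
            for i in idxs
                if pred(xs[i])
                    stop[] = true   # tell chunks that have not started yet to skip their work
                    return true
                end
            end
            return false
        end
    end
    # Wait for every task so that none is left running after we return.
    return any(fetch.(tasks))
end
```

For example, any_chunked(x -> x == 7, collect(1:100); chunksize = 10) spawns ten tasks. A chunk that is already running still finishes its own range, so chunksize trades termination latency against how often the atomic flag is read.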
What do you mean by “doable with a macro?” If you mean that you can use a macro to support syntax sugar for a break-able threaded loop, then yes, I agree that’s possible. But you need some implementation to which the macro desugars the expression.
(In fact, https://github.com/JuliaFolds/FLoops.jl uses the same technology as ReduceIf in the snippet above to implement break/return/@goto. So, in a way FLoops.jl shows that it’s indeed possible. I’ll probably add threaded/distributed loop support to FLoops.jl.)
There is a way to return from “a function” from within a thread.
I call this Treasure Hunting…
Say you have 4 workers that are looking for the Treasure, and you want the work to stop as soon as the Treasure is found.
Here is my example program:
using Base.Threads
struct WorkDetail
    t::Int64
    data::Vector{Float64}
end
@show rawdata = rand(10)
NumOfThreads = 4
chunksize = length(rawdata) ÷ NumOfThreads
WorkDetailArr = WorkDetail[]
for t = 1:NumOfThreads
    if t < NumOfThreads
        push!( WorkDetailArr, WorkDetail(t, rawdata[chunksize*(t-1)+1:chunksize*(t)+1-1]) )
    else
        push!( WorkDetailArr, WorkDetail(t, rawdata[chunksize*(t-1)+1:length(rawdata)]) )
    end
end
@show WorkDetailArr
Completed = Bool[ false for _ = 1:NumOfThreads ]
Accepted = Bool[ false for _ = 1:NumOfThreads ]
Success = Bool[ false for _ = 1:NumOfThreads ]
WorkTask(t) = begin
    result = nothing
    for c = 1:length(WorkDetailArr[t].data)
        println("Thread $(t), Working with data = $(WorkDetailArr[t].data[c])")
        sleep((NumOfThreads - t + 1) * rand())
        FoundTreasureBool = rand() < 0.20
        if FoundTreasureBool
            Success[t] = true
            result = WorkDetailArr[t].data[c]
            break
        end
    end
    "Signal that this thread has completed"
    Completed[t] = true
    println("Thread $(t) completion has been signaled")
    return result
end
Workers = Task[]
for t = 1:NumOfThreads
    push!(Workers, Threads.@spawn WorkTask(t))
    println("Thread $(t) has been spawned")
end
# while Accepted contains at least one element which is false
treasurefound = false
while ! all(Accepted) && ! treasurefound
    for t = 1:length(Workers)
        if xor(Completed[t],Accepted[t])
            println("Thread $(t) completion has been accepted")
            "Mark that controller has accepted the thread completion"
            Accepted[t] = true
            "Check if the worker is successful in finding the treasure"
            if Success[t]
                treasure = Threads.fetch(Workers[t])
                println("Thread $(t) has found treasure! Its value is ",treasure)
                global treasurefound = true
                break
            else
                Threads.wait(Workers[t])
            end
        end
    end
    sleep(0.05) # Sleep for 50 milliseconds
end
println("Done")
exit()
You need to use some kind of synchronization mechanism (a lock or atomics) to mutate global state like this. This program has data races, so its behavior is undefined.
(One may argue that released versions of Julia do not define a memory model, so we can’t talk about memory ordering anyway. But the Julia devs are working on it.)
Besides the behavior of the program being undefined, it’s better to avoid polling like this.
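For comparison, here is a rough sketch of how the same treasure hunt could avoid both the shared Bool arrays and the sleep-based polling, using only an Atomic flag and a Channel from Base. The function name find_treasure and its arguments are hypothetical, and this is just one way to do it under those assumptions.

```julia
using Base.Threads: Atomic, @spawn

# Hypothetical helper: search each chunk in its own task and return one
# element satisfying `is_treasure`, or `nothing` if no chunk contains one.
function find_treasure(is_treasure, chunks)
    found = Atomic{Bool}(false)              # stop flag, only accessed atomically
    results = Channel{Any}(length(chunks))   # buffered: at most one `put!` per worker, so it never blocks
    workers = map(chunks) do chunk
        @spawn for x in chunk
            found[] && break                 # another worker already succeeded
            if is_treasure(x)
                found[] = true
                put!(results, x)
                break
            end
        end
    end
    # Close the channel once all workers are done, so the loop below also
    # terminates when nothing was found (or when a worker failed).
    @spawn try
        foreach(wait, workers)
    finally
        close(results)
    end
    # Iterating a Channel blocks until an item arrives or the channel is
    # closed; there is no sleep/poll loop in the controller.
    for x in results
        return x
    end
    return nothing
end
```

The controller simply blocks on the channel, and all communication between tasks goes through the Atomic flag and the Channel. If several chunks contain a treasure, which one is returned depends on scheduling.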
ReduceIf etc. in Transducers.jl hide all these difficulties and provide an easy-to-use interface. In general, I think it’s better to avoid @spawn if you can find higher-level abstractions elsewhere.