Is there a way to return from a function from within a thread?

I have a function that does a @threads loop. There is a condition that gets evaluated inside the loop. If the condition returns false, there’s no point in continuing any of the work; I’d like to just return false:

function foo(x, y, z)
    nt = nthreads()
    @threads for i in 1:nt 
        result = do_conditional(x, y, z)
        if !result # I want to just return false from the function right away, terminating all the other threads
    end # @threads
end # function

If anyone has thought about an API for this, it’s probably @tkf in the context of Transducers.jl. I know he’s implemented early termination for foldl but I’m not sure about multi-threaded functions.

1 Like

This requires thread synchronization in any case and you can do that yourself with an Atomic .

Yeah, I used Atomic to do that (in reduce, not foldl, actually).

I am quite new to Atomic so I guess I am missing the point here. For example, how would I make this contrived example work with atomics?

function want_to_return_early(n, m)
  @threads for  i in 1:n
      if n == m
         return true
   return false

Just to be clear, it’s not possible to terminate @threads for early, at least for now. You need to build @threads-like iteration support on top of @spawn for this.

If you are curious, I think the simplest example code is Threads.foreach:

But, if you just want to use something like a for loop without implementing it, you can also just use Transducers.jl. If you have a sequential version

y = nothing
for x in xs
    z = process(x)
    if should_terminate(z)
        y = z

then the parallel version is

reduce(Map(identity), xs; init = nothing) do _, x
    y = process(x)
    if should_terminate(y)
        return Reduce(y)  # equivalent to `break`
        return nothing

or equivalently

reduce(right, Map(process) |> ReduceIf(should_terminate), xs; init = nothing)

which is actually incorrect:

using Base.Threads: Atomic, @threads

function want_to_return_early_PLEASE_DONT_USE_THIS(n, m)
    stop = Atomic{Bool}(false)
    result = Atomic{Bool}(false)
    @threads for i in 1:n
        stop[] && return
        if n == m
            result[] = true
            stop[] = true
    return result[]

However, I don’t recommend this because:

  1. It relies on an undefined behavior of @threads; i.e., return terminates the basecase iteration.
  2. It introduces an atomic read stop[]. It may suppress some compiler optimization (even in x86). But I’m not super sure about this.

A better strategy is to chunk the iteration and check stop[] for each chunk (rather than each iteration).


I don’t know enough about macros, but I am thinking would the thing I am trying to achieve be doable with a macro.

What do you mean by “doable with a macro?” If you mean that you can use a macro to support a syntax sguar for break-able threaded loop, yes, I agree that’s possible. But you need some implementation to which the macro desguars the expression.

(In fact, uses the same technology as ReduceIf in the snippet above to implement break/return/@goto. So, in a way FLoops.jl shows that it’s indeed possible. I’ll probably add threaded/distributed loop support to FLoops.jl.)

1 Like

There is a way to return from “a function” from within a thread

I call this Treasure Hunting…
You have say 4 workers which is looking for the Treasure and you want the work to stop as soon as the Treasure is found.

Here is my example program

using Base.Threads
struct WorkDetail

@show rawdata = rand(10)
NumOfThreads = 4
chunksize = length(rawdata) ÷ NumOfThreads
WorkDetailArr = WorkDetail[]
for t = 1:NumOfThreads
    if t < NumOfThreads
        push!( WorkDetailArr, WorkDetail(t, rawdata[chunksize*(t-1)+1:chunksize*(t)+1-1]) )
        push!( WorkDetailArr, WorkDetail(t, rawdata[chunksize*(t-1)+1:length(rawdata)]) )
@show WorkDetailArr

Completed = Bool[ false for _ = 1:NumOfThreads ]
Accepted  = Bool[ false for _ = 1:NumOfThreads ]
Success   = Bool[ false for _ = 1:NumOfThreads ]

WorkTask(t) = begin
    result = nothing
    for c = 1:length(WorkDetailArr[t].data)
        println("Thread $(t), Working with data = $(WorkDetailArr[t].data[c])")
        sleep((NumOfThreads - t + 1) * rand())
        FoundTreasureBool = rand() < 0.20
        if FoundTreasureBool
            Success[t] = true
            result = WorkDetailArr[t].data[c]
    "Signal that this thread has completed"
    Completed[t] = true
    println("Thread $(t) completion has been signaled")
    return result

Workers = Task[]
for t = 1:NumOfThreads
    push!(Workers, Threads.@spawn WorkTask(t))
    println("Thread $(t) has been spawned")

# while Accepted contains at least one element which is false
treasurefound = false
while ! all(Accepted) && ! treasurefound
    for t = 1:length(Workers)
        if xor(Completed[t],Accepted[t])
            println("Thread $(t) completion has been accepted")
            "Mark that controller has accepted the thread completion"
            Accepted[t] = true
            "Check if the worker is successful in finding the treasure"
            if Success[t]
                treasure = Threads.fetch(Workers[t])
                println("Thread $(t) has found treasure! Its value is ",treasure)
                global treasurefound = true
    sleep(0.05) # Sleep for 50 milisecond

You need to use some kind of locking mechanism (lock/atomic) to mutate the global states like this. This program has data races and so its behavior is undefined.

(One may argue that released versions of julia do not define the memory model so that what we can’t talk about memory ordering anyway. But Julia devs are working on it.)

Besides that the behavior of program is undefined, it’s better to avoid polling like this.

ReduceIf etc. in Transducers.jl hides all these difficulties and provide easy-to-use interface. In general, I think it’s better to avoid @spawn if you can find higher-level abstractions elsewhere.

1 Like