Is there a way to return from a function from within a thread?

I have a function that does a @threads loop. There is a condition that gets evaluated inside the loop. If the condition returns false, there’s no point in continuing any of the work; I’d like to just return false:

function foo(x, y, z)
    nt = nthreads()
    @threads for i in 1:nt 
        result = do_conditional(x, y, z)
        if !result
            # I want to just return false from the function right away,
            # terminating all the other threads
        end
        ...
    end # @threads
end # function

If anyone has thought about an API for this, it’s probably @tkf in the context of Transducers.jl. I know he’s implemented early termination for foldl but I’m not sure about multi-threaded functions.


This requires thread synchronization in any case, and you can do that yourself with an Atomic.

Yeah, I used Atomic to do that (in reduce, not foldl, actually).

I am quite new to Atomic so I guess I am missing the point here. For example, how would I make this contrived example work with atomics?

function want_to_return_early(n, m)
    @threads for i in 1:n
        if n == m
            return true
        end
    end
    return false
end

Just to be clear, it’s not possible to terminate @threads for early, at least for now. You need to build @threads-like iteration support on top of @spawn for this.

If you are curious, I think the simplest example code is Threads.foreach:
https://github.com/JuliaLang/julia/pull/34543/files#diff-c3a80cb045a6bb5c1884c379f4fab047


But, if you just want to use something like a for loop without implementing it, you can also just use Transducers.jl. If you have a sequential version

y = nothing
for x in xs
    z = process(x)
    if should_terminate(z)
        y = z
        break
    end
end

then the parallel version is

reduce(Map(identity), xs; init = nothing) do _, x
    y = process(x)
    if should_terminate(y)
        return reduced(y)  # equivalent to `break`
    else
        return nothing
    end
end

or equivalently

reduce(right, Map(process) |> ReduceIf(should_terminate), xs; init = nothing)

As for the Atomic-based approach, here is a version, which is actually incorrect:

using Base.Threads: Atomic, @threads

function want_to_return_early_PLEASE_DONT_USE_THIS(n, m)
    stop = Atomic{Bool}(false)
    result = Atomic{Bool}(false)
    @threads for i in 1:n
        stop[] && return
        if n == m
            result[] = true
            stop[] = true
            return
        end
    end
    return result[]
end

However, I don’t recommend this because:

  1. It relies on an undefined behavior of @threads; i.e., return terminates the basecase iteration.
  2. It introduces an atomic read stop[]. It may suppress some compiler optimizations (even on x86). But I’m not super sure about this.

A better strategy is to chunk the iteration and check stop[] for each chunk (rather than each iteration).
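That chunked strategy might look something like the sketch below. This is my own illustration, not code from the thread: the function name find_first_chunked and the chunksize keyword are invented, and it assumes that any matching element (not necessarily the first in iteration order) is an acceptable answer.

```julia
using Base.Threads: Atomic, @threads

# Sketch: split the index range into chunks and check the shared stop
# flag once per chunk, keeping the hot inner loop free of atomic reads.
function find_first_chunked(pred, xs; chunksize = 16)
    stop = Atomic{Bool}(false)
    hit = Atomic{Int}(0)  # 0 means "nothing found"; otherwise an index into xs
    chunks = collect(Iterators.partition(eachindex(xs), chunksize))
    @threads for chunk in chunks
        if !stop[]                 # one atomic read per chunk, not per item
            for i in chunk
                if pred(xs[i])
                    hit[] = i      # races between concurrent matches are
                    stop[] = true  # benign here: any matching index is fine
                    break          # exits only this chunk's inner loop
                end
            end
        end
    end
    return hit[] == 0 ? nothing : xs[hit[]]
end
```

Note that the break here only leaves the plain inner for loop over one chunk, so it does not rely on the undefined return-from-@threads behavior above.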


I don’t know enough about macros, but I am wondering whether the thing I am trying to achieve would be doable with a macro.

What do you mean by “doable with a macro?” If you mean that you can use a macro to support syntax sugar for a break-able threaded loop, yes, I agree that’s possible. But you need some implementation to which the macro desugars the expression.

(In fact, FLoops.jl (https://github.com/JuliaFolds/FLoops.jl) uses the same technology as ReduceIf in the snippet above to implement break/return/@goto. So, in a way FLoops.jl shows that it’s indeed possible. I’ll probably add threaded/distributed loop support to FLoops.jl.)


There is a way to return from “a function” from within a thread

I call this Treasure Hunting…
You have, say, 4 workers looking for the treasure, and you want the work to stop as soon as the treasure is found.

Here is my example program

using Base.Threads
struct WorkDetail
    t::Int64
    data::Vector{Float64}
end

@show rawdata = rand(10)
NumOfThreads = 4
chunksize = length(rawdata) ÷ NumOfThreads
WorkDetailArr = WorkDetail[]
for t = 1:NumOfThreads
    if t < NumOfThreads
        push!(WorkDetailArr, WorkDetail(t, rawdata[chunksize*(t-1)+1 : chunksize*t]))
    else
        push!(WorkDetailArr, WorkDetail(t, rawdata[chunksize*(t-1)+1 : length(rawdata)]))
    end
end
@show WorkDetailArr

Completed = Bool[ false for _ = 1:NumOfThreads ]
Accepted  = Bool[ false for _ = 1:NumOfThreads ]
Success   = Bool[ false for _ = 1:NumOfThreads ]

WorkTask(t) = begin
    result = nothing
    for c = 1:length(WorkDetailArr[t].data)
        println("Thread $(t), Working with data = $(WorkDetailArr[t].data[c])")
        sleep((NumOfThreads - t + 1) * rand())
        FoundTreasureBool = rand() < 0.20
        if FoundTreasureBool
            Success[t] = true
            result = WorkDetailArr[t].data[c]
            break
        end
    end
    # Signal that this thread has completed
    Completed[t] = true
    println("Thread $(t) completion has been signaled")
    return result
end

Workers = Task[]
for t = 1:NumOfThreads
    push!(Workers, Threads.@spawn WorkTask(t))
    println("Thread $(t) has been spawned")
end

# while Accepted contains at least one element which is false
treasurefound = false
while ! all(Accepted) && ! treasurefound
    for t = 1:length(Workers)
        if xor(Completed[t],Accepted[t])
            println("Thread $(t) completion has been accepted")
            # Mark that the controller has accepted the thread completion
            Accepted[t] = true
            # Check if the worker was successful in finding the treasure
            if Success[t]
                treasure = Threads.fetch(Workers[t])
                println("Thread $(t) has found treasure! Its value is ",treasure)
                global treasurefound = true
                break
            else
                Threads.wait(Workers[t])
            end
        end
    end
    sleep(0.05) # Sleep for 50 milliseconds
end
println("Done")
exit()

You need to use some kind of locking mechanism (lock/atomic) to mutate global state like this. This program has data races, so its behavior is undefined.

(One may argue that released versions of Julia do not define the memory model, so one can’t really talk about memory ordering anyway. But the Julia devs are working on it.)

Besides the fact that the behavior of the program is undefined, it’s better to avoid polling like this.

ReduceIf etc. in Transducers.jl hide all these difficulties and provide an easy-to-use interface. In general, I think it’s better to avoid @spawn if you can find higher-level abstractions elsewhere.


AFAIK, this is still the best thread on this use case. For me, this is a pretty common scenario in parallel programming, and I found this thread enlightening but hard to parse. I’ve now returned to it with fresh eyes, to see if I can distill the essence.

I’m going to use @StevenSiew’s metaphor of treasure hunting to describe the general situation of performing a lengthy computation on each element of a domain (i.e. a set of input values), where you are only interested in finding any input value that produces a desirable output from the computation. In those cases it makes sense to put multiple cores to work, and then move on as soon as one thread finds a successful input.

Let’s use a toy example, and start with the obvious multi-threaded version:

function is_treasure(val)
  sleep(0.1)
  return val % 23 == 0
end

function find_treasure(haystack)
  Threads.@threads for i in haystack
    if is_treasure(i)
      println("Found treasure: " * string(i))
    end
  end
end

@time find_treasure(1:100)

As you would expect, treasure is found out of order, and the entire domain is searched:

% julia --threads=auto treasure_hunt.jl
Found treasure: 92
Found treasure: 69
Found treasure: 46
Found treasure: 23
  1.495740 seconds (18.51 k allocations: 972.306 KiB, 0.46% compilation time)

The first suggestion in this thread to introduce early termination is with a synchronised flag for each thread to check. Atomic will do:

function find_treasure_atomic(haystack)
  found = Threads.Atomic{Bool}(false)

  Threads.@threads for i in haystack
    found[] && return
    
    if is_treasure(i)
      found[] = true
      println("Found treasure: " * string(i))
      return
    end
  end
end

@time find_treasure_atomic(1:100)

Typical results:

% julia --threads=auto treasure_hunt.jl
Found treasure: 92
Found treasure: 69
  0.627929 seconds (24.75 k allocations: 1.285 MiB, 3.23% compilation time)

So it helps, but has issues (courtesy of @tkf ):

  1. It relies on an undefined behavior of @threads; i.e., return terminates the basecase iteration.
  2. It introduces an atomic read stop[]. It may suppress some compiler optimizations (even on x86). But I’m not super sure about this.
  3. It also doesn’t guarantee that threads will see the flag if they’re already busy doing work, as evidenced by the two “Found treasure” lines that sometimes appear in the output.

The second suggestion in this thread is to use the early termination functionality built into Transducers.jl. Bringing that advice up to date with current versions:

using Transducers

function find_treasure_transducer(haystack)
  foldxt(Map(identity), haystack, init = nothing) do _, i
    if is_treasure(i)
      println("Found treasure: " * string(i))
      return reduced(i)
    end
    return nothing
  end
end

@time find_treasure_transducer(1:100)

Which for me produces the perplexing output:

Found treasure: 69
Found treasure: 46
Found treasure: 92
Found treasure: 23
Found treasure: 23
Found treasure: 23
  1.408017 seconds (244.96 k allocations: 13.486 MiB, 4.33% compilation time)

Some of this result can probably be explained by the default chunk size basically ensuring that the domain is split evenly between all available cores. Once a thread finishes, there is no more input to trigger another thread to start, so there’s nothing early about early termination. I tried playing with basesize but didn’t make any progress.

Finally, the last suggestion in this thread is to DIY on top of @spawn instead of @threads for. I posit that if this is the best path, an opportunity exists to close that gap.

So I guess in conclusion, I’m still without an easy and safe solution on par with the ease of @threads for, for what I consider a common use case. But for most of my purposes, I’ll take my risks with the atomic flag.
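For reference, the @spawn route can be sketched roughly as below. This is my own illustration (the function name find_treasure_spawn and the slicing scheme are invented, not an established API), and note the caveat: the function still waits for all tasks before returning, but the shared flag makes the losing tasks bail out of their remaining work quickly.

```julia
using Base.Threads: Atomic, @spawn, nthreads

# Rough DIY sketch: one task per thread, each scanning its own slice
# of the input and polling a shared flag so work stops soon after a hit.
function find_treasure_spawn(pred, xs)
    isempty(xs) && return nothing
    stop = Atomic{Bool}(false)
    n = max(nthreads(), 1)
    slices = collect(Iterators.partition(xs, cld(length(xs), n)))
    tasks = map(slices) do slice
        @spawn begin
            for x in slice
                stop[] && return nothing  # someone else found it; bail out
                if pred(x)
                    stop[] = true
                    return x
                end
            end
            return nothing
        end
    end
    results = fetch.(tasks)  # waits for every task to finish
    i = findfirst(!isnothing, results)
    return i === nothing ? nothing : results[i]
end
```

Compared to the atomic-flag @threads version, this avoids relying on return-from-@threads behavior, at the cost of hand-rolling the work distribution.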


There has been this other recent thread: How to early return from inside a @threads loop

(I think the safest way to deal with that is to use a flag to exit from the workload of the chunk assigned to each thread, split at a higher level)

Oh nice! What a coincidence, thank you for pointing it out. Seems to still be a relevant topic.