How to early return from inside a @threads loop

Normally you are taught to avoid thread races in parallel programming, but in certain situations you would want a sort of thread race.
I need to run an expensive computation and return from the function the first time some condition is met; it doesn’t matter whether this happens on the 2nd or the 200th element of the data:

test = rand(1:10,1000)
function foo(data)
    Threads.@threads for i in 1:length(data)
        # complex computation here
        if data[i] == 10  # first thread that finds a specific condition "wins" and returns
            return (i, data[i])
        end
    end
end
a = foo(test) # this doesn't work

Which is the “correct” programming pattern for these situations? Clearly not the one above 🙂

I’m not sure what “the correct™ programming pattern” would be, but I think something like this should work:

function findany(pred, data)
    # Hand-written chunk definition, you can use ChunkSplitters.jl instead
    nchunks = Threads.nthreads()
    bounds = round.(Int, LinRange(firstindex(data), lastindex(data)+1, nchunks+1))
    chunks = [bounds[i]:bounds[i+1]-1 for i in 1:nchunks]

    # A Channel to synchronize tasks
    c = Channel{Int}(1)

    # One task per chunk
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            for i in chunk
                # If the Channel is closed, some other task must have found something
                # => return early
                isopen(c) || break

                if pred(data[i])
                    # If an element matching the predicate is found, try putting it in the Channel
                    try
                        put!(c, i)
                    catch e
                        # Gracefully handle cases when the Channel has been closed
                        # since the last time we checked
                        e isa InvalidStateException || rethrow()
                    end
                    break
                end
            end
        end
    end

    # Wait for a result to be put into the Channel
    idx = take!(c)
    # ... and close it to notify all other tasks that they should stop looking
    close(c)

    # Wait for all tasks to terminate
    foreach(wait, tasks)

    return (idx, data[idx])
end

test = rand(1:10, 1000);
function pred(x) # an expensive predicate
    sleep(0.1)
    x == 10
end
findany(pred, test)
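
One caveat with the Channel version above: if no element satisfies `pred`, nothing is ever put into the Channel and `take!(c)` blocks. Below is a minimal sketch of one way to cover that case, assuming it is acceptable to return `nothing` when there is no match. The name `findany_or_nothing`, the `ntasks` keyword, and the extra "watcher" task are illustrative additions, not part of the answer above; the chunks are built with `Iterators.partition` from Base instead of the hand-written bounds.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical variant of `findany` that returns `nothing` when no element matches.
function findany_or_nothing(pred, data; ntasks = nthreads())
    isempty(data) && return nothing
    c = Channel{Int}(1)
    # Contiguous index chunks, roughly one per task
    chunks = Iterators.partition(eachindex(data), cld(length(data), ntasks))

    tasks = map(chunks) do chunk
        @spawn for i in chunk
            isopen(c) || break              # another task already won
            if pred(data[i])
                try
                    put!(c, i)
                catch e
                    # The Channel was closed since the last check
                    e isa InvalidStateException || rethrow()
                end
                break
            end
        end
    end

    # Watcher: once every worker has finished, close the Channel so that
    # `take!` cannot wait forever when nothing was found.
    @spawn begin
        foreach(wait, tasks)
        close(c)
    end

    idx = try
        take!(c)                            # still returns a buffered value after close
    catch e
        e isa InvalidStateException || rethrow()
        nothing                             # closed and empty: no match
    end
    close(c)
    foreach(wait, tasks)
    return idx === nothing ? nothing : (idx, data[idx])
end
```

Calling `findany_or_nothing(==(10), test)` should return some `(index, 10)` pair when `test` contains a 10, and `nothing` otherwise. Note that this sketch does not handle a `pred` that throws: in that case the watcher task fails before closing the Channel.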

Thanks™.

See also this previous …thread

Note that the idea of wanting “a sort of thread race” isn’t right: in Julia, as in most other languages, a data race is always a serious user error. See the “Data-race freedom” subsection of the Multi-Threading chapter in the Julia manual.
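
As a small illustration of what that means in practice: state that several tasks write to should go through an atomic or a lock rather than a plain variable. The sketch below is only an example of the idea; the function name `count_tens` is made up for this purpose and does not come from the thread.

```julia
using Base.Threads: @threads, Atomic, atomic_add!

# Hypothetical example: count matching elements from several threads.
function count_tens(data)
    n = Atomic{Int}(0)                  # shared counter, updated atomically
    @threads for i in eachindex(data)
        # atomic_add! makes the read-modify-write a single indivisible step
        data[i] == 10 && atomic_add!(n, 1)
    end
    return n[]
end
```

With a plain `n += 1` on a shared variable instead, concurrent updates could be lost, which is the kind of bug the manual section warns about.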

yes, sure… this is why I wrote “a sort of”… but yes, the wording was unfortunate…

The thread title does not really describe the problem: it is easy to write a data race that won’t solve the problem here. Maybe you will get better answers if you rename the thread to “How to return from inside a @threads loop” or something like that?

Another way, which is safe, is to use a lock:

julia> using ChunkSplitters
       function foo(data; nchunks=Threads.nthreads())
           lk = ReentrantLock()
           iwin, datawin = nothing, nothing
           Threads.@threads for (i_range, _) in chunks(data, nchunks)
               for i in i_range
                   if !isnothing(iwin)
                       break
                   end
                   # complex computation here
                   lock(lk) do 
                       if isnothing(iwin)
                           if data[i] == 10
                               iwin, datawin = i, data[i]
                           end
                       end
                   end
               end
           end
           return iwin, datawin
       end
foo (generic function with 1 method)

julia> a = foo(test)
(29, 10)
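
The lock-based version above takes the lock on every iteration and checks `iwin` outside the lock. A possible alternative, sketched here under some assumptions, keeps the winning index in an `Atomic{Int}` so that both the early-exit check and the write are atomic operations. The name `foo_atomic` and the `target` keyword are hypothetical, `0` is used as the "no winner yet" marker (which assumes 1-based indices), and the chunks come from `Iterators.partition` in Base rather than ChunkSplitters.

```julia
using Base.Threads: @threads, Atomic, atomic_cas!, nthreads

# Hypothetical atomic-based variant; returns (nothing, nothing) if nothing matches.
function foo_atomic(data; target = 10, nchunks = nthreads())
    isempty(data) && return (nothing, nothing)
    winner = Atomic{Int}(0)             # 0 means "no winner yet"
    ranges = collect(Iterators.partition(eachindex(data), cld(length(data), nchunks)))
    @threads for r in ranges
        for i in r
            winner[] != 0 && break      # some other task already won
            # complex computation here
            if data[i] == target
                # Compare-and-swap: only the first task to get here stores its index
                atomic_cas!(winner, 0, i)
                break
            end
        end
    end
    iwin = winner[]
    return iwin == 0 ? (nothing, nothing) : (iwin, data[iwin])
end
```

As with the other versions, which matching element is returned depends on scheduling, so `foo_atomic(test)` may give a different index from run to run.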