Normally you are taught to avoid races between threads in parallel programming, but in certain situations you actually want a kind of thread race.
I need to run an expensive computation and return from the function as soon as some condition is met; it doesn't matter whether this happens on the 2nd or the 200th element of the data:
test = rand(1:10,1000)
function foo(data)
    Threads.@threads for i in 1:length(data)
        # complex computation here
        if data[i] == 10 # first thread that finds a specific condition "wins" and returns
            return (i, data[i])
        end
    end
end
a = foo(test) # this doesn't work
What is the “correct” programming pattern for these situations? Clearly not the one above.
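For context, a small sketch of why the snippet above does not return the match. As far as I understand it, `Threads.@threads` wraps the loop body in a task-local closure, so `return` only leaves that closure and the loop expression itself evaluates to `nothing`. The name `foo_broken` is made up for this illustration.

```julia
# Same shape as the function above; `foo_broken` is a hypothetical name.
function foo_broken(data)
    Threads.@threads for i in eachindex(data)
        if data[i] == 10
            # This `return` exits only the closure that @threads generates
            # for the current task, not `foo_broken` itself.
            return (i, data[i])
        end
    end
    # The @threads loop is the last expression and evaluates to `nothing`.
end

result = foo_broken(rand(1:10, 1000))
println(result === nothing)
```

So the value computed inside the loop is discarded, and the result has to be communicated out of the loop some other way.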
I’m not sure what “the correct™ programming pattern” would be, but I think something like this should work:
function findany(pred, data)
    # Hand-written chunk definition, you can use ChunkSplitters.jl instead
    nchunks = Threads.nthreads()
    bounds = round.(Int, LinRange(firstindex(data), lastindex(data)+1, nchunks+1))
    chunks = [bounds[i]:bounds[i+1]-1 for i in 1:nchunks]

    # A Channel to synchronize tasks
    c = Channel{Int}(1)

    # One task per chunk
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            for i in chunk
                # If the Channel is closed, some other task must have found something
                # => return early
                isopen(c) || break
                if pred(data[i])
                    # If an element matching the predicate is found, try putting it in the Channel
                    try
                        put!(c, i)
                    catch e
                        # Gracefully handle cases when the Channel has been closed
                        # since the last time we checked
                        e isa InvalidStateException || rethrow()
                    end
                    break
                end
            end
        end
    end

    # Wait for a result to be put into the Channel
    idx = take!(c)
    # ... and close it to notify all other tasks that they should stop looking
    close(c)
    # Wait for all tasks to terminate
    foreach(wait, tasks)
    return (idx, data[idx])
end
test = rand(1:10, 1000);
function pred(x) # an expensive predicate
    sleep(0.1)
    x == 10
end
findany(pred, test)
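One caveat with `findany` as written: if no element satisfies `pred`, nothing is ever put into the Channel and `take!(c)` blocks forever. Below is a minimal sketch of one way to handle that case. It is a variation on the code above, not a definitive implementation, and it assumes `data` has integer linear indices. The names `findany_or_nothing`, `ntasks` and `closer` are made up for this example. The idea is an extra task that closes the Channel once all workers are done, so that `take!` fails with an `InvalidStateException` instead of waiting.

```julia
function findany_or_nothing(pred, data; ntasks = Threads.nthreads())
    c = Channel{Int}(1)

    # Split the indices into contiguous chunks, roughly one per task
    idxs = eachindex(data)
    chunklen = max(1, cld(length(idxs), ntasks))

    tasks = map(Iterators.partition(idxs, chunklen)) do chunk
        Threads.@spawn for i in chunk
            # A closed Channel means a result was found (or the search is over)
            isopen(c) || break
            if pred(data[i])
                try
                    put!(c, i)
                catch e
                    # The Channel was closed since the last check
                    e isa InvalidStateException || rethrow()
                end
                break
            end
        end
    end

    # Close the Channel once every worker has finished, so that take!
    # cannot block forever when nothing matches
    closer = Threads.@spawn try
        foreach(wait, tasks)
    finally
        close(c)
    end

    idx = try
        take!(c)   # still succeeds if a value was buffered before the close
    catch e
        e isa InvalidStateException || rethrow()
        nothing    # Channel closed while empty: no element matched
    end

    close(c)       # tell the remaining workers to stop looking
    wait(closer)
    return idx === nothing ? nothing : (idx, data[idx])
end
```

With this variant, `findany_or_nothing(pred, test)` returns an `(index, value)` tuple as before, and `nothing` when no element matches.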
Note that the premise that you would sometimes want a thread race isn't true: in Julia, like in most other languages, a data race is always a serious user error; see the “Data-race freedom” subsection of the Multi-Threading chapter of the Julia manual.
The thread title doesn't really describe the problem: it's easy to write a data race that won't solve the problem here. Maybe you will get better answers if you rename the thread to something like “How to return from inside a @threads loop”?
julia> using ChunkSplitters
julia> function foo(data; nchunks=Threads.nthreads())
           lk = ReentrantLock()
           iwin, datawin = nothing, nothing
           Threads.@threads for (i_range, _) in chunks(data, nchunks)
               for i in i_range
                   if !isnothing(iwin)
                       break
                   end
                   # complex computation here
                   lock(lk) do
                       if isnothing(iwin)
                           if data[i] == 10
                               iwin, datawin = i, data[i]
                           end
                       end
                   end
               end
           end
           return iwin, datawin
       end
foo (generic function with 1 method)
julia> a = foo(test)
(29, 10)
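The lock-based version above still reads `iwin` outside the lock for its early exit. A possible alternative, sketched here under the assumption that `data` has integer indices, is to record the winning index in a `Threads.Atomic{Int}` and let `Threads.atomic_cas!` decide which task wins. The name `foo_atomic` and the sentinel `none` are made up for this example, and it uses only Base, not ChunkSplitters.

```julia
using Base.Threads: @threads, Atomic, atomic_cas!

function foo_atomic(data)
    # Sentinel meaning "nothing found yet": one below the first valid index
    none = firstindex(data) - 1
    winner = Atomic{Int}(none)

    @threads for i in eachindex(data)
        # Once some task has won, skip the remaining (expensive) iterations
        winner[] != none && continue
        # complex computation here
        if data[i] == 10
            # Compare-and-swap: only the first task to get here stores its index
            atomic_cas!(winner, none, i)
        end
    end

    i = winner[]
    return i == none ? nothing : (i, data[i])
end
```

As with the other versions, which matching element wins depends on scheduling. `foo_atomic(test)` returns some `(index, 10)` pair, or `nothing` if `data` contains no 10.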