I had the following problem: I want to generate a certain object from random seeds, which only gives me the result I want with very low probability. To speed the calculation up I wanted to parallelize the code, but since I only need one of the rare events, I thought channels would be what I want: create a channel, try to fill it with many threads, and as soon as one thread has found a solution, stop the other threads and start the rest of my simulation. However, I didn’t find a way to kill the other threads, so I reduced the code to just find one rare event and then exit. What happened instead was that the code did not exit after the first rare event was found, but randomly after a few were found.
MWE
I reduced my code down to the following MWE. I was expecting the code to find one rare event and then exit, so it should print taking, found one, then done, and then exit the Julia shell. When I run the code this sometimes happens, but sometimes it also prints multiple found one, then multiple done, and only exits afterwards, with long delays between the different found one prints.
I tried changing the exit() to return in the code below and the behavior is the same, except that I get the final done taking printed too.
The behavior is also independent of the size of the channel.
I also tried pinning threads to specific CPU cores, which didn’t change the behavior.
I noticed that I get the “not exiting” behavior more frequently when the CPU is busy and less frequently when it is idle.
# glibc_coreid() = @ccall sched_getcpu()::Cint # pinning to specific cores doesn't change behavior
function func_busy(i)
    # println("task $i running on core $(glibc_coreid())")
    while (true)
        r = rand()
        if r < 1e-10
            println("found one")
            # I was originally writing ~20kB to disk here instead of printing "done" and flushing.
            println("done")
            flush(Base.stdout)
            # return r # code independent of return or exit
            exit()
        end
    end
end
function func_idle(t)
    println("sleeping for $t s")
    sleep(t)
    println("found one")
    println("done")
    flush(Base.stdout)
    # return t
    exit()
end
function threaded_f(f)
    # independent of the buffer size of the channel
    c = Channel(1)
    # c = Channel()
    # c = Channel(Threads.nthreads())
    th_func(i) = put!(c, f(i))
    tasks = [Threads.@spawn th_func(i) for i in 1:(Threads.nthreads()-1)]
    # tasks = [th_func(i) for i in 1:(Threads.nthreads()-1)] # when running single-threaded the code works as expected and exits after the first call
    println("taking")
    # independent of fetching or taking
    res = fetch(c)
    # res = take!(c)
    println(res)
    println("done taking")
    # close(c) # closing the channel doesn't change anything
    # exit() # this exit also doesn't change anything
end
println(Threads.nthreads())
threaded_f((i)->func_busy(i)) # weird behavior occurs more often when CPU is busy, instead of idling
# threaded_f((i)->func_idle(i))
So my questions are:
Is there a better way to solve the problem of waiting for one thread to finish its task and killing the others once it’s done?
Why am I getting these weird race conditions? There is a multi-second delay between the different found one prints to the console; the program should be able to exit before finding new rare events.
Threads need to be stopped cooperatively: each worker periodically checks whether it should give up. You do that via e.g. done_flag = Threads.Atomic{Bool}(false), and then sprinkling done_flag[] && throw() through your code (you can also return normally instead of throwing an exception).
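A minimal sketch of that pattern (the name worker and the search body are just placeholders, not code from the original post):
done_flag = Threads.Atomic{Bool}(false)

function worker()
    while true
        done_flag[] && return          # another task already finished, give up
        # ... one chunk of the actual search goes here ...
        if rand() < 1e-10              # stand-in for finding the rare event
            done_flag[] = true         # tell the other workers to stop
            return
        end
    end
end

tasks = [Threads.@spawn worker() for _ in 1:Threads.nthreads()]
wait.(tasks)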
Threads cannot reasonably be killed, because it’s generally not possible to recover to a known state. Processes can be killed that way, because they are somewhat sandboxed.
The program should be able to exit before finding new rare events
Calling exit() politely tries to shut down the process. That involves calling various atexit hooks, cleaning up temporary files, etc. It may involve waiting for all threads to reach some kind of safepoint.
I’m not entirely sure where Julia falls on the “ensure that the exit happens fast” vs. “run cleanup before leaving” spectrum with exit, but on unixy systems you can always force an immediate exit yourself.
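For instance, assuming the goal is to skip Julia’s cleanup entirely, the C library’s _exit can be called directly (a sketch, not code from the original post):
# POSIX only: bypasses Julia's atexit hooks and terminates the process immediately
@ccall _exit(0::Cint)::Cvoid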
You could also have the worker that puts to the channel close it immediately afterwards. Then every later attempt to put! will throw, but you can still take what was put by the first worker.
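Roughly like this (worker_sketch and the random search are placeholders for illustration):
c = Channel{Float64}(1)

function worker_sketch(c)
    r = rand()                                     # stand-in for the actual search
    try
        put!(c, r)
        close(c)                                   # every later put! will now throw
    catch e
        e isa InvalidStateException || rethrow()   # channel was already closed by another worker
    end
end

tasks = [Threads.@spawn worker_sketch(c) for _ in 1:Threads.nthreads()]
res = take!(c)   # the value put by the first worker is still available after close
wait.(tasks)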
Thank you for your suggestions.
I think I need to rephrase my second question slightly differently:
What I find most confusing is that the threads that found the rare event would print found one but then not execute the next statement in the function, i.e. printing done in the code above, or writing ~20 kB to disk in my original code. On average it took three threads finding these rare events before one actually wrote to disk, usually with minutes between the events. How can this happen and how can I prevent it?
Using println in threads can be a bit confusing. The reason is that the IO is actually done by thread 0 when it has the time. To ensure immediate output you can print through the C runtime instead, for example with the @ccall printf / Libc.flush_cstdio pair that also appears in the code further down:
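# printf is a vararg C function: the format string goes before the `;`, the varargs after
@ccall printf("found one %.3le\n"::Cstring; r::Cdouble)::Cint
Libc.flush_cstdio()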
Note the ; in the printf call: it’s a vararg function and needs this exact specification of inputs and output to work properly.
And, as noted above, an exit() will terminate the whole process; you should probably instead do a return to terminate the task, and do a synchronous shutdown of the other tasks.
This then probably also applies to other IO, like the writing to disk that I originally had in place of the second print statement?
I tried return statements instead of the exit statement but got the same behavior: sometimes I got a found one, then multiple minutes of nothing happening (the other threads still searching), and then other threads finding their rare events and printing found one. Only after around 3 hits did one thread actually return and the function threaded_f print the result / done taking.
I think something like this works fairly well. The other tasks will continue until the test in the while loop is encountered. But it’s possible to insert some done[] && return inside the loop as well.
function func_busy(i, done)
    while !done[]
        r = rand()
        if r < 1e-10
            done[] = true
            @ccall printf("found one %.3le\n"::Cstring; r::Cdouble)::Cint
            Libc.flush_cstdio()
            return r
        end
    end
    return nothing
end
function threaded_f(f)
    c = Channel(1)
    done = Threads.Atomic{Bool}(false)
    tasks = [Threads.@spawn begin
                 local r = f(i, done)
                 isnothing(r) || put!(c, r)
             end for i in 1:(Threads.nthreads()-1)]
    println("taking")
    res = take!(c)::Float64
    println("result: ", res)
    println("waiting for all tasks")
    wait.(tasks)
    close(c)
    println("all done")
end
println(Threads.nthreads())
threaded_f(func_busy)
There is a potential problem with the above. The Channel has a capacity of 1. This means that if several tasks find a solution nearly simultaneously, they will all try to write their result to the channel. But the channel is read only once, so the others will block until there is room in the channel, and the wait.(tasks) will wait forever. This can be alleviated by making the channel capacity larger.
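For example, simply sizing the channel to the number of workers:
c = Channel{Float64}(Threads.nthreads())   # every worker can put! once without blocking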
Or, better, you can replace the test if r < 1e-10 in func_busy with something like
if r < 1e-10 && !Threads.atomic_or!(done, true)
    return r
end
atomic_or! returns the previous value and updates it atomically, so if done was already true the test will fail and the return r will not be executed.
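To illustrate the return value:
done = Threads.Atomic{Bool}(false)
Threads.atomic_or!(done, true)   # returns false (the old value); done[] is now true
Threads.atomic_or!(done, true)   # returns true now, so !Threads.atomic_or!(done, true) is false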
Note a little detail here which is important. I declare r as local. The reason is that if you do something like this, havoc ensues:
tasks = [Threads.@spawn begin
             res = f(i, done)
             isnothing(res) || put!(c, res)
         end for i in 1:(Threads.nthreads()-1)]
println("taking")
res = take!(c)::Float64
The variable res is assigned to outside the @spawn, so the compiler will assume the same variable is used inside @spawn, and all the parallel tasks will share the variable res. (This is exactly what happens, intentionally, with the channel c.) So res may change value between isnothing(res) and put!(c, res) (and even after the res = take!(...)), because another task happened to update it. For this reason, it’s good practice to always declare new variables inside such constructions as local, to ensure they are not accidentally shared with any outer scope.
Thank you! This solves my problem.
However I’m still wondering, why in my original code the behavior of printing multiple found one minutes apart was happening. Do you have an explanation why this might happen?
The call to exit() isn’t just a simple libc exit. It tries to synchronize the threads to ensure, I don’t know, flushing of output buffers, which requires cooperation with the garbage collector, etc. But the other threads are CPU-bound and are not cooperating, at least not until they reach a println or some other Julia runtime stuff.
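One way to make such a CPU-bound loop cooperate, if you wanted to, would be an explicit safepoint inside it, roughly (a sketch, not something tested in this thread; the function name is hypothetical):
function func_busy_with_safepoint(i)
    while true
        GC.safepoint()        # gives the runtime a chance to stop this thread
        r = rand()
        r < 1e-10 && return r
    end
end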
I’m not sure exactly what happens; it’s somewhere inside the jl_atexit_hook C routine.
I see that exit in Julia does more things than in C.
However, from my observations the threads didn’t hang at the exit() statement, but after the first println("found one"), which I find very weird. I claim this because I saw multiple found one printed before a single done. This behavior was independent of whether there was a flush statement or not, and also independent of whether I had a return or exit() at the end of the function.