Stop threads after a certain condition is true

I want to run n threads in parallel, and if one of the threads reaches cost 0 I would like to stop all the others.

How can I do this?


best_expression, best_program, program_cost = nothing, start_program, Inf64

lk = SpinLock()

function parallelWork(meta, index)
    # this function takes a long time to run and gives me the "cost" of a program
    expression, outcome_program, cost = generic_run(meta..., start_program = start_program)
    lock(lk) do
        # if the cost is smaller than the current best, update it
        if cost < program_cost
            best_expression, best_program, program_cost = expression, outcome_program, cost
        end

        if cost == 0
            # try to stop all other threads that are not the current thread
            for (thread_index, thread_task) ∈ enumerate(thread_list)
                if thread_index != index
                    println("Stop $thread_index because $index reached cost 0")
                    schedule(thread_task, ErrorException("stop"), error = true)
                end
            end
        end
    end
end

thread_list = [Threads.@spawn parallelWork(meta, index) for (index, meta) ∈ enumerate(meta_search_list)]

println(length(thread_list))

wait.(thread_list)

Unfortunately, this code does not work (the REPL lags so much that everything kind of crashes and my CPU is at 100%), and I have no idea how to stop all the other threads when one of them is done.

In pthreads there is pthread_cancel, which would cancel the thread. Is there something similar in Julia?

Thank you

Looking here: How to kill thread?
I think this could maybe work, but it is ugly.

using .Threads

function hanging()
    while true
        println("I'm hanging on thread: ", threadid())
        sleep(1)
    end
end

function wrapper_run()
    hang = Threads.@spawn hanging()
    sleep(4)
    # throw an exception into the task; wait() rethrows it here so the catch fires
    schedule(hang, ErrorException("stop"), error = true)
    try
        wait(hang)
    catch e
        println("Good")
    end
    println("hello")
end

wrapper_run()

This will stop the task after 4 seconds without crashing.

You could use a Channel for synchronization.
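For example, a minimal sketch of the idea (the loop body and the `id == 1` trigger are placeholders standing in for the real computation): each task checks a shared channel and exits as soon as anything has been put into it.

```julia
using Base.Threads

# A non-empty channel signals "stop" to every worker.
stop = Channel{Bool}(1)

function worker(id)
    for step in 1:10_000
        isready(stop) && return :stopped_early   # someone else won
        yield()                                  # let other tasks run
        if id == 1 && step == 50                 # pretend worker 1 reached cost 0
            put!(stop, true)
            return :winner
        end
    end
    return :ran_to_completion
end

tasks = [Threads.@spawn worker(i) for i in 1:4]
results = fetch.(tasks)
```

Since the channel is written at most once and never taken from, `isready` stays true for all remaining workers; a `Threads.Atomic{Bool}` would serve the same purpose here.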


One method could be to add a callback parameter to generic_run and call the callback in an inner loop of generic_run. The callback checks the current best and returns a Boolean that determines whether generic_run should exit (there is no need even for a lock on the current best). Any long generic_run will usually have a loop in which to embed this, and the callback will probably serve other purposes too, such as logging or progress reporting.
I think that in the long run a boring solution such as this provides the most flexibility and cuts most of the compute cost.

How do I use it?
Can you provide me an example?

Thank you.

I don’t understand your solution. My issue is that I am running let’s say 10 threads in parallel and the generic_run function can take like 30 or more seconds.

If one of the threads manages to get to cost = 0 I would like to stop the others immediately. I don’t see how the callback would stop the other threads.

The callback is called many times during the 30 or more seconds of generic_run. Essentially, the callback polls the minimum_cost variable, and when it is 0 (or some other value below what a thread could attain) it returns false; otherwise it returns true. generic_run exits when false is returned.

I’ll try to give some pseudocode:

function generic_run(params; callback::F) where F<:Function
    init = some_f(params)
    while not_finished # loops many times
        callback() || return Inf64
        .... do heavy stuff ...
        .... to calculate cost ....
    end
    return cost
end

const minimum_cost = Base.RefValue(Inf64)

function cb()
    return minimum_cost[] == 0 ? false : true
end

function main()
    Threads.@threads for i in 1:1000
        c = generic_run((i, otherparams); callback = cb)
        if c < minimum_cost[]
            minimum_cost[] = c
        end
    end
end

Ah I see, but this is not nice at all because I have to add the callback parameter to many functions.

It would be so nice if there were a function to simply kill a thread immediately.

Yes, I’ve noted this is not a beautiful solution, but later on you may find this callback to be useful in other ways, especially for progress reports and logging, which often creep into long calculations.

Also, you would want minimum_cost to be defined as an Atomic:

minimum_cost = Atomic{Float64}(Inf64)

and updated atomically:

atomic_min!(minimum_cost, new_cost)

otherwise all sorts of race conditions might happen. Checking for zero like so:

minimum_cost[] == 0.0 && return false
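Putting these pieces together, a hedged sketch (fake_generic_run is an illustrative stand-in for the real generic_run, polling the callback in its inner loop):

```julia
using Base.Threads

minimum_cost = Atomic{Float64}(Inf64)

function cb()
    # stop as soon as any task has reached cost 0
    return minimum_cost[] == 0.0 ? false : true
end

# stand-in for the real generic_run
function fake_generic_run(i; callback)
    for _ in 1:100
        callback() || return Inf64   # another task already found cost 0
        yield()
    end
    return Float64(i % 7)            # pretend cost; 0.0 for i = 7, 14, ...
end

Threads.@threads for i in 1:100
    c = fake_generic_run(i; callback = cb)
    atomic_min!(minimum_cost, c)     # race-free update of the shared minimum
end
```

The first task that completes with cost 0 drives minimum_cost to 0.0, after which every remaining task bails out of its inner loop.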

I have a spin lock for that. Are atomic operations faster in this case?

Not perfect but you can get some inspiration here: JuliaUCL24/notebooks/backup/channels.ipynb at main · carstenbauer/JuliaUCL24 · GitHub


Yes. Many atomic operations are really single instructions on modern CPUs, or only slightly more.

This is really messy because it is unclear where in your code a good place to stop would be. You might want to clean up in some way before exiting the thread.

Here’s a crude example that uses the do syntax to create an anonymous function.

julia> ch = Channel{Bool}(spawn = true) do ch
           while isopen(ch) && isempty(ch)
               println("Hello World!")
               sleep(5)
           end
           take!(ch)
           println("Task finished")
       end
Hello World!
Channel{Bool}(0) (empty)

julia> Hello World!
Hello World!
Hello World!
julia> 

julia> put!(ch, 1)
Task finished
true

If you have an existing function, you could do the following.

julia> function intensive_calculations(stop_channel::Channel{Bool})
           s = 0
           for i in 1:1000000
               s += i^3
               if i % 10 == 0 && !isempty(stop_channel)
                   take!(stop_channel)
                   println("Breaking at $i")
                   break
               end
               sleep(0.001)
           end
           println("Done! s: $s")
       end
intensive_calculations (generic function with 2 methods)

julia> taskref = Ref{Task}()
Base.RefValue{Task}(#undef)

julia> ch = Channel{Bool}(intensive_calculations; spawn = true, taskref)
Channel{Bool}(0) (empty)

julia> put!(ch, true)
Breaking at 1580
Done! s: 1559976020100
true

Is it always necessary to add sleep inside the intensive computation function?

No. I added sleep to try to simulate a compute intensive function.


I recently published Visor.jl, a task supervisor that aims to reduce the boilerplate of synchronizing and managing the fault-tolerance aspects of long-running tasks.

Your use case was not covered, but it seems quite generic, so I’ve added a new supervisor strategy :one_terminate_all that terminates all tasks as soon as the first one terminates.

The following example shows the case of 10 threads (note the option thread=true on the process declaration), each executing a long-running job that takes a random amount of time to finish.

The first thread that terminates wins the competition and causes the others to be immediately interrupted by the supervisor (note the option force_interrupt_after=0).

using Dates
using Visor

function intensive_calc(pd, version)
    try
        rtime = rand() * 5
        @info "[$(now())][$pd] working for $rtime secs"
        sleep(rtime)
        @info "[$(now())][$pd] terminated normally: $version is the winner"
    catch e
        @info "[$(now())][$pd] terminated $e"
    end
end

processes_list = [process("intensive_$i",
                          intensive_calc,
                          args=("v$i.0.0",),
                          thread=true,
                          force_interrupt_after=0) for i in 1:10]

supervise(processes_list, strategy=:one_terminate_all)

For completeness, the following is an example that terminates the tasks at a safepoint instead of having them abruptly interrupted by the supervisor.

function intensive_calc_safe(pd, version)
    try
        rtime = rand() * 5
        @info "[$(now())][$pd] working for $rtime secs"
        dt = 0
        dsleep = 0.1
        while dt < rtime
            # simulate an intensive work
            sleep(dsleep)
            dt += dsleep

            # safepoint
            if isshutdown(pd)
                @info "[$(now())][$pd] terminated by shutdown"
                return
            end
        end
        @info "[$(now())][$pd] terminated normally: $version is the winner"
    catch e
        @info "[$(now())][$pd] terminated $e"
    end
end

processes_list = [process("intensive_$i",
                          intensive_calc_safe,
                          args=("v$i.0.0",),
                          thread=true) for i in 1:10]

supervise(processes_list, strategy=:one_terminate_all)

Hi @attdona. Thank you for changing your library to cover my use case.

How does your library actually implement this functionality under the hood?
Are you using a Channel that is polled constantly to check if a thread should exit?

By inspecting the code I see that you are using a message queue and then checking if there is a shutdown message there.

    try
        for msg in supervisor.inbox
            @debug "[$supervisor] recv: $msg"
            if isa(msg, Shutdown)
                supervisor_shutdown(supervisor, nothing, msg.reset)
                break
    # rest of code...

Isn’t this approach a bit less efficient than using Channels, as mkitti proposed?


The issue is that atomic_min! is a write operation that needs to take ownership of the cache-line. Many cores hitting the same cache-line with atomic_min! will absolutely hammer it.

The easiest way would be to have stopCondition = Threads.Atomic{Bool}(false) and then check it in each iteration.
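A sketch of that stop-flag pattern (the loop body and the `id == 3` trigger stand in for the real search):

```julia
using Base.Threads

stopCondition = Atomic{Bool}(false)

function search(id)
    for step in 1:10_000
        stopCondition[] && return :stopped   # cheap atomic read each iteration
        yield()
        if id == 3 && step == 100            # pretend this task found cost 0
            stopCondition[] = true
            return :found
        end
    end
    return :exhausted
end

tasks = [Threads.@spawn search(i) for i in 1:4]
results = fetch.(tasks)
```

Only reads hit the flag in the common case, so the cache line stays shared until someone actually sets it.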

If you need the current known global minimum, in order to speed up e.g. branch-and-bound algorithms, then you can do

globalMin::Threads.Atomic{UInt64}
for candidate in candidates_in_this_task
  globalMin[] == 0 && break
  result = compute_stuff(candidate, globalMin[])
  if result < globalMin[]
    atomic_min!(globalMin, result)
  end
end

It is illustrative to compare

julia> atomic_variant(globMin, m) = begin Threads.atomic_min!(globMin, m); 0 end
atomic_variant (generic function with 1 method)
julia> @code_native atomic_variant(Threads.Atomic{Int}(0), 0)
	.text
	.file	"atomic_variant"
	.globl	julia_atomic_variant_258        # -- Begin function julia_atomic_variant_258
	.p2align	4, 0x90
	.type	julia_atomic_variant_258,@function
julia_atomic_variant_258:               # @julia_atomic_variant_258
; ┌ @ REPL[29]:1 within `atomic_variant`
	.cfi_startproc
# %bb.0:                                # %top
	pushq	%rbp
	.cfi_def_cfa_offset 16
	.cfi_offset %rbp, -16
	movq	%rsp, %rbp
	.cfi_def_cfa_register %rbp
; │┌ @ atomics.jl:405 within `atomic_min!`
	movq	(%rdi), %rax
	.p2align	4, 0x90
.LBB0_1:                                # %atomicrmw.start
                                        # =>This Inner Loop Header: Depth=1
	cmpq	%rsi, %rax
	movq	%rsi, %rcx
	cmovleq	%rax, %rcx
	lock		cmpxchgq	%rcx, (%rdi)
	jne	.LBB0_1
# %bb.2:                                # %atomicrmw.end
; │└
	xorl	%eax, %eax
	popq	%rbp
	.cfi_def_cfa %rsp, 8
	retq
.Lfunc_end0:
	.size	julia_atomic_variant_258, .Lfunc_end0-julia_atomic_variant_258
	.cfi_endproc
; └
                                        # -- End function
	.section	".note.GNU-stack","",@progbits


julia> speculative_variant(globMin, m) = begin m < globMin[] && Threads.atomic_min!(globMin, m); 0 end
speculative_variant (generic function with 1 method)

julia> @code_native speculative_variant(Threads.Atomic{Int}(0), 0)
	.text
	.file	"speculative_variant"
	.globl	julia_speculative_variant_261   # -- Begin function julia_speculative_variant_261
	.p2align	4, 0x90
	.type	julia_speculative_variant_261,@function
julia_speculative_variant_261:          # @julia_speculative_variant_261
; ┌ @ REPL[30]:1 within `speculative_variant`
	.cfi_startproc
# %bb.0:                                # %top
	pushq	%rbp
	.cfi_def_cfa_offset 16
	.cfi_offset %rbp, -16
	movq	%rsp, %rbp
	.cfi_def_cfa_register %rbp
; │┌ @ atomics.jl:358 within `getindex`
	movq	(%rdi), %rax
; │└
; │┌ @ int.jl:83 within `<`
	cmpq	%rsi, %rax
; │└
	jle	.LBB0_3
# %bb.1:                                # %L8
; │┌ @ atomics.jl:405 within `atomic_min!`
	movq	(%rdi), %rax
	.p2align	4, 0x90
.LBB0_2:                                # %atomicrmw.start
                                        # =>This Inner Loop Header: Depth=1
	cmpq	%rsi, %rax
	movq	%rsi, %rcx
	cmovleq	%rax, %rcx
	lock		cmpxchgq	%rcx, (%rdi)
	jne	.LBB0_2
.LBB0_3:                                # %L13
; │└
	xorl	%eax, %eax
	popq	%rbp
	.cfi_def_cfa %rsp, 8
	retq
.Lfunc_end0:
	.size	julia_speculative_variant_261, .Lfunc_end0-julia_speculative_variant_261
	.cfi_endproc
; └
                                        # -- End function
	.section	".note.GNU-stack","",@progbits

The speculative variant is only correct because your globalMin is non-increasing. But it is much faster because it doesn’t cause contention in the typical case, where the CPU correctly speculates that you don’t have to change it.

(even if you don’t have to decrease globMin, you still cause cache contention if your CPU branch-predicts a decrease. Spectre FTW!)

PS. The above is for x86. I am not sure about performance implications of the atomic load on arm / powerpc. You might need more tricks there. Sorry, I’m really not up-to-date with atomics perf on arm.


Visor implements the supervisor concept:

a supervisor is an entity with an associated async task (the function manage) that waits for control messages.

The manage task does not poll the inbox Channel but blocks waiting for such messages. If the message is a Shutdown, it terminates the supervised tasks.

In this respect there are no performance issues.
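The difference from polling can be sketched generically: iterating a Channel (or calling take!) suspends the task until a message arrives, so no CPU is burned while idle. (The message type and names below are illustrative, not Visor's actual API.)

```julia
struct Shutdown end

inbox = Channel{Any}(10)

supervisor = @async begin
    for msg in inbox              # blocks here; wakes only when a message arrives
        msg isa Shutdown && break
        # ... handle other control messages ...
    end
    :supervisor_done
end

put!(inbox, Shutdown())
fetch(supervisor)                 # → :supervisor_done
```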

I agree with @foobar_lv2 here. If you really want to squeeze out the cycles you should avoid synchronization in loops, if possible. Locks are slow, but atomics also require some form of memory/cache synchronization which may take tens or hundreds of cpu cycles, depending on cpu/memory/cache architecture.

The fastest is to let the parallel tasks work independently, and do the global reduction (to the minimum or whatever) at the end, or collect all the results and reduce after everything has been fetched. For the premature termination, I also agree with @foobar_lv2. If possible, create an Atomic{Bool} or similar which is visible in all the tasks, set it to true when it’s time to stop. Check it regularly, not necessarily in every iteration, because there can be some overhead even with an atomic read (that’s precisely why not every operation is atomic).
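The reduce-at-the-end pattern can be sketched like this (cost is an illustrative stand-in for the real objective):

```julia
using Base.Threads

cost(x) = abs(sin(x)) + 0.1                      # stand-in objective

chunks = collect(Iterators.partition(1:10_000, 2_500))

# each task scans its chunk independently: no locks, no shared state
tasks = [Threads.@spawn begin
             local_best = Inf
             for x in chunk
                 local_best = min(local_best, cost(x))
             end
             local_best
         end for chunk in chunks]

# single reduction once everything has been fetched
global_best = minimum(fetch.(tasks))
```

All synchronization is deferred to the one `fetch`/`minimum` at the end, which costs a handful of operations regardless of how long the tasks ran.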

Of course, if the actual work takes millions of cycles, an atomic or lock is negligible and can be done in every iteration.
