Interrupt `Threads.@threads` computation

I sometimes deal with very lengthy computations that I would like to interrupt before they finish.
I can achieve this by wrapping the function call with `@async` and then throwing an `InterruptException()` to the task when I want it to end.
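
For reference, a minimal sketch of that single-task approach. It assumes the computation yields to the scheduler now and then (here via `sleep`): Julia tasks are cooperative, so a tight loop that never yields would not see the injected exception until it does. `slow_loop` is a hypothetical stand-in for the real work.

```julia
# Hypothetical stand-in for a lengthy computation; `progress` records how far it got.
function slow_loop(progress::Ref{Int})
    for i in 1:1_000
        progress[] = i
        sleep(0.01)              # yield point: an injected exception surfaces here
    end
    return :finished
end

progress = Ref(0)
t = @async slow_loop(progress)
sleep(0.2)                       # let it run for a while

# Throw an InterruptException into the task; it is raised where the task is waiting.
schedule(t, InterruptException(); error=true)

try
    wait(t)                      # rethrows the failure as a TaskFailedException
catch
    println("stopped after ", progress[], " of 1000 iterations")
end
```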

Unfortunately, this does not seem to work if my function uses multithreading via `Threads.@threads` internally, as I believe `@threads` spawns other tasks that keep running when I interrupt the originating task.

Is there a way to use a multithreaded `for` loop while keeping the ability to kill the computation altogether?

For anyone who ends up in the same situation: I managed to hack my way to a quick fix.

I was trying a simple toy example like the following:

```julia
function testfunc()
    done = Threads.Atomic{Int}(0)                  # shared iteration counter, safe across threads
    Threads.@threads for i ∈ 1:400
        rand(1000, 1000) * rand(1000, 1000)        # some heavy work
        iter = Threads.atomic_add!(done, 1) + 1    # atomic_add! returns the old value
        if iter % 10 == 0
            println("iter = $iter, Thread = $(Threads.threadid())")
            sleep(0.1)
        end
    end
end

tt = @async testfunc()

# Later, try to stop it:
schedule(tt, InterruptException(); error=true)
```

You’ll see that even after the `InterruptException` has been scheduled, the threads keep printing their status via `println`.
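
The same effect can be seen without `@threads` at all: interrupting a task does not propagate to tasks it started. A minimal sketch with one child started via `Threads.@spawn` (it behaves the same with a single thread, since tasks are multiplexed onto threads); the variable names are only illustrative:

```julia
child_done = Threads.Atomic{Bool}(false)
child_ref = Ref{Task}()

outer_task = @async begin
    # The child is an independent task: nothing ties its lifetime to the outer task.
    child_ref[] = Threads.@spawn begin
        sleep(1.0)
        child_done[] = true
    end
    wait(child_ref[])            # the outer task just blocks here
end

sleep(0.2)
schedule(outer_task, InterruptException(); error=true)   # interrupt only the outer task
try
    wait(outer_task)
catch
    # expected: the outer task failed with the InterruptException
end

wait(child_ref[])                # ...yet the child still runs to completion
println("outer failed: ", istaskfailed(outer_task), ", child finished: ", child_done[])
```

This matches the behaviour described above: only the task created by `@async` is stopped, while the tasks created inside it carry on.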

I had a look at the code of `Threads.@threads` and managed to make the threads interruptible by slightly modifying the function at https://github.com/JuliaLang/julia/blob/d279aede19db29c5c31696fb213e3101e2230944/base/threadingconstructs.jl#L25-L43

After overwriting that method with

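```julia
# Matches the Base internals at the commit linked above; newer Julia versions differ.
@eval Threads function threading_run(func)
    ccall(:jl_enter_threaded_region, Cvoid, ())
    n = nthreads()
    tasks = Vector{Task}(undef, n)
    for i = 1:n
        t = Task(func)
        t.sticky = true
        ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, i-1)
        tasks[i] = t
        schedule(t)
    end
    try
        for i = 1:n
            wait(tasks[i])
        end
    catch ex
        if isa(ex, InterruptException)
            println("InterruptException received, stopping @threads tasks!")
            # forward the interrupt to every sub-task that has not finished yet
            foreach(tasks) do t
                istaskdone(t) || schedule(t, InterruptException(); error = true)
            end
        end
        rethrow()   # don't swallow the exception; the original Base code lets it propagate
    finally
        ccall(:jl_exit_threaded_region, Cvoid, ())
    end
end
```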

(which adds the interruption of the sub-tasks in the `catch` block), the threads are properly shut down when the exception is sent.

This is serious type piracy, so in a real use case I suppose I should create a separate macro containing essentially the `Threads.@threads` code, copy-pasted, plus the extra `catch` modification above.
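
A sketch of a piracy-free variant, under different assumptions than the patch above: instead of copying `threading_run` and throwing into sub-tasks, this hypothetical `interruptible_foreach` (not part of Base or any package) spawns one task per chunk with `Threads.@spawn`, and each task checks a shared `Threads.Atomic{Bool}` before every item. If the waiting task is interrupted, it raises the flag, waits for the workers to finish their current item, and rethrows. This swaps in cooperative cancellation for exception forwarding, so a single very long item is not cut short.

```julia
"""
    interruptible_foreach(f, xs; ntasks = Threads.nthreads())

Hypothetical helper (not part of Base): call `f` on every element of `xs`
using `ntasks` spawned tasks. If the calling task is interrupted, no new
items are started, items already running finish, and the exception is rethrown.
"""
function interruptible_foreach(f, xs::AbstractVector; ntasks::Int = Threads.nthreads())
    isempty(xs) && return nothing
    stop = Threads.Atomic{Bool}(false)
    chunks = Iterators.partition(eachindex(xs), cld(length(xs), ntasks))
    tasks = map(collect(chunks)) do idxs
        Threads.@spawn for i in idxs
            stop[] && break              # cooperative cancellation point
            f(xs[i])
        end
    end
    try
        foreach(wait, tasks)
    catch
        stop[] = true                    # no worker starts a new item after this
        for task in tasks
            try
                wait(task)               # let each worker finish its current item
            catch
                # a worker's own failure is ignored here; the original exception wins
            end
        end
        rethrow()
    end
    return nothing
end
```

It is used like the toy example: `t = @async interruptible_foreach(process, items)` and later `schedule(t, InterruptException(); error=true)`, where `process` and `items` are placeholders.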

Is there any specific reason why the multithreaded call does not already try to shut down the threads it spawned when it is interrupted?

Edit: there was some more discussion on Zulip (this thread), where some drawbacks of the method above were pointed out.

I’m saddened that this obvious and painful issue still has no solution. It is crazy that I can’t terminate background threads without killing the whole REPL, losing precious data and computation results and having to start all over again.

A nice solution would be much appreciated.

I made this:

```julia
using Base.Threads: @threads
using ProgressMeter                    # provides Progress and next!

function strategy_search(trials_db, instances, data)
    REQUESTS = length(instances)
    l = ReentrantLock()
    p = Progress(REQUESTS)

    stop_cond::Bool = false
    try
        @threads for strategy in instances
            # Dict is not thread-safe, so read it under the lock as well
            lock(() -> haskey(trials_db, strategy), l) && continue
            stop_cond && break
            loss = eval_strategy(strategy, data)
            lock(l) do
                trials_db[strategy] = loss
                next!(p)
            end
        end
    catch e
        stop_cond = true
        println("We stop all threads with flag. $e")
        !(e isa InterruptException) && rethrow()
    end
end
```

I think it is pretty simple and does the job.
The downside is that `stop_cond` ends up as a `Core.Box` (it is captured by the loop’s closure and reassigned in the `catch`), which I hate. But I believe it shouldn’t be too big a performance drawback?

`trials_db` is a simple `Dict{MyStrategy, Float32}()` that I load and save (persist) between runs.
`instances` are just the generated trials that have to be tested.

I think you should update stop_cond atomically, otherwise you have a data race.
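
A minimal sketch of that suggestion, keeping the shape of `strategy_search` above but storing the flag in a `Threads.Atomic{Bool}`. Because the variable itself is never reassigned, this also avoids the `Core.Box`. To keep it self-contained, `ProgressMeter` is dropped and `evaluate` is a hypothetical stand-in for `eval_strategy`:

```julia
using Base.Threads: @threads, Atomic

# `evaluate` is a hypothetical stand-in for `eval_strategy(strategy, data)`.
function strategy_search!(trials_db::Dict, instances, evaluate)
    l = ReentrantLock()
    stop_cond = Atomic{Bool}(false)      # the binding is never reassigned: no Core.Box
    try
        @threads for strategy in instances
            stop_cond[] && break                                    # atomic read
            lock(() -> haskey(trials_db, strategy), l) && continue  # Dict reads under the lock too
            loss = evaluate(strategy)
            lock(l) do
                trials_db[strategy] = loss
            end
        end
    catch e
        stop_cond[] = true                                          # atomic write
        @warn "Stopping all threads via flag" exception=e
        e isa InterruptException || rethrow()
    end
    return trials_db
end
```

Workers that are already inside `evaluate` when the flag is raised still finish that item, and since `@threads` exposes no handles to its tasks, they may write it into `trials_db` shortly after the function returns.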

The solution is to incorporate some point in your computation where it checks for a stopping condition, cleans up or invalidates the state, and then stops the computation.
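
Assuming the work can be split into chunks, a sketch of that pattern: each chunk checks a shared flag before every item (the checkpoint), abandons its partial result if the flag is set, and the caller then invalidates the whole result by returning `nothing`. `chunk_sum` and `cancellable_sum` are hypothetical names used only for illustration:

```julia
# Hypothetical worker: sums f(i) over the range r, or returns `nothing` if cancelled midway.
function chunk_sum(f, r, cancel::Threads.Atomic{Bool})
    acc = 0.0
    for i in r
        cancel[] && return nothing    # checkpoint: abandon this chunk's partial sum
        acc += f(i)
    end
    return acc
end

# Hypothetical driver: returns the full sum, or `nothing` if any chunk was cancelled.
function cancellable_sum(f, n::Int, cancel::Threads.Atomic{Bool}; nchunks::Int = Threads.nthreads())
    n <= 0 && return 0.0
    chunks = Iterators.partition(1:n, cld(n, nchunks))
    tasks = [Threads.@spawn(chunk_sum(f, r, cancel)) for r in chunks]
    partials = fetch.(tasks)
    # Clean-up / invalidation: one abandoned chunk makes the whole result invalid.
    any(isnothing, partials) && return nothing
    return sum(partials)
end
```

Any task (a timer, a REPL command, or a `catch` block handling `InterruptException`) can raise the flag with `cancel[] = true`; the workers see it at their next checkpoint.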

You can make it atomic if you want to avoid any extra computation or dirty reads.
For me it doesn’t matter if 4 to 30 extra computations run because of a dirty read of `stop_cond`; the key is that I stop the next 10 million. Also, we mustn’t forget that each computation runs to completion, so the results from those extra runs are still valid and get saved in `trials_db`, and I can work with the finished computations without any problem.

Have a look here for a `@stoppable` macro.

There are other similar recent threads, for instance this one: How to early return from inside a @threads loop - #7 by lmiq

Personally, if the stopping condition is not an error, I would not use a `try` block. As in the example linked above, I prefer to control the tasks per thread and terminate them cleanly using a flag (atomic or protected by a lock).
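
A sketch of that flag-based style for the early-return case, with no `try` block: each task scans its own chunk and stops as soon as any task has recorded a match in a shared `Threads.Atomic{Int}`. `parallel_findany` is a hypothetical helper; it returns some matching index (not necessarily the smallest), or `nothing`:

```julia
"""
    parallel_findany(pred, xs; ntasks = Threads.nthreads())

Hypothetical helper: return the index of some element of `xs` satisfying `pred`
(not necessarily the first one), or `nothing` if there is none.
"""
function parallel_findany(pred, xs::AbstractVector; ntasks::Int = Threads.nthreads())
    isempty(xs) && return nothing
    none = firstindex(xs) - 1                  # sentinel meaning "no match yet"
    winner = Threads.Atomic{Int}(none)
    chunk = cld(length(xs), ntasks)
    @sync for idxs in Iterators.partition(eachindex(xs), chunk)
        Threads.@spawn for i in idxs
            winner[] != none && break          # another task already found a match
            if pred(xs[i])
                Threads.atomic_cas!(winner, none, i)   # only the first finder is recorded
                break
            end
        end
    end
    return winner[] == none ? nothing : winner[]
end
```

Using `atomic_cas!` here plays the role of the lock: several tasks may find a match at about the same time, but only one of them gets to store its index.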

We are talking about `InterruptException`, so it is not an error in that sense, just an interrupt signal (Ctrl+C) that can be handled with `try`/`catch`. Our cases are all about an unexpectedly long run that we want to stop manually without restarting the REPL.

Should that macro (or another candidate) have its own package, or be in Base? It is in an obscure package, where I wouldn’t have thought to look…

Yeah, it was something I just used there and in some of my packages that depended on it. I’m not sure where things like this belong, but it certainly is very useful. Maybe there should be a curated, general “Utils” package that could also serve as a testing ground for things that might eventually be picked up into Base.

I don’t know of a general Utils package where this would fit (there are many, but this seems too specific), except maybe the one below. You would want it to work with Base threads, though, so it may not be the best fit there (or it could support both); in any case, it may be useful for you to know about it, and maybe to use it instead:

Polyester.jl is a Julia package that provides low overhead threading. The primary API is @batch, which can be used in place of Threads.@threads.

From Bing (ChatGPT): it didn’t know about the above until I asked it the right way (I had to add .jl to the package name), after which it also described it, with sources:

ThreadsX.jl. However, there are also packages that can be used to extend its functionality, such as ThreadPools.jl and FLoops.jl. ThreadPools.jl exposes a few macros and functions that mimic Base.Threads.@threads, Base.map, and Base.foreach. FLoops.jl provides a simple GPU executor and @reduce syntax for supporting complex reduction in a forward-compatible manner.

I see it’s about a Task (coroutine) vs. a Thread (which, from memory, are not interoperable). I think they are related, but do you know if either or both are based on pthreads:

https://man7.org/linux/man-pages/man3/pthread_cancel.3.html

and whether they have similar support for the asynchronous and deferred cancellation explained there?

You’re totally right. As far as I understand, a task is essentially a coroutine, and it may or may not be run by different threads. E.g. if you create it with `@task` and call `schedule()`, it will run on the thread from which you called `schedule`. If you use `@spawn`, you tell Julia to run it on whatever thread is available.

Actually, looking at the `@spawn` definition is quite illuminating: it creates the task, sets where the task is allowed to run, and then schedules it.
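
A small sketch of that difference, under the assumption that `@spawn` essentially amounts to creating a `Task`, marking it non-sticky and scheduling it (recent Julia versions also assign a thread pool, so the real macro does a bit more):

```julia
# A plain @task is sticky: once scheduled from here, it runs on this thread.
here = Threads.threadid()
t1 = @task Threads.threadid()
schedule(t1)
@show fetch(t1) == here         # true

# Roughly what Threads.@spawn does: create the task, allow it to migrate, schedule it.
t2 = Task(() -> Threads.threadid())
t2.sticky = false
schedule(t2)
@show fetch(t2)                 # whichever thread the scheduler picked
```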

I don’t really know whether it’s a pthread or a jthread or anything like that, but AFAIK you should not think of “stopping threads” but rather of stopping a task that’s running on a thread; after you interrupt the task as above, the thread goes its merry way and runs another task.
