Task affinity to threads

If I use multithreading - for example, starting a task with @spawn top() - and the body of top() also invokes @async or even @spawn for some nested() work, and uses @sync to wait for this work;

Then I assume that the Julia scheduler is free to run other tasks on the thread (e.g., while top() is waiting on a @sync). That is, top() may be paused and waiting for the scheduler to resume it.

Is it guaranteed that top() will only be resumed on the thread it started on? That is, if I write thread_id = Threads.threadid() in the first line of top(), can I safely @assert thread_id == Threads.threadid() in the last line of top()?

Or can the scheduler resume top() in another thread, so that the value of Threads.threadid() might change following a @sync (and possibly in other points), so such an assertion is not safe to make?

Edit: It seems that today the answer is that the assertion is safe. The question then becomes whether this is guaranteed to hold in future versions of Julia, or whether a future scheduler could break this promise.
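
For concreteness, here is a minimal sketch of the pattern I mean (top and nested are placeholder names, and the bodies are dummy work):

using Base.Threads

nested() = sum(rand(1000))          # stand-in for real nested work

function top()
    thread_id = threadid()          # thread this task starts on
    @sync begin
        @spawn nested()             # nested work waited on by @sync
        @async nested()
    end
    # Is this guaranteed to hold, given that top() may have been paused at @sync?
    @assert thread_id == threadid()
end

wait(@spawn top())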

Interesting! I wrote the following test for your problem:

using Base.Threads

a = zeros(Int, nthreads(), nthreads())

function top(n=1000)
    x = zeros(Int, nthreads())
    thread_id = threadid()  # thread this task started on
    for j in 1:n
        @threads for i in 1:nthreads()
            x[threadid()] += threadid()
        end
    end
    @assert threadid() == thread_id "lost my thread $thread_id, now on $(threadid())"
    println("back from thread $(threadid())")
    a[:, threadid()] .= x
end

@threads for i in 1:nthreads()
	@async top()
end

and run it, it gives me

back from thread 4
back from thread 3
back from thread 2
back from thread 1
julia> a
4×4 Array{Int64,2}:
 1000     0      0      0
 2000  8000      0      0
 3000     0  12000      0
 4000     0      0  16000

What does this mean?

  1. asynchronous tasks (top()) on different threads stay on their threads, and
  2. on threads other than 1, a nested @threads does not create tasks on all parallel threads; all of its iterations run on the current thread.

This may be different for @spawn, but I guess not.
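
For example, the @spawn variant could be checked with something along these lines (just a sketch, not run; whether the assertion holds is exactly the open question):

using Base.Threads

function top_spawn(n = 1000)
    thread_id = threadid()
    for _ in 1:n
        @sync for i in 1:nthreads()
            @spawn i + i            # trivial nested work on some thread
        end
    end
    @assert threadid() == thread_id "lost my thread $thread_id, now on $(threadid())"
    println("back from thread $(threadid())")
end

@threads for i in 1:nthreads()
    top_spawn()
end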

I tried a more aggressive approach:

using Base.Threads
using Random

MAX_DEPTH = 8

switched = zeros(Bool, 10)
depth_switched = zeros(Bool, MAX_DEPTH)
used_threads = zeros(Bool, nthreads())

SLEEP = 1
SPAWN = 2
WAIT = 3

# Run `body` and record whether the current task switched threads across it.
function track(body, reason_index)
    thread_id = threadid()
    used_threads[thread_id] = true
    body()
    if thread_id != threadid()
        switched[reason_index] = true
    end
    return nothing
end

# Recursively spawn nested tasks, recording whether the current task was
# resumed on a different thread than the one it started on.
function recurse(thread_id, depth, rng)
    if thread_id != threadid()
        depth_switched[depth] = true
    end

    if depth == MAX_DEPTH
        track(SLEEP) do
            sleep(rand(rng, 1)[1] / 10)
        end
        return nothing
    end

    # Draw a positive seed for the child task's RNG.
    seed = rand(rng, Int, 1)[1]
    while seed < 1
        seed = rand(rng, Int, 1)[1]
    end

    result = nothing

    track(SPAWN) do
        thread_id = threadid()
        result = Threads.@spawn recurse(thread_id, depth + 1, MersenneTwister(seed))
    end

    recurse(threadid(), depth + 1, rng)

    track(WAIT) do
        wait(result)
    end
end

function affinity()
    recurse(threadid(), 1, MersenneTwister(123456))
    println("Used threads: $(used_threads)")
    println("Spawned switched threads? $(depth_switched)")
    println("Sleep switched threads? $(switched[SLEEP])")
    println("Spawn switched threads? $(switched[SPAWN])")
    println("Wait switched threads? $(switched[WAIT])")
end

And it printed:

Used threads: Bool[1, 1, 1, 1]
Spawned switched threads? Bool[0, 1, 1, 1, 1, 1, 1, 1]
Sleep switched threads? false
Spawn switched threads? false
Wait switched threads? false

So in my test, calling @spawn from any thread may run the spawned task on other thread(s), which makes sense.

At the same time, each task seems to be fixed to the thread it started on, even if it calls @spawn, wait, or sleep, which means the scheduler ensures this, at least today.

The question then becomes: is this guaranteed or can this change in future versions of Julia? That is, is it OK to write code which relies on this task-thread affinity property?

Task migration is a build-time option, currently not the default and maybe not well-tested. If it is enabled, there is a sticky property to forbid it for particular tasks. The fact that this is not well-documented, and that the manual still says “[threading] interfaces may change in the future” (i.e. presumably even within the v1.x regime) suggests you shouldn’t rely on affinity for the simple @spawn.

But it is certainly a desirable feature, likely to be available in some form. If you implement algorithms that benefit from affinity, that builds pressure for making it an official part of the API.

Yes, this is a desirable feature since it allows safely using per-thread data structures (indexed by threadid()) without locking. Of course, thread migration is also desirable as it allows improved CPU utilization. Hopefully, if/when it is introduced, there will be a way to specify whether a task is “sticky”, and for maximal backward compatibility the default should be true.
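
For example, the lock-free pattern I have in mind looks roughly like this (a sketch; it is only safe if the loop tasks never migrate off the threads they start on):

using Base.Threads

# Per-thread accumulators, indexed by threadid(); no locking needed
# as long as each task stays on the thread it started on.
partials = zeros(Int, nthreads())

@threads for i in 1:1_000
    partials[threadid()] += i
end

total = sum(partials)   # 500500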

Thanks for the clarification!

Just note that the default task behavior was changed to non-sticky.
The safe way to make a task “sticky” is to set current_task().sticky = true in the first line of the task's function.
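
If I understand the suggestion, that would look like this sketch (relying on the non-public sticky field):

using Base.Threads

function top()
    current_task().sticky = true    # opt this task out of migration (non-public field)
    thread_id = threadid()
    @sync @spawn sum(rand(1000))    # nested work; top() is paused here
    @assert thread_id == threadid()
end

wait(@spawn top())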

Task.sticky is not really meant to be part of the public API. Tasks are sticky by default:

julia> Task(()->println(5)).sticky
true

Threads.@spawn makes them not sticky.

julia> (Threads.@spawn println(5)).sticky
5
false

See the macro expansion of Threads.@spawn.

julia> @macroexpand Threads.@spawn println(5)
quote
    #= threadingconstructs.jl:343 =#
    let
        #= threadingconstructs.jl:344 =#
        local var"#9#task" = Base.Threads.Task((()->begin
                            #= threadingconstructs.jl:340 =#
                            println(5)
                        end))
        #= threadingconstructs.jl:345 =#
        (var"#9#task").sticky = false
        #= threadingconstructs.jl:346 =#
        ccall(:jl_set_task_threadpoolid, Base.Threads.Cint, (Base.Threads.Any, Base.Threads.Int8), var"#9#task", 0)
        #= threadingconstructs.jl:347 =#
        if $(Expr(:islocal, Symbol("##sync#48")))
            #= threadingconstructs.jl:348 =#
            Base.Threads.put!(var"##sync#48", var"#9#task")
        end
        #= threadingconstructs.jl:350 =#
        Base.Threads.schedule(var"#9#task")
        #= threadingconstructs.jl:351 =#
        var"#9#task"
    end
end

If you want a sticky Task, just use @async instead of Threads.@spawn.

julia> (@async println(5)).sticky
5
true

If you are looking for pinning to threads, see ThreadPinning.jl.

Thanks for the clarification.

Both @spawn and @threads used to create sticky tasks “back in the day”, and now they don't - fair enough; I just updated this thread to clarify this for anyone who happens upon it and might otherwise not understand why things don't work as expected.

As for using @async - just changing @spawn to @async in the affinity.jl above fails to use any threads other than the first one, while using @spawn does use all threads (I ran it with JULIA_NUM_THREADS=4). It seems @async isn't exactly “@spawn with a sticky task”. Looking at @macroexpand, they seem very similar, except that @spawn calls Base.Threads.put! and Base.Threads.schedule, while @async calls Base.put! and Base.schedule. Perhaps you could shed some light on this? Also, is there an equivalent of @threads that creates sticky tasks?

Finally, task stickiness has nothing to do with thread pinning. Task stickiness is about whether a task can run on multiple threads, while thread pinning is about whether a thread can run on different logical processors (“cores”).
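
To make the distinction concrete, a sketch (assumes ThreadPinning.jl is installed; pinthreads(:cores) is taken from its README):

using Base.Threads
using ThreadPinning

# Thread pinning: constrain each Julia *thread* to a CPU core (OS-level affinity).
pinthreads(:cores)

# Task stickiness: constrain a *task* to a single Julia thread (scheduler-level).
t = @async threadid()   # @async tasks are sticky to the spawning thread
fetch(t)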

@threads :static for ...

Correct, it is not. If you want a sticky @spawn, see @tspawnat, provided by ThreadPools.jl or ThreadPinning.jl.
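
For reference, usage would look roughly like this (a sketch based on the ThreadPools.jl docs; the thread index 2 is arbitrary and requires at least two threads):

using Base.Threads
using ThreadPools        # provides @tspawnat

# Spawn a sticky task directly on thread 2 and wait for its result.
t = @tspawnat 2 begin
    @assert threadid() == 2
    sum(rand(1000))
end
fetch(t)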

Thanks! Using @threads :static and @tspawnat does help. Though one does wonder why @spawn :static isn’t a thing…