[ANN] ThreadPools.jl - Improved thread management for background and nonuniform tasks

Update on the setup overhead above: it only shows up when the code is called in a script from global scope. Wrap the same script in a main() function and the overhead goes away. Lesson learned.

[Figure 1]
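A minimal sketch of the pattern described above, assuming a hypothetical threaded workload (the body of `main` is illustrative, not the script behind the measurements). The idea is to keep the work inside a function and call it once, rather than running loops directly at global scope, where Julia has to deal with untyped globals and cannot specialize the code as well.

```julia
using Base.Threads: @threads

# Hypothetical workload standing in for the original benchmark script.
function main()
    results = zeros(Int, 1_000)
    @threads for i in eachindex(results)
        results[i] = i^2   # each iteration writes its own slot, so there is no data race
    end
    return sum(results)
end

main()
```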

@tro3, very cool stuff. One thing I find myself needing is essentially Threads.@bgspawn expr, i.e. run this task on any thread, as long as it's not the primary thread. How hard would it be to add something like that? Any pointers? I'm willing to take a stab at it.

Do you need it to be the next available thread, or will any old thread do? The issue is that it is difficult to get the next available thread while avoiding the primary, because the lower-level assignment mechanism can't query the thread id before assignment. If one writes code to resubmit a task after it has been assigned to the primary, a lot of cycles might be spent constantly reassigning it away from the primary until a "good" thread comes up.

If any old thread will do, then assigning to a random thread would be easy enough. But in a heavy-loading case, that thread might not get to the task for a while.
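The collision risk of picking a background thread at random can be put into numbers with the birthday-problem formula. This assumes each of n tasks is sent independently and uniformly to one of the m background threads (as rand(2:Threads.nthreads()) would do) and ignores how long each task runs:

$$
P(\text{two or more tasks share a thread}) = 1 - \prod_{k=0}^{n-1} \frac{m - k}{m}, \qquad m = \texttt{Threads.nthreads()} - 1
$$

For n = 2 this reduces to 1/m, so with 4 threads (m = 3) two randomly placed tasks land on the same thread one time in three; once n > m the probability is 1.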

Which of the above sounds like your use case?

Any old thread is fine

Why not just @tspawnat 2 expr then (always sending to thread 2)? Or do you have a bunch of jobs and want to spread them out?

Here's what I came up with by modifying the @tspawnat macro:

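macro bgspawn(expr)
    thunk = esc(:(()->($expr)))
    var = esc(Base.sync_varname)
    quote
        local task = Task($thunk)
        task.sticky = false
        # pick a random non-primary thread (requires at least 2 threads)
        tid = rand(2:Threads.nthreads())
        # internal, version-dependent API: bind the task to that thread
        # (C-side thread ids are 0-based, hence tid-1)
        ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, tid-1)
        # if inside a @sync block, register the task so @sync waits for it
        if $(Expr(:isdefined, var))
            push!($var, task)
        end
        schedule(task)
        task
    end
end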

Maybe rand(2:Threads.nthreads()) works fine in practice. But I guess another possibility is to roll your own background worker pool?

julia> bgpool() = Channel(Inf) do request
           workers = Task[]
           for tid in 2:Threads.nthreads()
               task = @task begin
                   for (f, response) in request
                       push!(response, @async f())
                   end
               end
               task.sticky = false
               ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, tid-1)
               schedule(task)
               push!(workers, task)
           end
           Base.sync_end(workers)
       end
bgpool (generic function with 1 method)

julia> function bgspawn(f, request)
           response = Channel(1)
           try
               push!(request, (f, response))
               return take!(response)
           finally
               close(response)
           end
       end
bgspawn (generic function with 1 method)

julia> pool = bgpool()
Channel{Any}(sz_max:9223372036854775807,sz_curr:0)

julia> t = bgspawn(pool) do
           Threads.threadid()
       end
Task (runnable) @0x00007ffa541acc40

julia> fetch(t)
2

Interesting. Is sync_end a 1.4 thing?

I think Base.sync_end has been there for a long time (I can at least find it in 0.6). But it's an internal function which I shouldn't have used :sweat_smile:.

I don't know a good way to do this without using it (or without relying on other internal functions), though. If I were to do this without using any internals, I'd replace Base.sync_end(workers) with

@sync for t in workers
    @async wait(t)
end
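A small sketch of how that public-API replacement behaves, using two hypothetical tasks (one finishes normally, one fails). @sync waits for every enclosed @async, and any failures surface together as a CompositeException instead of stopping at the first one.

```julia
# Two hypothetical background tasks: one finishes normally, one fails.
workers = [Threads.@spawn(sleep(0.01)), Threads.@spawn(error("boom"))]

err = try
    @sync for t in workers
        @async wait(t)   # wait(t) rethrows a failed task as TaskFailedException
    end
    nothing
catch e
    e   # @sync collects the failures into a CompositeException
end
```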

Also I guess this "sync" part does not matter much as I'm not doing enough error handling. I probably should do something like

task = @task try
    for (f, response) in request
        push!(response, @async f())
    end
catch
    close(request)
    rethrow()
end
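A self-contained sketch of the close-on-error idea, under some assumptions: `start_consumers` is a hypothetical name, the workers are started with plain Threads.@spawn (so, unlike the pool above, nothing keeps them off thread 1), and each consumer calls the job directly instead of wrapping it in @async, so that a failing job actually reaches the catch block.

```julia
# Start `n` consumer tasks sharing one job channel. If a job throws, that
# consumer closes the channel (so its peers stop waiting for more work)
# and then rethrows so the failure stays visible to whoever waits on it.
function start_consumers(jobs::Channel, n::Integer)
    return map(1:n) do _
        Threads.@spawn try
            for job in jobs
                job()
            end
        catch
            close(jobs)
            rethrow()
        end
    end
end

jobs = Channel{Any}(10)
put!(jobs, () -> 1 + 1)
put!(jobs, () -> error("boom"))

consumers = start_consumers(jobs, 3)

# Count how many consumers failed; the others exit once the channel is closed.
nfailed = count(consumers) do t
    try
        wait(t)
        false
    catch
        true
    end
end
```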

Ok, having done a more thorough dive into the ThreadPools code, I think what I actually want is:

using ThreadPools

pool = ThreadPools.QueuePool(2)

put!(pool) do
    println(Threads.threadid())
    # do all my cool stuff I need
end

So that's great! I like QueuePool because it takes care of non-uniformity, which is a potential concern in my case. My earlier @bgspawn just picked a random thread, so the same thread could get picked twice; if the first task takes a long time, the second task is stuck queued behind it. Furthermore, in my application I don't expect all threads to be busy all the time, so making sure work is taken by idle threads is important, since there will usually be at least one.
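The queue-versus-random-thread point can be illustrated with a minimal sketch. This is not ThreadPools' QueuePool implementation; `run_queued` is a hypothetical helper built on a shared Channel and Threads.@spawn, so it does not keep work off the primary thread. All jobs go into one queue and each worker takes the next job as soon as it is free, so a slow job only occupies the worker running it instead of blocking whatever was queued behind it.

```julia
# Run `jobs` (zero-argument functions) on `nworkers` tasks pulling from one shared queue.
function run_queued(jobs::Vector, nworkers::Integer)
    queue = Channel{Tuple{Int,Any}}(length(jobs))
    for (i, job) in enumerate(jobs)
        put!(queue, (i, job))
    end
    close(queue)  # workers finish once the queue is drained

    results = Vector{Any}(undef, length(jobs))
    @sync for _ in 1:nworkers
        Threads.@spawn for (i, job) in queue
            results[i] = job()   # each index is written by exactly one worker
        end
    end
    return results
end

# One slow job followed by several fast ones (durations are arbitrary).
jobs = Any[() -> (sleep(0.2); :slow)]
for k in 1:5
    push!(jobs, () -> (sleep(0.01); k))
end

results = run_queued(jobs, 2)
```

With two workers, one worker stays on the slow job while the other drains the fast ones, which is the behavior a random-thread assignment cannot guarantee.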

Anyway, very cool stuff @tro3; ThreadPools.jl is really neat.

Excellent! Glad to hear it's useful. :slight_smile: