Update on the setup overhead above. That overhead only appears when the script is run at global scope. Wrap the same script in a main() function, and the overhead goes away. Lesson learned.
@tro3, very cool stuff. One thing I find myself needing is essentially Threads.@bgspawn expr, i.e. run this task on any thread, as long as it's not the primary thread. How hard would it be to add something like that? Any pointers? I'm willing to take a stab at it.
Do you need it to be the next available thread, or any old thread? The issue is that it is difficult to do the former while avoiding the primary, because the lower-level assignment mechanism can't query the id before assignment. If one tries to write the code to resubmit after assignment to the primary, there might be a lot of cycles spent constantly reassigning out of the primary until a "good" one hits.
If any old thread will do, then assigning to a random thread would be easy enough. But in a heavy loading case, that thread might not get to the task for a while.
Which of the above sounds like your use case?
Any old thread is fine
Why not just @tspawnat 2 expr then (always sending to thread 2)? Or do you have a bunch of jobs and want to spread them out?
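If the goal is spreading jobs out, one possibility is a small round-robin dispatcher over the non-primary threads. This is just a sketch: rrspawn and NEXT are hypothetical names, and it assumes ThreadPools.@tspawnat accepts a computed thread id (it takes an expression, not only a literal).

```julia
# Sketch: round-robin dispatch over threads 2:nthreads() via ThreadPools.
# `rrspawn` and `NEXT` are made-up names for illustration.
using ThreadPools

const NEXT = Threads.Atomic{Int}(0)  # shared counter for round-robin

function rrspawn(f)
    nbg = Threads.nthreads() - 1                       # non-primary threads
    tid = 2 + mod(Threads.atomic_add!(NEXT, 1), nbg)   # cycle through 2:nthreads()
    @tspawnat tid f()                                  # schedule f on thread tid
end
```

Unlike a random pick, this guarantees consecutive jobs land on different threads, though it still doesn't account for how busy each thread is.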
Here's what I came up with by modifying the @tspawnat macro:
macro bgspawn(expr)
    thunk = esc(:(()->($expr)))
    var = esc(Base.sync_varname)
    quote
        local task = Task($thunk)
        task.sticky = false
        tid = rand(2:Threads.nthreads())
        ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, tid-1)
        if $(Expr(:isdefined, var))
            push!($var, task)
        end
        schedule(task)
        task
    end
end
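A quick usage sketch (assuming Julia was started with at least two threads):

```julia
# Spawn a task that reports which thread it ran on.
t = @bgspawn Threads.threadid()
fetch(t)  # some id in 2:Threads.nthreads(), never 1
```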
Maybe rand(2:Threads.nthreads()) works fine in practice. But I guess another possibility is to roll your own background worker pool?
julia> bgpool() = Channel(Inf) do request
           workers = Task[]
           for tid in 2:Threads.nthreads()
               task = @task begin
                   for (f, response) in request
                       push!(response, @async f())
                   end
               end
               task.sticky = false
               ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, tid-1)
               schedule(task)
               push!(workers, task)
           end
           Base.sync_end(workers)
       end
bgpool (generic function with 1 method)
julia> function bgspawn(f, request)
           response = Channel(1)
           try
               push!(request, (f, response))
               return take!(response)
           finally
               close(response)
           end
       end
bgspawn (generic function with 1 method)
julia> pool = bgpool()
Channel{Any}(sz_max:9223372036854775807,sz_curr:0)
julia> t = bgspawn(pool) do
           Threads.threadid()
       end
Task (runnable) @0x00007ffa541acc40
julia> fetch(t)
2
Interesting. Is sync_end a 1.4 thing?
I think Base.sync_end has been there for a long time (I can at least find it in 0.6). But it's an internal function which I shouldn't have used.
I don't know a good way to do this without it (or without relying on other internal functions), though. If I were to do this without using any internals, I'd replace Base.sync_end(workers) with
@sync for t in workers
    @async wait(t)
end
Also I guess this "sync" part does not matter much as I'm not doing enough error handling. I probably should do something like
task = @task try
    for (f, response) in request
        push!(response, @async f())
    end
catch
    close(request)
    rethrow()
end
Ok, having done a more thorough dive into the ThreadPools code, I think what I actually want is:
using ThreadPools
pool = ThreadPools.QueuePool(2)
put!(pool) do
    println(Threads.threadid())
    # do all my cool stuff I need
end
So that's great! I like QueuePool because it will take care of non-uniformity, which is a potential concern in my case. My earlier @bgspawn just picked a random thread, so the same thread could get picked twice, and if the first task takes a long time, the second task is stuck queued behind it. Furthermore, in my application scenario, I don't expect all threads to be busy all the time, so making sure work is taken by non-busy threads is important, as there will usually be at least one available.
Anyway, very cool stuff @tro3; ThreadPools.jl is really neat.
Excellent! Glad to hear it's useful.