I am looking for code that can solve the following (multi-core, shared memory) parallelisation problem, which is surely so common that a well-implemented solution must exist out there.
- I have N cores.
- I have a list (vector) of length M of procedure calls.
- Each call is thread safe with respect to all the others.
- The time required to complete a procedure call is hugely variable (a very rough idea of how long a call might take is available, if useful, at the level of ‘big’, ‘medium’, ‘small’, but no more than that).
- M >> N.
I would like my N cores to work through the M procedure calls as
efficiently as possible.
I suspect that the most efficient way to do this is with N persistent threads (to avoid the overhead of starting a new thread for each procedure call), but I may well be wrong, and the cost of starting a new thread for each procedure call may be essentially immaterial.
Either way, the code should work (‘thread’) through all the calls as
fast as possible, using all cores.
Anyway, I assume this is a standard problem, so there should be
standard code out there. Anybody got good suggestions?
Have you tried the naive approach, i.e. starting Julia with -t N and then spawning M tasks with Threads.@spawn? (Threads.@spawn does load balancing.)
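A minimal sketch of that naive approach, with do_work standing in for one of your procedure calls (the name and workload are illustrative):

```julia
using Base.Threads

# Illustrative stand-in for one of the M procedure calls.
do_work(i) = sum(sin, 1:i)

# Spawn one task per call; the scheduler spreads them over the N threads.
tasks = [Threads.@spawn do_work(i) for i in 1:1000]
results = fetch.(tasks)   # wait for and collect all results
```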
For more fine-grained control you can take a look at ThreadPools.jl.
(You might also consider using pmap which, however, uses different worker processes instead of threads. So you need to use @everywhere and such to distribute your code etc.)
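For completeness, a pmap sketch under those assumptions (heavy_call is a hypothetical example function):

```julia
using Distributed
addprocs(4)   # e.g. one worker process per core

# The function must be defined on every worker, hence @everywhere.
@everywhere heavy_call(i) = sum(sin, 1:i)

# pmap hands out work dynamically, so it load-balances across workers.
results = pmap(heavy_call, 1:1000)
```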
Julia pools threads behind the scenes.
So you don’t have to worry about it.
All threads start at the start of the Julia process. Threads.@spawn just schedules the work onto a queue, from which all the threads pull more work when they finish their current task.
```julia
julia> Base.remove_linenums!(@macroexpand Threads.@spawn sin(1))
quote
    let
        local var"#28#task" = Base.Threads.Task((()->begin
                    sin(1)
                end))
        (var"#28#task").sticky = false
        if $(Expr(:islocal, Symbol("##sync#41")))
            Base.Threads.put!(var"##sync#41", var"#28#task")
        end
        Base.Threads.schedule(var"#28#task")
        var"#28#task"
    end
end
```
Note that Threads.schedule == schedule
The docs for schedule say it adds the task to the scheduler’s queue.
```
help?> Threads.schedule
  schedule(t::Task, [val]; error=false)

  Add a Task to the scheduler's queue. This causes the task to run constantly when the
  system is otherwise idle, unless the task performs a blocking operation such as wait.
```
Which sounds like it adds it to a queue.
The source code for schedule looks like it is queuing up work.
@Sean_Matthews - this use case is exactly why I created ThreadPools.jl. The queued scheduler in that package is aimed directly at jobs of wildly varying lengths. (I had the same problem.) You may also want to consider keeping the primary thread available for scheduling: if a big job lands on the primary thread, no new jobs can start. Whether that is the best strategy depends on your statistics; you give up a thread, but reduce collisions with jobs on the primary.
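If I read the ThreadPools.jl docs correctly, a sketch of both options might look like this (slow_call is hypothetical, and the qmap/qbmap names assume the current exports; check the package docs):

```julia
using ThreadPools

slow_call(i) = sum(sin, 1:i)   # hypothetical job of variable length

# Queued scheduling: each thread pulls the next job as it finishes,
# which suits jobs of wildly varying length.
results = qmap(slow_call, 1:1000)

# Queued scheduling that also keeps the primary thread free for scheduling.
results_bg = qbmap(slow_call, 1:1000)
```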
If the tasks are in general fast (less than hundreds of microseconds), you should split the work into as few tasks as possible. Then, if M is much greater than N, probably the best strategy is to assign M/N computations to each thread manually, i.e. an outer loop over the threads with an inner loop over that thread’s own chunk (see the sketch below).
If the tasks are in general slow (more than one second), the time for launching tasks will not be important, and then you can just use a standard threaded loop over all computations, or spawn them.
The corner case is if most of the tasks are very fast, such that the time to launch a task matters, but one or another computation can take much more time and be time-limiting. Then I would probably go with the second option, but whether it will be effective depends on the details of the computations involved.
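A concrete sketch of the manual M/N chunking for cheap calls (all names here are illustrative):

```julia
using Base.Threads

quick_call(x) = x^2              # hypothetical cheap procedure call
M = 10_000
inputs = collect(1:M)
results = zeros(Int, M)

# One contiguous chunk per thread: spawn overhead is paid
# nthreads() times instead of M times.
@sync for chunk in Iterators.partition(eachindex(inputs), cld(M, nthreads()))
    Threads.@spawn for j in chunk
        results[j] = quick_call(inputs[j])
    end
end
```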
I would say the cut-off is even lower than that one-second figure.
Don’t worry about grouping them if they generally take over 1 ms. @spawn is fast.
It’s not so fast that careful approaches like ThreadPools (or @threads for) can’t do better, for sure.
But it is fast enough that my cut-off for worrying about grouping them is <1 ms.
Depending exactly how much you care about getting the last few bits of performance out, you might start worrying earlier (or later).
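One way to get a feel for that overhead on your own machine is to time a trivial spawn-and-fetch (a rough measurement, not a precise benchmark of the scheduler):

```julia
using BenchmarkTools, Base.Threads

# Per-task overhead: spawn a trivial task and wait for its result.
@btime fetch(Threads.@spawn 1 + 1)
```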
```julia
using .Threads, BenchmarkTools

# a silly function taking some time, returning its thread
f(n) = (sum((i for i in 1:n) .^ 2); threadid())

# build one string per thread, with one "*" for each task it ran
function show_load(threads)
    res = fill("", nthreads())
    foreach(i -> res[i] *= "*", threads)
    res
end
```
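Presumably the helper is driven along these lines (the task count is illustrative): spawn many tasks, fetch the thread id each one ran on, and show one "*" per task under its thread:

```julia
tasks = [Threads.@spawn f(10_000) for _ in 1:(8 * nthreads())]
show_load(fetch.(tasks))
```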
If we put the same load on all tasks, all threads are employed. Even with unbalanced load (if M >> N, where M is the number of tasks and N is nthreads()), the balance is quite good.
But there is still an issue in the imbalance shown above: if there are more threads available at the time of task creation, the task should be given to a random thread (not to the first available one).
But I’m not able to make a PR for that since I don’t want to mess up enq_work(t::Task) (cited above by you).
Can I say thank you to everybody who has contributed to this thread. It has been very informative, far beyond my basic question, which appears to be more or less definitively answered by a simple recommendation to use ThreadPools.jl, which I shall do.
For what it’s worth, I think @tkf’s Transducers ecosystem is the right way to provide a high-level interface for solving this problem. Depending on the syntax and interfaces you prefer, one of the packages in that ecosystem, such as Transducers.jl or ThreadsX.jl, may be very useful to you (they all do essentially the same thing under the hood).
These will do things like automatic load balancing and work very well for embarrassingly parallel problems, as well as non-embarrassingly parallel ones.
There are of course some corner cases where the performance is not as good as desired, but this is generally a very good place to start, and it’s flexible and modular enough that you can hook in and provide custom implementations of certain steps to increase efficiency. It’s improving all the time too.
Oh, I think this is a pretty good demo of the pathological case of the current parallel runtime. This happens because we don’t have task migration across worker threads. In that example, once thread 2 eats all the tasks, these tasks cannot be moved to other threads.
To be more specific, you can pass basesize to functions in Transducers.jl, ThreadsX.jl etc. for load-balancing. You’d need something smaller than M/N but how small depends on the typical distribution of the run-time of your computation.
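For instance, with ThreadsX (the workload here is a made-up mix of ‘small’, ‘medium’, and ‘big’ calls):

```julia
using ThreadsX

# Made-up mix of small/medium/big calls of widely varying cost.
work(i) = sum(sin, 1:rand([10^3, 10^5, 10^7]))

# basesize is the number of elements each parallel task handles;
# a smaller basesize gives finer-grained load balancing at more scheduling cost.
results = ThreadsX.map(work, 1:1000; basesize = 1)
```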
With channels in the parallel runtime, my use case of letting a task read first from a channel is a natural consequence, even if it may not be so common. Task allocation to threads should be better distributed in the first place, before tasks have to be migrated between threads.
I wanted to say pathological scheduling; obviously not a pathological use case. It’d be great to support non-blocking functions well in the parallel runtime, of course.