Looking for code to solve (surely common) 'embarrassing parallelism' multithreading use-case

I am looking for code that can solve the following (multi-core, shared
memory) parallelisation problem, which is surely so common that a well
implemented solution must exist out there.

I have N cores.
I have a list (vector) of length M of procedure calls.

  • each call is thread safe with respect to all the others

  • the time required to complete a procedure call is hugely variable
    (a very rough idea of how long a call might take is available, if
    useful - at the level of ‘big’, ‘medium’, ‘small’, but no more than that).

  • M >> N

I would like my N cores to work through the M procedure calls as
efficiently as possible.

I suspect that the most efficient way to do this is with N persistent
threads (to avoid the overhead of starting a new thread for each
procedure call) but I am quite possibly wrong, and the costs of
starting a new thread for each procedure call are essentially immaterial.

Either way, the code should work (‘thread’) through all the calls as
fast as possible, using all cores.

Anyway, I assume this is a standard problem, so there should be
standard code out there. Anybody got good suggestions?

Many thanks,

Sean Matthews
seanmatthews1@gmail.com


Have you tried the naive approach, i.e. starting Julia with -t N and then spawning the M tasks with Threads.@spawn? (Threads.@spawn does load balancing.)
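A minimal sketch of that naive approach, assuming Julia was started with several threads (for example `julia -t 4`). The function `work` and the job sizes are made up for illustration; they only stand in for the M procedure calls.

```julia
# Hypothetical stand-in for one procedure call; its cost grows with `n`.
work(n) = sum(abs2, 1:n)

# M jobs of very different sizes (illustrative data only).
jobs = [10^(i % 6 + 1) for i in 1:200]

# One task per job; the scheduler decides which thread runs each task.
tasks = [Threads.@spawn work(n) for n in jobs]

# `fetch` waits for each task and returns its result, in the original job order.
results = fetch.(tasks)
```

The results come back in the order of `jobs`, regardless of which thread ran which task.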

For more fine grained control you can take a look at ThreadPools.jl.

(You might also consider using pmap, which, however, uses separate worker processes instead of threads, so you need @everywhere and the like to make your code available on the workers.)
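For comparison, a small sketch of the process-based route with `pmap`. The worker count and the function `slow_square` are arbitrary choices for illustration, not anything from the original question.

```julia
using Distributed

# Start two extra worker processes (the count is an arbitrary choice here).
addprocs(2)

# Worker processes do not share memory with the main process, so the
# function has to be defined on every one of them.
@everywhere slow_square(x) = (sleep(0.01 * x); x^2)

# By default pmap hands out one element at a time, so a worker that
# finishes early picks up the next pending job.
squares = pmap(slow_square, 1:8)

# Shut the workers down again.
rmprocs(workers())
```

Since arguments and results are copied between processes, this tends to pay off only when each call does a reasonable amount of work.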


Julia pools threads behind the scenes.
So you don’t have to worry about it.

All threads are started when the Julia process starts.
Threads.@spawn just puts the work on a queue that all the threads can access, and each thread takes more work from it when it finishes its current task.
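The same "shared queue" picture can also be written out by hand, which is close to the N persistent threads idea from the question. This is only a sketch under my own assumptions (the name `run_pool` is hypothetical, and this is not how the Julia scheduler is implemented): N long-lived tasks pull jobs from one `Channel` until it is empty.

```julia
# Run `f` over `inputs` with `nworkers` long-lived tasks that share one queue.
function run_pool(f, inputs; nworkers = Threads.nthreads())
    results = Vector{Any}(undef, length(inputs))
    # Buffered channel holding an (index, input) pair for every job.
    queue = Channel{Tuple{Int,eltype(inputs)}}(length(inputs))
    for (i, x) in enumerate(inputs)
        put!(queue, (i, x))
    end
    close(queue)  # a closed channel can still be drained; iteration then ends

    # Each worker takes the next job whenever it finishes its current one.
    @sync for _ in 1:nworkers
        Threads.@spawn for (i, x) in queue
            results[i] = f(x)  # each index is written by exactly one worker
        end
    end
    return results
end

out = run_pool(n -> sum(abs2, 1:n), [10, 1_000_000, 100, 10_000])
```

A worker that gets a long job simply stays busy while the others keep draining the queue, so no up-front estimate of job length is needed.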


Isn’t it quite the contrary? Doesn’t @spawn tell the scheduler to start a task on a thread that happens to be free?

From the manual:

Create and run a Task on any available thread.

Sometimes I wish it would be the way you described.


I am honestly not 100% sure.

Looks like it just wraps schedule:

julia> Base.remove_linenums!(@macroexpand Threads.@spawn sin(1))
quote
    let
        local var"#28#task" = Base.Threads.Task((()->begin
                            sin(1)
                        end))
        (var"#28#task").sticky = false
        if $(Expr(:islocal, Symbol("##sync#41")))
            Base.Threads.put!(var"##sync#41", var"#28#task")
        end
        Base.Threads.schedule(var"#28#task")
        var"#28#task"
    end
end

Note that Threads.schedule == schedule

The docs for schedule say it adds the task to the scheduler’s queue.

help?> Threads.schedule
  schedule(t::Task, [val]; error=false)

  Add a Task to the scheduler's queue. This causes the task to run constantly when the
  system is otherwise idle, unless the task performs a blocking operation such as wait.

Which sounds like it adds it to a queue.

The source code for schedule looks like it is queuing up work


@Sean_Matthews - this use case is exactly why I created ThreadPools.jl. The queued scheduler in that package is aimed directly at jobs of wildly varying lengths. (I had the same problem.) You may also want to consider keeping the primary thread available for scheduling. If a big job hits the primary, no new ones can start. The best strategy there just depends on your statistics - give up a thread, but reduce primary job collisions.


My two cents:

  1. If the tasks are fast in general (less than hundreds of microseconds each), you should split the work into as few pieces as possible. Then, if M is much greater than N, probably the best strategy is to assign M/N computations to each thread manually (for i in 1:nthreads(); for j in 1:M÷N .... end; end).
  2. If the tasks are in general slow (more than one second), the time for launching threads will not be important, and then you can just use a standard threaded loop on all computations or spawn them.
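The manual split from item 1 could look roughly like the sketch below. The helper name `chunked_map` and the choice of one contiguous block per thread are my assumptions, and it expects a non-empty input.

```julia
# Split `inputs` into about `nchunks` contiguous blocks and run one task per block.
# Assumes `inputs` is non-empty.
function chunked_map(f, inputs; nchunks = Threads.nthreads())
    chunksize = max(1, cld(length(inputs), nchunks))
    chunks = Iterators.partition(inputs, chunksize)
    # One spawn per chunk, so the scheduling cost is paid per chunk, not per call.
    tasks = [Threads.@spawn map(f, chunk) for chunk in chunks]
    # Concatenate the per-chunk results back into input order.
    return reduce(vcat, fetch.(tasks))
end

ys = chunked_map(x -> x^2, 1:1000)
```

With wildly varying call times, a fixed split like this can leave some threads idle while one block finishes, which is the trade-off against spawning every call separately.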

The corner case is when most of the computations are so fast that the time to launch a thread matters, but one or another computation can take much longer and become the bottleneck. Then I would probably go with option 2, but whether it is effective depends on the details of the computations involved.


I’ll echo this. If they are wildly varying but super-short, group them together before parallelizing.


I would say the cut-off is even lower than this.
Don’t worry about grouping them if they in general take over 1ms.
@spawn is fast.
It’s not so fast that more careful approaches like ThreadPools (or @threads for) can’t do better, for sure.
But it is fast enough that my cut-off for worrying about grouping them is <1ms.
Depending on exactly how much you care about squeezing out the last bit of performance, you might start worrying earlier (or later).

julia> @btime Threads.@spawn 1;
  140.650 ns (4 allocations: 352 bytes)

julia> @btime fetch(Threads.@spawn 1);
  13.604 μs (4 allocations: 352 bytes)

yes, I see you are right!

using .Threads, BenchmarkTools

# a silly function taking some time, returning its thread
f(n) = (sum((i for i in 1:n) .^ 2); threadid())

function show_load(threads)
    res = fill("", nthreads())
    foreach(i->res[i]*="*", threads)
    res
end

then

julia> @btime f(2_000)
  1.174 μs (2 allocations: 31.50 KiB)
1

julia> fetch.(map((_->Threads.@spawn f(2000)), 1:nthreads()))
8-element Array{Int64,1}:
 3
 2
 4
 5
 6
 7
 8
 1

If we put the same load on all tasks, all threads are employed. Even with unbalanced load (with M >> N, where M is the number of tasks and N is nthreads()) the balance is quite good:

julia> show_load(fetch.(map(_->(Threads.@spawn f(rand(1:2000))), 1:500)))
8-element Array{String,1}:
 "****************************************************"
 "****************************************************************************************"
 "**************************************************************************"
 "*********************************"
 "********************************************************"
 "***********************************************************"
 "***********************************************************************"
 "*******************************************************************"

I initially had a different impression because in my applications the tasks usually start by reading from a channel. In that case the load is very imbalanced:

g(n) = (yield(); f(n))

julia> show_load(fetch.(map(_->(Threads.@spawn g(rand(1:2000))), 1:500)))
8-element Array{String,1}:
 "*"
 "*************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************"
 "*"
 "*"
 "*"
 "*"
 "*"
 "*"

Can I interest you in opening a PR to update the docs on this and make it more clear?

Hope that makes it clearer:

Create a Task and schedule it to run on any available thread. The task is allocated to a thread after it becomes available.

see: Update threadingconstructs.jl by pbayer · Pull Request #39309 · JuliaLang/julia · GitHub

But there is still an issue with the imbalance shown above: if several threads are available at the time of task creation, the task should be given to a random one of them (not to the first available one).

But I’m not able to make a PR for that since I don’t want to mess up enq_work(t::Task) (cited above by you).


Can I say thank-you to everybody who has contributed to this thread. It has been very informative, far beyond my basic question, which appears to be more or less definitively answered by a simple recommendation to use ThreadPools.jl, which I shall do.

Sean


For what it’s worth, I think @tkf’s Transducers ecosystem is the right way to provide a high level interface for solving this problem. Depending on the syntax and interfaces you prefer, one of these three packages may be very useful to you (they all do the same thing essentially under the hood):

These will do things like automatic load balancing and work very well for embarrassingly parallel problems, as well as non-embarrassingly parallel problems.

There are of course some corner cases where the performance is not as good as desired, but this is generally a very good place to start, and it’s flexible and modular enough that you can hook in and provide custom implementations of certain steps to increase efficiency. It’s improving all the time too.


Oh, I think this is a pretty good demo of the pathological case of the current parallel runtime. This happens because we don’t have task migration across worker threads. In that example, once thread 2 eats all the tasks, these tasks cannot be moved to other threads.

To be more specific, you can pass basesize to functions in Transducers.jl, ThreadsX.jl etc. for load-balancing. You’d need something smaller than M/N but how small depends on the typical distribution of the run-time of your computation.


With channels in the parallel runtime, my use case of letting a task first read from a channel is a natural consequence, even if it may not be so common. Task allocation to threads should be better distributed in the first place, before having to migrate tasks between threads.

I meant to say pathological scheduling, obviously not a pathological use case. It’d be great to support non-blocking functions well in the parallel runtime, of course.
