How to Maximize CPU Utilization - @spawn Assigning to Busy Workers - Use pmap Instead

This is partially a performance question. I have a parallel process that is working. However, by using @spawn, all of the tasks are spawned to the available workers before any of the tasks start running. It doesn’t seem like @spawn cares about whether workers are busy. The tasks are of varying lengths, so some workers finish all of their tasks long before the other workers are done with theirs. This means my CPU utilization goes from 100% to 90% to 80% and so on, slowly dropping.

Is there a way to have @spawn only assign tasks to workers that are not already busy with something else? Then the CPU utilization would stay near 100% the entire time. Here’s a snippet of the code from the relevant section.

#Example Code

@sync for x in packedChunk
    @async fs[x.id] = @spawn run_chunks(x.id,x.project,x.scenario,x.sim_group
                                        ,x.ord_day,x.dt_file,x.ord_file
                                        ,x.driver_file,x.trailer_file
                                        ,x.shipwith_file,x.infeasible_file
                                        ,x.stopcon_file,x.storetrail_file)
end

@sync for i = eachindex(fs)
    a,b = fetch(fs[i])
    append!(toOutputFile, a)
    append!(toOrphansFile, b)
end

Currently, when a task is spawned on a thread it stays on that thread. This means that if run_chunks does any sort of IO, the thread will go into a wait state and be free to start another task, but then both of those tasks will be tied to that thread. So you could end up with many long tasks running on the same thread.

What I would suggest first is using @threads, something like:

Threads.@threads for i = 1:length(packedChunk)
    x = packedChunk[i]
    fs[x.id] = run_chunks(x.id,x.project,x.scenario,x.sim_group
              ,x.ord_day,x.dt_file,x.ord_file
              ,x.driver_file,x.trailer_file
              ,x.shipwith_file,x.infeasible_file
              ,x.stopcon_file,x.storetrail_file)
end

This distributes the iterations evenly among all threads, though there is still a chance that all the long tasks end up on the same thread.

Another possibility (I have no clue if it would work as you want) would be to use a Semaphore and only spawn nthreads() * x tasks at a time. You probably won’t get 100% CPU utilization, but hopefully the tasks would be spread more evenly across the cores and overall you would have better utilization.
So something like:

import Base: Semaphore, acquire, release
using Base.Threads: nthreads

sem = Semaphore(nthreads() * 4)

@sync for x in packedChunk
    acquire(sem)
    Threads.@spawn begin
        fs[x.id] = run_chunks(x.id,x.project,x.scenario,x.sim_group
                  ,x.ord_day,x.dt_file,x.ord_file
                  ,x.driver_file,x.trailer_file
                  ,x.shipwith_file,x.infeasible_file
                  ,x.stopcon_file,x.storetrail_file)
        release(sem)
    end
end

The trick would be finding the correct size of the Semaphore for your tasks.
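One caveat with the sketch above: if run_chunks throws, release is never called, and the loop will eventually deadlock on acquire. A variant of the same idea wrapped in try/finally (still just a sketch, with fs and packedChunk as in the original post):

import Base: Semaphore, acquire, release
using Base.Threads: nthreads

sem = Semaphore(nthreads() * 4)

@sync for x in packedChunk
    acquire(sem)   # wait until fewer than 4*nthreads() tasks are in flight
    Threads.@spawn try
        fs[x.id] = run_chunks(x.id, x.project, x.scenario, x.sim_group,
                              x.ord_day, x.dt_file, x.ord_file,
                              x.driver_file, x.trailer_file,
                              x.shipwith_file, x.infeasible_file,
                              x.stopcon_file, x.storetrail_file)
    finally
        release(sem)   # release the slot even if run_chunks throws
    end
end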

Edit: I was curious how Julia schedules tasks, so I went poking around some more. You might want to look at ThreadPools.jl (https://github.com/tro3/ThreadPools.jl, docs at https://tro3.github.io/ThreadPools.jl), which provides improved thread management for background and nonuniform tasks. It has macros for handling nonuniform tasks, so that might be what you want.
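For example, the package’s queued macro hands each iteration to the next free thread instead of pre-assigning them. A minimal sketch, assuming ThreadPools’ @qthreads and the variables from your post:

using ThreadPools

# @qthreads keeps a job queue and gives each iteration to the next
# thread that becomes free, which suits tasks of varying length:
@qthreads for x in packedChunk
    fs[x.id] = run_chunks(x.id, x.project, x.scenario, x.sim_group,
                          x.ord_day, x.dt_file, x.ord_file,
                          x.driver_file, x.trailer_file,
                          x.shipwith_file, x.infeasible_file,
                          x.stopcon_file, x.storetrail_file)
end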


Threads.@threads
Unfortunately, I think I am doing a bunch of stuff that is not thread-safe. Threads.@threads just drops straight back to the Julia prompt with no error message, and it’s obvious none of the process is happening. No outputs are created as they would be with the process running correctly.

Semaphore + Threads.@spawn
I’ll have to look at this. I have never heard of Semaphore before. If my process isn’t thread-safe, though, then maybe this will also fail. I haven’t tried it yet. I did try something similar where I ran only 12 tasks at a time (I have 12 cores) and used @sync to wait until each group finished, but I faced the same problem: some long tasks delayed the other workers from getting new tasks until the whole group completed. Again, Semaphore could be different, and I haven’t tried Threads.@spawn before, so…

ThreadPools.jl
This does seem to approach what I am trying to do. I don’t want to add any more packages to my project if possible. I am already using JuMP and Cbc, and JuMP takes a while to load, especially when you have 12 workers using @everywhere. However, I’ll take a look at the source code to try to understand what’s going on.

I was just asking about this the other day here. I think the answer to your question is that instead of @spawn, you should use remotecall_fetch(default_worker_pool()) do ... end, or even just pmap over your packedChunk. Both will only use free workers and keep them busy until the work is exhausted.
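A sketch of the remotecall_fetch version, using the structures from your original post (run_chunks must be defined with @everywhere, as in your setup). Each @async task takes a free worker from the default pool, holds it only while its own chunk runs, and returns it to the pool when done, so no worker accumulates a backlog of long tasks:

using Distributed

@sync for x in packedChunk
    @async fs[x.id] = remotecall_fetch(default_worker_pool()) do
        run_chunks(x.id, x.project, x.scenario, x.sim_group,
                   x.ord_day, x.dt_file, x.ord_file,
                   x.driver_file, x.trailer_file,
                   x.shipwith_file, x.infeasible_file,
                   x.stopcon_file, x.storetrail_file)
    end
end

Note that fs then holds the results themselves rather than Futures, so the fetch in your collection loop becomes a no-op.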


@spawn actually assigns tasks to threads that are not busy. Let’s say I have a function slow(n::Int) calibrated such that it keeps a core on my machine busy for around 1 ms per unit of n:

using .Threads, BenchmarkTools

function slow(n)
    res = 0
    for _ in 1:n*2310
        res += sum(sin(1/rand()).^rand(1:5) for _ in 1:10)
    end
    return res
end

julia> @benchmark slow(1)
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     967.756 μs (0.00% GC)
  median time:      996.769 μs (0.00% GC)
  mean time:        1.026 ms (0.00% GC)
  maximum time:     2.520 ms (0.00% GC)
  --------------
  samples:          4872
  evals/sample:     1

julia> @benchmark slow(1000)
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     1.022 s (0.00% GC)
  median time:      1.028 s (0.00% GC)
  mean time:        1.034 s (0.00% GC)
  maximum time:     1.058 s (0.00% GC)
  --------------
  samples:          5
  evals/sample:     1

Then I can check sequential vs parallel execution:

julia> @time foreach(_->slow(1_000), 1:nthreads())
  9.477004 seconds (104.81 k allocations: 5.884 MiB)

julia> @time @sync foreach(_->(Threads.@spawn slow(1_000)), 1:nthreads())
  1.385115 seconds (117.54 k allocations: 6.567 MiB)

julia> @time @sync @threads for _ in 1:nthreads()
           slow(1_000)
       end
  1.376207 seconds (118.42 k allocations: 6.578 MiB)

You see that in a case where all tasks take equally long, there is no difference between @spawn and @threads. You can also check with htop that all CPUs are employed.

Let’s check dynamic scheduling. If I randomize the n argument to slow uniformly over 1:1000, a task should take 0.5 seconds on average. So if I spawn 160 such randomized tasks on 8 cores, it should take roughly 10 seconds if all cores are employed:

julia> @time @sync for _ in 1:160
           Threads.@spawn slow(rand(1:1000))
       end
 13.303092 seconds (19.02 k allocations: 1.198 MiB)

htop shows that all 8 cores are employed equally well.

Let’s check with @threads:

julia> @time @sync @threads for _ in 1:160
           slow(rand(1:1000))
       end
 14.933920 seconds (35.20 k allocations: 1.976 MiB)

This takes a bit longer, since at the end of the computation the CPU load becomes more unbalanced.

Now let’s check Distributed pmap, as suggested by @marius311:

julia> using Distributed

julia> addprocs();

julia> nprocs()
17

julia> @everywhere function slow(n)
           res = 0
           for _ in 1:n*2310
               res += sum(sin(1/rand()).^rand(1:5) for _ in 1:10)
           end
           return res
       end

julia> @time pmap(_->slow(rand(1:1000)), 1:160);
 10.296406 seconds (236.70 k allocations: 12.383 MiB, 0.09% gc time)

OK, that beats the above two, with htop showing the hyper-threads employed as well.

Two notes of caution

  1. You speak of @spawn and workers, @everywhere and Threads intermingled. Please note that these are actually two different concepts of parallel computing in Julia: multithreading and distributed computing. Better not to mix them together (see the sketch after this list).
  2. There are pathological cases where the scheduling of tasks to threads with @spawn does not work properly.
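A minimal illustration of the two: threads share memory within one process, while Distributed workers are separate processes that need code shipped to them with @everywhere (the worker count and the function g here are just for illustration):

# Multithreading: one process, shared memory
using Base.Threads
t = Threads.@spawn sum(rand(10^6))   # task on a thread of this process
fetch(t)

# Distributed: separate worker processes (possibly on other machines)
using Distributed
addprocs(2)
@everywhere g(n) = sum(rand(n))      # define g on all processes
r = @spawnat :any g(10^6)            # Future from some worker process
fetch(r)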

Follow-up to my last analysis! If I start Julia with JULIA_NUM_THREADS=16 and repeat, I get with @spawn the same results as with Distributed pmap:

julia> @time @sync foreach(_->(Threads.@spawn slow(rand(1:1_000))), 1:160)
 10.524744 seconds (35.11 k allocations: 2.049 MiB)

with htop showing all threads and hyper-threads employed.

I conclude from this that I should start Julia with JULIA_NUM_THREADS equal to the number of threads and hyper-threads! Any objections?
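For reference, JULIA_NUM_THREADS has to be set before Julia starts; it cannot be changed from within a running session:

$ JULIA_NUM_THREADS=16 julia

julia> Threads.nthreads()
16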

julia> versioninfo()
Julia Version 1.5.3
Commit 788b2c77c1 (2020-11-09 13:37 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin18.7.0)
  CPU: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 16

pmap gave me the right direction. It’s running 13% faster already. Thank you @marius311. @pbayer gives a lot more detail that helped fill in the gaps in my understanding, so I’m marking his post as the solution.


@pbayer thank you for calling attention to the fact that I was mixing parallel computing terms, which could cause confusion for others reading this thread in the future. The truth is, I didn’t know whether to use @threads, @spawn, pmap, or asynchronous task/job scheduling. It looks like pmap is the winner here, at least for my use case with the JuMP and Cbc packages.

pmap is solving ~13% faster on a test dataset. For whatever reason, the @sync-for-@async-@spawn structure was not balancing the work well between workers (see the original post for the structure I was using). pmap balances the work by simply using:

fs = pmap(x->run_chunks(x.id,x.project,x.scenario,x.sim_group
                        ,x.ord_day,x.dt_file,x.ord_file
                        ,x.driver_file,x.trailer_file
                        ,x.shipwith_file,x.infeasible_file
                        ,x.stopcon_file,x.storetrail_file),packedChunk)

# pmap returns the results directly (not Futures), so fetch here is a no-op
@sync for i = eachindex(fs)
    a,b = fetch(fs[i])
    append!(toOutputFile, a)
    append!(toOrphansFile, b)
end

I do not know why I am seeing a difference in how each approach behaves (pmap vs @spawn). pmap is definitely keeping all of the cores busy, so to my eyes it does look like it is assigning work only to available workers. @spawn was pre-allocating the tasks to the workers, which was causing a delay at the end, with 1 or 2 workers getting backed up with longer tasks.

I always wanted to understand how to use pmap, but I had never gotten it to work. I gave up too quickly. I had settled on the @sync-for-@async-@spawn structure because it was the first thing I was able to get up and running.

Thank you @pbayer
Thank you @marius311
Thank you @pixel27


Note there’s also a threaded version of pmap, ThreadTools.tmap, which should do the same load balancing but with threads instead of processes. This has less overhead, since it doesn’t require copying memory to other processes, and you don’t have to pay the cost of launching those processes either; but if you want workers across multiple machines, then you’ll need pmap.
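A sketch with tmap, assuming ThreadTools is installed, run_chunks is thread-safe (which was in doubt earlier in the thread), and packedChunk as before:

using ThreadTools

# tmap spawns a task per element and lets the scheduler balance them
# across threads — roughly a threads-based pmap:
fs = tmap(x -> run_chunks(x.id, x.project, x.scenario, x.sim_group,
                          x.ord_day, x.dt_file, x.ord_file,
                          x.driver_file, x.trailer_file,
                          x.shipwith_file, x.infeasible_file,
                          x.stopcon_file, x.storetrail_file),
          packedChunk)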


Thanks @marius311, and yes, my intent is to use distributed computing to take advantage of small to medium sized clusters, or a multi-computer network of some kind. I am loving Julia for that reason: the flexibility to scale.

that should not be the case on a single machine. I couldn’t believe it and therefore took measurements. I can confirm your results with the approach described above, with all threads used and with identical random numbers t.

With Base.Threads:

julia> using Statistics

julia> times = Float64[];

julia> for i in 1:10  # ten runs
           push!(times, (@elapsed @sync foreach(n->(Threads.@spawn slow(n)),t)))
           sleep(30)  # 30 s silicon cool-down between each run 
       end

julia> (μ = mean(times), σ = std(times))  # in seconds
(μ = 11.976195435700001, σ = 0.10846830416250676)

With Distributed:

julia> times = Float64[];

julia> for i in 1:10  # ten runs
           push!(times, (@elapsed pmap(n->slow(n), t)))
           sleep(30)  # 30 s silicon cool-down between each run 
       end

julia> (μ = mean(times), σ = std(times))  # in seconds
(μ = 10.4988074009, σ = 0.0983951465164567)

The time with pmap is very near the theoretical time of an optimal schedule:

Setup of the computing times t:
julia> using Random

julia> Random.seed!(123);

julia> t = rand(1:1000, 160);

julia> sum(t)/8000   # total work is sum(t) ms; over 8 cores this is the ideal runtime in seconds
10.26175

A note about benchmarking:

Since I am on a laptop (MacBook Pro), I needed cool-downs between runs to get more realistic results. Likewise, to test scheduling I chose much longer computations than usual (> 10 s on all cores). Therefore I went with @elapsed and not with BenchmarkTools.


The reason may be that garbage collection stops the “world”, which is all threads in the process; the more threads per process, the more overhead. I have seen a distributed run up to 4x faster than a multithreaded run of the same (albeit edge-case) benchmark on a 64-core machine.


Benchmarks show no garbage collection taking place (might that be hidden?).

julia> @benchmark (@sync for n in t      | julia> @benchmark pmap(n->slow(n), t) seconds=100
           Threads.@spawn slow(n)        | 
       end) seconds=100                  | 
BenchmarkTools.Trial:                    | BenchmarkTools.Trial:
  memory estimate:  123.39 KiB           |   memory estimate:  477.23 KiB
  allocs estimate:  1378                 |   allocs estimate:  12356
  --------------                         |   --------------
  minimum time:     13.207 s (0.00% GC)  |   minimum time:     11.686 s (0.00% GC) 
  median time:      13.588 s (0.00% GC)  |   median time:      12.136 s (0.00% GC)
  mean time:        13.535 s (0.00% GC)  |   mean time:        12.075 s (0.00% GC) 
  maximum time:     13.726 s (0.00% GC)  |   maximum time:     12.427 s (0.00% GC)
  --------------                         |   --------------
  samples:          8                    |   samples:          9
  evals/sample:     1                    |   evals/sample:     1

The times here are not realistic due to thermal throttling but still show the difference.


Related GitHub issue: “[feature request] Worker scheduler based on current load”, JuliaLang/julia#41220. Indeed, Distributed.@spawnat cycles through workers instead of choosing the next available worker. This can remove all benefit of parallelism if the number of jobs submitted (in a loop) is an integer multiple of the number of workers and some of those jobs are significantly longer than the others, since the long jobs will all stack up on a single worker.

Presumably you are using @spawnat :any since you want the scheduler to pick a worker for you.

Have you looked at the WorkerPool variants of the remotecall family of functions?

remotecall(f, pool::AbstractWorkerPool, args…; kwargs…) → Future

WorkerPool variant of remotecall(f, pid, …). Wait for and take a free worker from pool and perform a remotecall on it.


Thanks @greg_plowman! So, just so I understand, I would pass the list procs containing my worker IDs into a WorkerPool, and then use that WorkerPool and remotecall instead of calling @spawnat :any? So

for i=1:100
    push!(futures, @spawnat :any f(x[i]))
end

becomes

pool = WorkerPool(procs)
for i=1:100
    push!(futures, remotecall(f, pool, x[i]))
end

and this would be enough to have it more efficiently schedule work?
(excuse the inefficiency of this call; I need this structure for other reasons).

Btw, do you know if there is any macro like @spawnat but for a WorkerPool? (for more complex expressions)

Roughly yes.

Note that procs() contains the master process, whereas workers() does not.

Hard to say more without knowing your particular application/requirements.

If you are already working with Futures then your approach seems equivalent.
Alternatively, each iteration could call remotecall_fetch wrapped in an @async task, and wait for all to complete.
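A sketch of that alternative, with the f, x, and loop bounds from your example (each @async task blocks in remotecall_fetch until a worker is free, so work is handed out on demand rather than round-robin):

pool = WorkerPool(workers())
results = Vector{Any}(undef, 100)
@sync for i = 1:100
    @async results[i] = remotecall_fetch(f, pool, x[i])
end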

Not that I’m aware of.
But couldn’t any complexity be wrapped into a function (perhaps an anonymous closure)?

If you wanted to get more fine-grained with this, you could use RemoteChannels and roll your own scheduler.
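A bare-bones sketch of that idea, close to the channels example in the Julia manual (job^2 stands in for the real work, and the worker count is just for illustration):

using Distributed
addprocs(4)

const jobs    = RemoteChannel(() -> Channel{Int}(32))
const results = RemoteChannel(() -> Channel{Tuple}(32))

@everywhere function do_work(jobs, results)
    while true
        job = take!(jobs)                    # blocks until a job is available
        put!(results, (job, job^2, myid()))  # stand-in for real work
    end
end

for p in workers()
    remote_do(do_work, p, jobs, results)     # one consumer loop per worker
end

n = 20
for job in 1:n
    put!(jobs, job)                          # feed the queue
end

for _ in 1:n
    job, res, pid = take!(results)           # collect as workers finish
    println("job $job -> $res on worker $pid")
end

Because each worker takes its next job only when it finishes the previous one, long and short jobs balance themselves across workers automatically. (As in the manual’s example, the consumer loops simply keep blocking on take! once the queue drains.)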