Question on TaskLocalValue

Continuing the discussion from Inconsistent CPU utilisation in @threads loops:

I understand that trying to establish thread-based storage is an invitation to disaster, but on the other hand,

when there are many more tasks than nthreads() and you need significant storage per task, this can require a lot of storage, even though not all of the task local storage is being used simultaneously. Is there any way to provide storage that is limited to tasks that are either currently running or that are temporarily halted?

2 Likes

You can use a pool of buffers in a Channel and each task can take a bigger from the pool and release it when it is no longer needed.

3 Likes

Would have written the same as Valentin. I’ll add an example for this to the OhMyThreads.jl docs when I find the time for it.

2 Likes

That would be great! Even greater: add this functionality to your package?

PS: Thanks for OhMyThreads! It’s a tremendous package!

1 Like

FWIW, this is mentioned in the docs now.

2 Likes

Thanks for the example. Am I correct in assuming that channels are slower, which is why they are not always the best choice?

If spawning a task per work-unit is cheap enough, then using a channel to grab a buffer is cheap enough.

The really difficult part is:

When you have many cheap work-units (maybe ~200 ns) interspersed with quite expensive work-units (maybe ~ 200 ms) that possibly stall on IO, you don’t know which is which, both the expensive and cheap work-units take up significant fraction of the total time, and there are good reasons to expect bunching (expensive work-units being close together in your vector).

Spawning a new Task and using a channel both involve per-work-unit overheads and synchronization, and are therefore inappropriate for cheap work-units. This is solved by batching multiple work-units into the same Task.

But that is inappropriate for work-units that are heterogenous, expensive or stall on IO (you risk that some batches are much more expensive than others, and then some cores idle at the end). For these, Task+channel are better! (to deal with IO stalls, you can make the channel somewhat larger than the number of threads)

Afaiu there is no good julia library for that setting. One would need to do work-stealing with an overhead of roughly one uncontended atomic_cas! to mark progress on each unit (so this is inappropriate for settings where work-units are ~30 ns).

PS. To see the overhead of per-workunit tasks, consider

julia> using BenchmarkTools
julia> foo() = fetch(Threads.@spawn rand())
julia> @benchmark foo()
BenchmarkTools.Trial: 10000 samples with 137 evaluations.
 Range (min … max):  726.197 ns …  14.051 ΞΌs  β”Š GC (min … max): 0.00% … 93.82%
 Time  (median):     741.113 ns               β”Š GC (median):    0.00%
 Time  (mean Β± Οƒ):   775.904 ns Β± 458.332 ns  β”Š GC (mean Β± Οƒ):  2.17% Β±  3.49%

  β–ƒβ–‡β–ˆβ–‡β–„β–‚β–                    ▁▂▁▁▁                              ▁
  β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–†β–†β–‡β–†β–…β–†β–†β–…β–…β–‚β–…β–…β–…β–†β–„β–„β–‡β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–†β–†β–„β–„β–„β–ƒβ–‚β–„β–‚β–…β–ƒβ–„β–…β–…β–„β–„β–ƒβ–„β–…β–„β–…β–„β–…β–…β–†β–†β–† β–ˆ
  726 ns        Histogram: log(frequency) by time       1.06 ΞΌs <

 Memory estimate: 496 bytes, allocs estimate: 5.

So if your work-units are 100 us, then the scheduling overhead is negligible at < 1%. If a significant amount of total time is eaten by smaller work-units, then you need to think about such issues.

100x100 matmul is a big enough work-unit for a task:

julia> rs=rand(100, 100); a=copy(rs); b=copy(rs);
julia> @btime LinearAlgebra.mul!($rs, $a, $b);
  104.275 ΞΌs (0 allocations: 0 bytes)

20x20 matmul is not a big enough work-unit for a task:

julia> n=20;rs=rand(n, n); a=copy(rs); b=copy(rs);
julia> @btime LinearAlgebra.mul!($rs, $a, $b);
  636.893 ns (0 allocations: 0 bytes)
1 Like

[…]

I don’t really think proper workstealing is necessary for that.

E.g. here’s a function that meets your critera:

function f(p::Float64)
    if p ≀ rand()
        # fast path
        sum(rand(5, 5)^2)
    else
        # slow path
        sleep(0.2)
        sum(rand(5, 5)^2)
    end
end;

where p is the probability of hitting the slow path. Let’s run it on a lopsided distribution of ps:

julia> using OhMyThreads

julia> let ps = range(0.001, 0.01, length=5000)
           @btime foreach(f, $ps)
           @btime tforeach(f, $ps)

           # Oversubscribe with tasks, and iterate in random order to deal with the lopsided distribution
           @btime tforeach(f, $ps, scheduler=DynamicScheduler(nchunks=4*Threads.nthreads(), split=:scatter))
       end
  3.830 s (10077 allocations: 2.44 MiB)
  808.457 ms (10195 allocations: 2.45 MiB)
  604.304 ms (10257 allocations: 2.46 MiB)

However, if we want to get a bit more fancy, we can also use a Channel based approach, but instead of the channel holding individual workitems, we can make the channel hold chunks of work items, which also has comparable benefits:

julia> let ps = range(0.001, 0.01, length=5000)
           @btime tforeach(chunks($ps; n=200); scheduler=GreedyScheduler()) do inds
               foreach(f, view($ps, inds))
           end
       end
  606.219 ms (10137 allocations: 2.47 MiB)

This was run on a julia session with 6 threads, so we’re getting pretty close to theoretically perfect scaling here.

2 Likes

Yeah, but suppose your problem is more like

julia> import Distributions
julia> import LinearAlgebra
julia> LinearAlgebra.BLAS.set_num_threads(1)
julia> using OhMyThreads

julia> function waste_time(n)
       sum(rand(n,n)^2)
       end

julia> pp = 0.3; p = Distributions.Pareto(log(pp)/(log(pp)  - log(1-pp)));
julia> work_units = [min(4000, Int(ceil(rand(p))+1)) for i=1:1000_000];
julia> @time waste_time(4000)
  1.843917 seconds (4 allocations: 244.141 MiB, 0.31% gc time)
1.6001514641960087e10
julia> @time foreach(waste_time, work_units);
 23.251886 seconds (2.01 M allocations: 6.086 GiB, 0.30% gc time)
julia> @time tforeach(waste_time, work_units);
 17.724380 seconds (2.02 M allocations: 6.087 GiB, 0.22% gc time, 0.05% compilation time)

julia> @time tforeach(waste_time, work_units, scheduler=DynamicScheduler(nchunks=4*Threads.nthreads(), split=:scatter))
 18.006214 seconds (2.01 M allocations: 6.086 GiB, 0.18% gc time)
julia> Threads.nthreads()
16

I’m not entirely sure why the results are so abysmal, but power-law distributed runtimes for inputs are not far-fetched.

PS. The system is multithreading to some degree:

julia> work_units = [4000 for i=1:Threads.nthreads()];

julia> @time foreach(waste_time, work_units);
 30.144156 seconds (2.13 M allocations: 3.954 GiB, 0.22% gc time, 2.15% compilation time)

julia> @time tforeach(waste_time, work_units);
  9.234063 seconds (467.65 k allocations: 3.846 GiB, 0.18% gc time, 11.67% compilation time)

Adding the GreedyScheduler trick I showed above, here’s what I see on my 6 core machine with 6 threads:

julia> @time foreach(waste_time, work_units);
 25.933839 seconds (2.01 M allocations: 6.321 GiB, 1.44% gc time, 0.01% compilation time)

julia> @time tforeach(waste_time, work_units);
 17.283795 seconds (2.63 M allocations: 6.363 GiB, 0.83% gc time, 2.55% compilation time)

julia> @time tforeach(waste_time, work_units, scheduler=DynamicScheduler(nchunks=4*Threads.nthreads(), split=:scatter))
 16.796721 seconds (2.03 M allocations: 6.323 GiB, 0.26% gc time, 0.06% compilation time)

julia> @time tforeach(chunks(work_units; n=2_000); scheduler=GreedyScheduler()) do inds
           foreach(waste_time, view(work_units, inds))
       end
 13.069916 seconds (2.07 M allocations: 6.325 GiB, 0.42% gc time, 0.64% compilation time)

I don’t think this is very surprising though when we look at work_units:

julia> histogram(work_units)
                    β”Œ                                        ┐ 
   [   0.0,  200.0) β”€β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ  999 393  
   [ 200.0,  400.0) ─▏ 379                                     
   [ 400.0,  600.0) ─▏ 106                                     
   [ 600.0,  800.0) ─▏ 43                                      
   [ 800.0, 1000.0) ─▏ 16                                      
   [1000.0, 1200.0) ─▏ 17                                      
   [1200.0, 1400.0) ─▏ 9                                       
   [1400.0, 1600.0) ─▏ 8                                       
   [1600.0, 1800.0) ─▏ 9                                       
   [1800.0, 2000.0) ─  0                                       
   [2000.0, 2200.0) ─▏ 5                                       
   [2200.0, 2400.0) ─▏ 2                                       
   [2400.0, 2600.0) ─▏ 3                                       
   [2600.0, 2800.0) ─▏ 2                                       
   [2800.0, 3000.0) ─  0                                       
   [3000.0, 3200.0) ─  0                                       
   [3200.0, 3400.0) ─  0                                       
   [3400.0, 3600.0) ─▏ 1                                       
   [3600.0, 3800.0) ─  0                                       
   [3800.0, 4000.0) ─▏ 1                                       
   [4000.0, 4200.0) ─▏ 6                                       
                    β””                                        β”˜ 
                                     Frequency                 

The highly expensive work-units are tiny needles in a very big haystack (99.9393% of them are between 0 and 200, and 0.0006% of them are the very expensive 4000 entries.). Are there any multithreading systems out there that can get full thread utilization out of a case this pathological, even full on work stealing systems?

I think there’s just literally not enough objects in the long tail of the distribution to get efficient multithreading out of this unless we do something intelligent by trying to predict how long a given work unit will take to run, and then partitioning the work units based on that.

I do expect though that if you add more work units, you’ll get closer to full utilization, but since the distribution is so steep, it’ll take a long time to converge.

The per-item overhead for work-stealing is roughly equivalent to

julia> r=Ref(0); x = ReentrantLock();
julia> function foo(r, x)
       lock(x)
       try
           r[] += 1
       finally 
           unlock(x)
       end
       nothing
       end
julia> using BenchmarkTools

julia> @btime foo($r, $x)
  26.057 ns (0 allocations: 0 byte

In the above example, we can gladly pay 26ns per work-unit, but we would be somewhat reluctant to pay 1000 ns. So this is not impossible to handle, but it is not trivial.

But apart from admittedly annoying power-law distributions, one of the big reasons that handling this would be worthwhile is mental overhead:

Currently you need to use completely different strategies, depending on whether your work-units are smallish (<1000 ns, need to use batches) or heterogenous giants (e.g. parsing files in a directory, each unit gets its own task).

This is a steep ask for users! β€œbefore writing multi-threaded code, benchmark and have a feeling for work-unit sizes! In all APIs that you offer downstream users, you must expose both versions!”

And both inappropriate batching and inappropriate lack of batching are really really bad. So I’d be very happy if the β€œdynamic range” our our schedulers could improve.

1 Like