OhMyThreads, ChunkSplitters, and Cost Estimates

I’ve run into a situation that I believe ought to be a “solved problem”, or at least have a canonical solution somewhere in the collective knowledge about OhMyThreads.jl and ChunkSplitters.jl.

I have a large finite-volume simulation on a Cartesian grid. The list of grid cells gets divided into evenly-sized partitions, and the update for each partition can be computed independently. The “hot loop” of the overall computation can essentially be described as:

  1. Compute the cell update and a time step size for each partition
  2. Choose the minimum time step size over all partitions
  3. Broadcast the necessary updates between partitions
  4. Apply the update to each partition

The caveat here is that some partitions contain “empty” grid cells – grid cells that don’t have a value, don’t need to be updated, and don’t contribute to the overall computational load. This, of course, means that the total work to update each partition is not uniform, but can be easily estimated.
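In OhMyThreads terms, a rough sketch of that loop looks like this (compute_update_and_dt!, exchange_boundaries!, and apply_update! are just placeholders for my actual kernels):

using OhMyThreads: tmapreduce, tforeach

function step!(partitions)
    # steps 1 + 2: compute each partition's update and local dt, reduce to the global minimum
    dt = tmapreduce(min, partitions) do p
        compute_update_and_dt!(p)   # placeholder: returns this partition's admissible dt
    end
    # step 3: exchange boundary data between partitions (serial here)
    exchange_boundaries!(partitions)
    # step 4: apply the update everywhere using the global dt
    tforeach(p -> apply_update!(p, dt), partitions)
    return dt
end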

If I use the standard scheduler in OhMyThreads, I end up with idle CPU cores during tmap and tmapreduce calls.

Is there a scheduler in OhMyThreads that accepts a cost estimate? Is the GreedyScheduler appropriate here?

EDIT: Given that I’m not a computer scientist by training, I hadn’t heard of the many variants of the Partitioning Problem. I apologize for asking about the solution to something that is NP-Hard. My idle CPU cycles aren’t worth that much.

How many tasks do you generate? I think any scheduler's load balancing only becomes visible once you have more tasks than cores/threads. I remember there being some suggestions about the right ratio, but you could try something like 3:1 or 4:1 and see if the default scheduler handles it nicely. If not, try the other schedulers.
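For example (untested sketch; compute_update_and_dt! stands in for your per-partition kernel, and I believe the DynamicScheduler keyword is nchunks, possibly renamed to ntasks in newer OhMyThreads versions):

using OhMyThreads: tmapreduce, DynamicScheduler, GreedyScheduler

n = 4 * Threads.nthreads()   # roughly 4 tasks per thread

# default dynamic scheduling, but with more (and therefore smaller) chunks
dt = tmapreduce(compute_update_and_dt!, min, partitions;
                scheduler = DynamicScheduler(; nchunks = n))

# or hand out work items greedily as threads become free
dt = tmapreduce(compute_update_and_dt!, min, partitions;
                scheduler = GreedyScheduler())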


As @mkoculak remarked, a very easy approach is to oversubscribe threads: Generate k times as many tasks as you have threads.

A rough ballpark is that each task costs you about 1 µs of scheduler overhead. If your entire tmap call is supposed to cost you 1 s, then 1k tasks have negligible scheduler overhead – it’s wasted effort to argue about whether you should spawn 16 or 32 tasks.
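In numbers (taking the ~1 µs per task figure as a rough assumption):

# 1000 tasks at ~1 µs of scheduling overhead each
overhead = 1_000 * 1e-6    # ≈ 1 ms
overhead / 1.0             # ≈ 0.1% of a 1 s tmap call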

The really difficult thing is to write a generic library that does the right thing, without user intervention. But if you are an end-user with access to the problem you’re trying to solve, then you can simply ballpark the runtime and check whether you are in the happy regime.

So those are the ballpark numbers you should report: how many partitions do you have, what is your single-threaded runtime per tmap call, and how many cores does your machine have, each within a range of <100x (e.g. you have 10k-1M partitions, a single-threaded step runs in 50-500 ms, and you plan to deploy on a 4-128 core machine → spawn ~1k tasks and eat the ~1 ms of scheduler overhead).

It is important to be very concrete with these ballpark numbers, but not important to be precise (i.e. wide error bars are OK; but we cannot guess whether “a pretty expensive operation” means 100 ns or one hour for you, and ten orders of magnitude is too wide an error bar to give any meaningful advice).

PS. The central limit theorem suggests that this will not be a big issue: after chunk-splitting, you expect each chunk to have a runtime that is approximately normally distributed, with relative standard deviation going like 1/sqrt(chunk_size). The two things that are really bad are: (1) fat tails (i.e. occasional partitions that are much more expensive than the rest – you shouldn’t have them in your problem set, right?) and (2) systematic biases in the ordering (all the early partitions in your iterator are the cheap ones, the later ones the expensive ones).
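A quick way to convince yourself, using a made-up cost distribution similar to the one used further below:

using Statistics, Random

Random.seed!(1)
# per-item costs with some spread: mostly uniform on (0, 20), a few cheap outliers
costs = [rand() < 0.1 ? rand() : 20 * rand() for _ in 1:100_000]

for chunk_size in (10, 100, 1000)
    sums = [sum(@view costs[i:i+chunk_size-1]) for i in 1:chunk_size:length(costs)-chunk_size+1]
    println("chunk_size = ", chunk_size, ": relative std of chunk cost ≈ ",
            round(std(sums) / mean(sums); digits = 3))
end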

You can use a simple greedy algorithm to incorporate your cost estimates and get a somewhat more even split:

julia> import DataStructures

julia> function heapChunkSplit(items::Vector{Tuple{Float64, T}}, nChunks) where T
           # sort work items by descending cost, so the most expensive ones are placed first
           sort!(items; by = item -> -item[1])
           # min-heap of (accumulated cost, assigned items) pairs, one entry per chunk
           h = DataStructures.BinaryHeap(Base.Order.By(t -> t[1]), [(0.0, T[]) for i = 1:nChunks])
           for (cost, item) in items
               # put the most expensive remaining item into the currently cheapest chunk
               acc, lst = pop!(h)
               acc += cost
               push!(lst, item)
               push!(h, (acc, lst))
           end
           # return the chunks, sorted by their total estimated cost
           sort(h.valtree; by = t -> t[1])
       end

julia> heapChunkSplit([( rand()<0.1 ? rand() : 20*rand(), i) for i=1:1000], 10)
10-element Vector{Tuple{Float64, Vector{Int64}}}:
 (939.8248721181354, [887, 903, 461, 2, 724, 491, 669, 627, 1000, 33  …  986, 232, 79, 553, 573, 744, 297, 143, 225, 717])
 (939.8282508203936, [621, 153, 100, 713, 440, 785, 420, 280, 484, 166  …  201, 259, 446, 186, 853, 533, 12, 706, 121, 82])
 (939.8283297799661, [337, 849, 904, 412, 377, 175, 14, 878, 358, 315  …  423, 989, 463, 130, 709, 847, 939, 206, 603, 756])
 (939.8306416306181, [599, 432, 392, 73, 722, 763, 84, 563, 355, 289  …  918, 531, 7, 936, 479, 698, 642, 800, 459, 163])
 (939.8336030835728, [673, 946, 379, 465, 372, 346, 199, 902, 574, 328  …  634, 555, 241, 730, 250, 803, 769, 579, 720, 648])
 (939.8373148793895, [374, 714, 171, 95, 871, 229, 865, 844, 704, 303  …  678, 933, 979, 322, 133, 525, 538, 567, 811, 790])
 (939.8376868181022, [60, 738, 179, 568, 269, 792, 772, 964, 193, 914  …  109, 323, 45, 411, 751, 856, 427, 855, 448, 850])
 (939.8398825649863, [959, 76, 532, 382, 985, 125, 144, 654, 120, 863  …  436, 37, 789, 796, 778, 167, 739, 608, 899, 705])
 (939.8402390183963, [826, 732, 649, 74, 239, 858, 138, 107, 888, 969  …  736, 581, 832, 690, 653, 30, 214, 960, 32, 701])
 (939.8586064389611, [261, 340, 434, 966, 818, 200, 450, 828, 556, 774  …  652, 78, 917, 256, 152, 228, 204, 612, 53, 147])

I’m not sure how much that will help you, but it is the obvious algorithm (greedily put the costliest subproblem into the cheapest chunk until you have distributed all subproblems to chunks).
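If you go this route, one way to consume the precomputed chunks (a sketch; costed_items is your vector of (estimated cost, work item) pairs, and update_partition! is a placeholder that returns a work item’s local dt):

using OhMyThreads: tmapreduce

# one balanced chunk per task; the cost estimates are only needed for the split itself
chunks = heapChunkSplit(costed_items, 4 * Threads.nthreads())
dt = tmapreduce(min, chunks) do (cost, items)
    minimum(update_partition!, items)   # local dt over this chunk's work items
end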

PS. The heapChunkSplit snippet above contained a bug, now fixed (greedily put the most expensive remaining work item into the cheapest chunk, not the cheapest one :person_facepalming:).
