In multithreading, how to make each thread pick the first/smallest available element?

I start julia in Linux terminal with julia -t 6. The code

Threads.@threads for i = 1:100
    println(i, " ", Threads.threadid())
    sleep(3)
end

on my 6-core CPU returns

18 2
52 1
35 4
69 6
85 3
1 5
70 6
53 1
19 2
86 3
36 4
2 5
87 3
37 4
20 2
71 6
54 1
3 5
...

This is surprising to me, as I expected each thread to pick the first of the remaining elements in the collection. I expected the first 6 numbers on the left to be a permutation of 1:6, the next block a permutation of 7:12, the next block a permutation of 13:18, …

For the problems I am solving, this shuffling makes parallelization much less efficient. Is there a way to force the threads to iterate in the natural order of the elements in the collection?

So to be clear: if some thread early on picks an element that takes the longest to process and finishes last, that’s fine. I just want each thread to pick the first/left-most available element from the collection.

Try @threads :static for ...

@carstenbauer Thank you, I tried it, but I still get an arbitrary sequence of i’s on Julia 1.8.0.

@threads :static divides the range into contiguous chunks: 1:n÷nthreads, n÷nthreads+1:2n÷nthreads, …

For example, with four threads and a range divisible by four, you get

julia> A = Vector{Int64}(undef, 16);

julia> Threads.@threads :static for i = 1:16
           A[i] = Threads.threadid()
       end

julia> A
16-element Vector{Int64}:
 1
 1
 1
 1
 2
 2
 2
 2
 3
 3
 3
 3
 4
 4
 4
 4

If you want [1,5,9,13] to be scheduled on e.g. thread 1, you would need to do your own task wrapping and scheduling.

@skleinbo Could you please give an example of how to parallelize my above code with sleep so that thread 1 picks 1, 5, 9, 13, as you mentioned?

First of all, let me make a general statement: conceptually, Julia implements task-based parallelism (similar to Go). Hence, unless it is really necessary, you shouldn’t think of threads at all but only of tasks (which then get executed in parallel if possible). How they get distributed to threads is, from this conceptual point of view, largely an implementation detail. For example, the default scheduling for @threads has changed between Julia 1.7 and Julia 1.8 from :static to :dynamic. (Note that, unfortunately, @threads is somewhat of a misnomer in this context.)

The dynamic default scheduling has many advantages. For example, it enables composable multithreading (e.g. an @threads block inside of another @threads block) which you almost certainly will sacrifice when specifying a specific static task-thread mapping manually.
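As a toy sketch of what composability means here: with the dynamic default, nesting like the following works, whereas an inner :static loop would throw an error.

using Base.Threads

Threads.@threads for i in 1:8
    Threads.@threads for j in 1:8
        # work on the pair (i, j) goes here
    end
end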

Having said that, there sometimes are reasons to specify a specific task-thread mapping, for example to really optimise the performance of a code on an HPC cluster or similar.

In light of what I said above: Why, and are you sure?

That really shouldn’t be the case… With @threads :static the sequence isn’t arbitrary at all; don’t get confused by the “shuffled” output order, which is just due to parallel printing.

julia> VERSION
v"1.8.4"

julia> using Base.Threads

julia> Threads.nthreads()
3

julia> x = [];

julia> Threads.@threads :static for i in 1:10
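           # NB: push! on a shared vector is not thread-safe; toy example only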
           push!(x, i => Threads.threadid())
       end

julia> x
10-element Vector{Any}:
  1 => 1
  2 => 1
  3 => 1
  4 => 1
  5 => 2
  6 => 2
  7 => 2
  8 => 3
  9 => 3
 10 => 3

Note how the range is distributed among the 3 threads both evenly and in order. (UPDATE: I only noticed after posting this that the split isn’t perfectly even, because 10 isn’t divisible by 3 without remainder.)

So - for some reason that is unclear to me - you seem to want 1 => 1, 2 => 2, 3 => 3, 4 => 1, 5 => 2, ... ? AFAIK, there is no built-in option that gives you this. But you can, in principle, implement arbitrary task-thread mappings with more manual tools like @tspawnat from ThreadPinning.jl.
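For illustration, here is a minimal sketch of that round-robin mapping, assuming ThreadPinning.jl’s @tspawnat (which spawns a task pinned to a given thread ID):

using ThreadPinning   # assumed to provide @tspawnat
using Base.Threads

# Round-robin: iteration i runs on thread mod1(i, nthreads()),
# i.e. 1 => 1, 2 => 2, 3 => 3, 4 => 1, 5 => 2, ...
ts = [@tspawnat mod1(i, nthreads()) println(i, " => ", threadid()) for i in 1:10]
foreach(wait, ts)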

BTW, note that “picks an element” sounds a lot like load balancing (which, as of now, @threads doesn’t give you for any scheduling option). For the given example (sleep(3)) this also doesn’t really make sense since the workload is uniform, i.e. the same for all i.


The dynamic default scheduling has many advantages. For example, it enables composable multithreading (e.g. an @threads block inside of another @threads block) which you almost certainly will sacrifice when specifying a specific static task-thread mapping manually.

I’m just a beginner in parallelization, so I don’t understand everything you’ve said, but thank you for letting me know about these two options. Generally, it is much easier for me to learn through examples than by just reading documentation.

In light of what I said above: Why, and are you sure?

I am not sure, but in this problem of mine I implemented a parallelized accumulate2!, where each thread picks a vertex in the graph, checks all its (left) neighbors, and processes those that have already finished. My intent was for threads to move tightly from left to right (assuming execution times are not too different), so that at each step most of the left neighbors are already finished and ready for processing.

So - for some reason that is unclear to me - you seem to want 1 => 1, 2 => 2, 3 => 3, 4 => 1, 5 => 2, ... ?

No, I would just like the first 6=nthreads() numbers on the left to be a permutation of 1:6, the next 6 numbers on the left to be a permutation of 7:12, etc., as I mentioned. In short, I’d like each thread to pick the first available element.

I think you want a batch / basesize of 1.
Julia’s threading normally uses a batch size of around cld(length(x), Threads.nthreads()).
I recommend ThreadsX, in particular ThreadsX.map or ThreadsX.foreach: both accept a basesize option, and I would set basesize=1. On a computer with 36 threads:

julia> tids = ThreadsX.map(_->Threads.threadid(), 1:10Threads.nthreads(), basesize=1);

julia> tids[1:10]
10-element Vector{Int64}:
  1
 22
 19
 27
 30
 11
 13
  4
 18
  5

julia> tids[1:36] |> unique |> length
18

julia> tids[37:72] |> unique |> length
19

julia> tids = ThreadsX.map(_->(sleep(1e-1);Threads.threadid()), 1:10Threads.nthreads(), basesize=1);

julia> tids[1:36] |> unique |> length
24

julia> tids[37:72] |> unique |> length
25

Compare this to Threads.@threads:

julia> Threads.@threads :static for i in 1:10Threads.nthreads()
           tids[i] = Threads.threadid()
       end

julia> tids'
1×360 adjoint(::Vector{Int64}) with eltype Int64:
 1  1  1  1  1  1  1  1  1  1  2  2  2  2  2  2  …  35  36  36  36  36  36  36  36  36  36  36

julia> tids == sort(tids)
true

ThreadsX with a batch size of 1 isn’t cycling through the threads in order, but it is handing out iterations one at a time and assigning new work to threads as they complete it, which I think is exactly what you want. ThreadsX.foreach is for when you don’t want a return value; you can replace for loops with ThreadsX.foreach, but often you’re filling a vector in a loop, in which case ThreadsX.map is convenient, taking care of allocating and filling it for you.
There are a lot of other nice convenient functions in there like ThreadsX.mapreduce or ThreadsX.findfirst that are worth taking a look at.

Depending on your particular task, you can try different basesize values and benchmark to see how they do. Being able to control that, and having a variety of convenience functions to choose from, makes ThreadsX quite nice to use for threading.
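For example, a minimal sketch of such a comparison (work here is just a hypothetical stand-in for your per-element function):

using ThreadsX, BenchmarkTools

work(i) = sum(abs2, 1:i)   # hypothetical stand-in for the real work

for bs in (1, 8, 64)
    print("basesize = ", bs, ": ")
    @btime ThreadsX.foreach(work, 1:10_000; basesize=$bs)
end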


Why does Julia default to a relatively large batch size?

It minimizes task creation overhead, and working on contiguous blocks tends to be way more efficient for anything that’s relatively fast. Try

julia> using BenchmarkTools

julia> x = rand(8192);

julia> contig = @view(x[1:1024]); stride = @view(x[1:8:end]); length(contig) == length(stride)
true

julia> @btime sum($contig)
  74.910 ns (0 allocations: 0 bytes)
514.3849724619506

julia> @btime sum($stride)
  968.320 ns (0 allocations: 0 bytes)
512.9766792839781

Consider that memory is transferred in cacheline-sized blocks. On this computer, 1 cacheline is 64 bytes, or 8x Float64. (Most computers have 64-byte cachelines, but the M1 has 128-byte ones.)
That means if we have 8 threads and submit one element at a time to each thread to read, we’re actually transferring the entire cacheline to each of them, so we end up using 8x the memory bandwidth!

It gets much worse if you’re writing to the memory.
Because a cacheline is the finest granularity a CPU tracks, when one thread writes to a chunk of this size, that cacheline gets invalidated, and every other thread trying to interact with that cacheline basically has to pause and synchronize, even if they’re reading/writing different elements of it!
This is called false sharing, a problem avoided in most situations by breaking iterations up into batches, as Julia does, so that each thread hopefully works on iterations (and hence memory) far apart in the iteration space.
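To make this concrete, here is a toy sketch (illustrative, not a rigorous benchmark): each thread accumulates into its own slot, so there is no data race, but with stride 1 the slots are adjacent and share cachelines, while with stride 8 each slot gets its own 64-byte cacheline.

using Base.Threads

function strided_sum(x, stride)
    acc = zeros(eltype(x), stride * nthreads())
    @threads :static for i in eachindex(x)
        @inbounds acc[stride * threadid()] += x[i]   # conflict-free, but maybe same cacheline
    end
    return sum(acc)
end

x = rand(10^8);
@time strided_sum(x, 1)   # adjacent accumulators: false sharing
@time strided_sum(x, 8)   # padded accumulators: typically much faster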

But some workloads are different, very different, from this.


Apologies for the late reply. As you suggested, I tried out ThreadsX, but the code below prints everything almost immediately; there is no 3-second delay. I do not understand why.

using ThreadsX

ThreadsX.foreach(1:100; basesize=1) do i
    println(i, " ", Threads.threadid())
    sleep(3)
end

Also, it gave me really weird output on a 2-core CPU with 4 Julia threads:

2 4
3 2
96 2
4 2
54 2
37 2
93 2
7 2
85 2
35 2
70 2
28 2
17 2
57 2
8 2
15 2
46 2
42 2
66 2
64 2
44 2
36 2
77 2
88 2
52 2
1 3
11 2
32 2
63 2
14 2
39 2

Why is there no 3-second delay? How can almost all of the work be done by thread 2 alone?

When running your snippet

T = ThreadsX.map(_ -> (sleep(0.1); Threads.threadid()), 1:100; basesize=1);
println(T)

I obtained a nicer-looking result (threads seem to pick the first available index, except thread 4 is lazy):
[1, 3, 2, 1, 2, 1, 3, 3, 4, 3, 2, 2, 2, 4, 2, 3, 4, 4, 2, 4, 3, 4, 3, 4, 3, 4, 4, 3, 3, 2, 4, 2, 2, 2, 2, 1, 2, 2, 2, 3, 4, 2, 4, 2, 2, 2, 4, 4, 2, 2, 2, 2, 4, 2, 3, 2, 3, 2, 4, 2, 3, 4, 3, 3, 3, 4, 3, 2, 2, 4, 4, 2, 3, 2, 2, 3, 4, 4, 4, 4, 3, 2, 3, 3, 3, 2, 2, 1, 3, 2, 3, 4, 4, 1, 2, 4, 2, 2, 4, 4]
In any case, thank you for your help!

When a task sleeps, the thread running that task switches over to a different task.
No sense in just twiddling your thumbs while the task sleeps if there is still work left to be done!

As others have pointed out, Julia uses task-based parallelism. These tasks are the fundamental abstraction.
You can create one manually via Threads.@spawn do_something() (or even more manually by explicitly calling Task(()->do_something()), but then you’d have to manually schedule it, too…).
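For completeness, a small sketch of that manual route:

t = Task(() -> println("hello from thread ", Threads.threadid()))
t.sticky = false   # allow it to run on any thread (Threads.@spawn sets this)
schedule(t)        # hand the task to the scheduler (also done by @spawn)
wait(t)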

Work is divided up into tasks.
These tasks may be scheduled on different threads and run in parallel, but you don’t really interact with the threads directly.

When you call sleep(3), you’re putting the task to sleep for 3 seconds.
Not the thread it was running on.
When the task goes to sleep, it yields/is no longer scheduled to execute, so the thread that was running the task looks for queued tasks it can pick up.
After 3 seconds (or however long the task slept), it’ll get added back to the queue so a thread can pick it back up.

Basically, sleeping doesn’t use your CPU, so the number of sleeps you can execute in parallel isn’t really limited by the number of cores you have.
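You can see this directly; the quick sketch below launches 100 one-second sleeps and finishes in about 1 second, even on a single Julia thread:

@time @sync for i in 1:100
    @async sleep(1)   # each task yields while it sleeps
end                   # ~1 s total, not ~100 s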

Threads.@threads :static creates only 1 task per thread, hence, when a task goes to sleep, there are no unscheduled tasks to pick up in these examples.
“Real world code” could of course have nested parallelism, so there could be more work to pick up in those cases.


Thank you for your help. Just one more question. carstenbauer compared Julia’s parallelism to that of Go.

How do the parallelization capabilities of the two languages compare (for optimizing mathematical algorithms on a single large CPU)? Does Go have more options for tweaking (like you showed above with :static and basesize)? (I realize it’s off-topic, but I thought someone might add a few comments on that.)

I’ve never used Go, so I can’t really comment there.
I would guess that both languages have a lot of options for tweaking with respect to batch sizes.
All you really need is the @spawn primitive, plus code, living either in Base or in packages, that does the batching for you.
I have heard that Go has a really good garbage collector; that will certainly help when multithreading.
It’s possible to tweak Julia’s GC a bit (look at --heap-size-hint= in julia --help), and it’s probably possible to do so in Go as well.


The similarity between Julia and Go is that they both have tasks that are independent of threads and a scheduler that controls which tasks run on which thread. Go has nothing like @threads, so those parameters are not relevant. AFAIK, the only control user code has over Go’s scheduler is Gosched, essentially yield. Edit: there are also LockOSThread/UnlockOSThread to pin goroutines to threads.
