In multithreading, how to make each thread pick the first/smallest available element?

I start julia in Linux terminal with `julia -t 6`. The code

```julia
Threads.@threads for i = 1:100
    println(i, " ", Threads.threadid())
    sleep(3)
end
```

on my 6-core CPU returns

```
18 2
52 1
35 4
69 6
85 3
1 5
70 6
53 1
19 2
86 3
36 4
2 5
87 3
37 4
20 2
71 6
54 1
3 5
...
```

This is surprising to me, as I expected each thread to pick the first of the remaining elements in the collection. I expected the first 6 numbers on the left to be a permutation of 1:6, the next block a permutation of 7:12, the next block a permutation of 13:18, …

For solving my problems, this shuffling makes parallelization much less efficient. Is there a way to force the threads to iterate in the natural order of the elements in the collection?

So to be clear, if some thread early on picks an element which takes the longest and finishes last, that's fine. I just want each thread to pick the first/left-most available element from the collection.

Try `@threads :static for ...`

@carstenbauer Thank you, I tried it, but I still get an arbitrary sequence of `i`s on Julia 1.8.0.

`@threads :static` divides the range into chunks `1:n/nthreads`, `n/nthreads+1:2n/nthreads`, …

For example with four threads and a range divisible by four you get

```
julia> A = Vector{Int64}(undef, 16);

julia> Threads.@threads :static for i in 1:16
           A[i] = Threads.threadid()
       end

julia> A
16-element Vector{Int64}:
 1
 1
 1
 1
 2
 2
 2
 2
 3
 3
 3
 3
 4
 4
 4
 4
```

If you want `[1,5,9,13]` to be scheduled on e.g. thread 1, you would need to do your own task wrapping and scheduling.

@skleinbo Could you please give an example of how to parallelize my above code with `sleep` so that thread 1 picks 1, 5, 9, 13, as you mentioned?

First of all, let me make a general statement: Conceptually, Julia implements task-based parallelism (similar to Go). Hence, unless it is really necessary, you shouldn't think of threads at all but only tasks (which then get executed in parallel if possible). How they get distributed to threads is, from this conceptual point of view, largely an implementation detail. For example, the default scheduling for `@threads` changed between Julia 1.7 and Julia 1.8 from `:static` to `:dynamic`. (Note that, unfortunately, `@threads` is somewhat of a misnomer in this context.)

The dynamic default scheduling has many advantages. For example, it enables composable multithreading (e.g. an `@threads` block inside of another `@threads` block) which you almost certainly will sacrifice when specifying a specific static task-thread mapping manually.

Having said that, there sometimes are reasons to specify a specific task-thread mapping, for example, to really optimise the performance of a code on an HPC cluster or similar.

In light of what I said above: Why, and are you sure?

That really shouldn't be the case… With `@threads :static` the sequence isn't arbitrary at all (don't get confused by the "shuffled" output order, which is just due to parallel printing).

```
julia> VERSION
v"1.8.4"

julia> Threads.nthreads()
3

julia> x = [];

julia> Threads.@threads :static for i in 1:10
           push!(x, i => Threads.threadid())
       end

julia> x
10-element Vector{Any}:
  1 => 1
  2 => 1
  3 => 1
  4 => 1
  5 => 2
  6 => 2
  7 => 2
  8 => 3
  9 => 3
 10 => 3
```

Note how the range is distributed among the 3 threads both evenly and in order. (UPDATE: I only noticed after posting this that it isn't perfectly even, because 10 isn't divisible by 3 without remainder.)

So - for some reason that is unclear to me - you seem to want `1 => 1, 2 => 2, 3 => 3, 4 => 1, 5 => 2, ...` ? AFAIK, there is no built-in option that gives you this. But you can, in principle, implement arbitrary task-thread mappings with more manual tools like `@tspawnat` from ThreadPinning.jl.
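Without extra packages, you can at least fix the task-to-index mapping (though not the task-to-thread mapping, which the scheduler still decides) by spawning the tasks yourself. A minimal Base-only sketch, with a helper name of my own invention, that gives task k every n-th index starting at k:

```julia
# Sketch: one task per "stride class"; task k handles indices
# k, k+n, k+2n, ... The index-to-task mapping is fixed, but which
# thread runs each task is still decided by Julia's scheduler.
function strided_foreach(f, r::AbstractUnitRange, n = Threads.nthreads())
    tasks = [Threads.@spawn(foreach(f, (first(r) + k):n:last(r))) for k in 0:n-1]
    foreach(wait, tasks)
end
```

With `n = 4` over `1:16`, task 1 processes `[1, 5, 9, 13]`, task 2 processes `[2, 6, 10, 14]`, and so on.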

BTW, note that "picks an element" sounds a lot like load balancing (which, as of now, `@threads` doesn't give you for any scheduling option). For the given example (`sleep(3)`) this also doesn't really make sense since the workload is uniform, i.e. the same for all `i`.


> The dynamic default scheduling has many advantages. For example, it enables composable multithreading (e.g. an `@threads` block inside of another `@threads` block) which you almost certainly will sacrifice when specifying a specific static task-thread mapping manually.

I'm just a beginner in parallelization, so I don't understand all that you've said, but thank you for letting me know about these two options. Generally, it is much easier for me to learn through examples than by just reading documentation.

> In light of what I said above: Why, and are you sure?

I am not sure, but in this problem of mine I implemented parallelization in `accumulate2!`, where each thread picks a vertex in the graph, checks all its (left) neighbors, and processes those that have already finished. My intent was that threads go tightly from left to right (if execution times are not too different), so that each time, most of the left neighbors are already finished and ready for processing.

> So - for some reason that is unclear to me - you seem to want `1 => 1, 2 => 2, 3 => 3, 4 => 1, 5 => 2, ...` ?

No, I would just like the first 6=nthreads() numbers on the left to be a permutation of 1:6, the next 6 numbers on the left to be a permutation of 7:12, etc., as I mentioned. In short, I'd like each thread to pick the first available element.

I think you want a batch / basesize of `1`.
Julia's threading normally uses a batch size of around `cld(length(x), Threads.nthreads())`.
I recommend using ThreadsX, which lets you set the base size.
In particular `ThreadsX.map` or `ThreadsX.foreach`. These accept a `basesize` option; I would set `basesize=1`. On a computer with 36 threads:

```
julia> tids = ThreadsX.map(_->Threads.threadid(), 1:10Threads.nthreads(), basesize=1);

julia> tids[1:10]
10-element Vector{Int64}:
1
22
19
27
30
11
13
4
18
5

julia> tids[1:36] |> unique |> length
18

julia> tids[37:72] |> unique |> length
19

julia> tids[1:36] |> unique |> length
24

julia> tids[37:72] |> unique |> length
25
```

Compare this to `Threads.@threads`:

```
julia> tids = Vector{Int}(undef, 10Threads.nthreads());

julia> Threads.@threads :static for i in 1:10Threads.nthreads()
           tids[i] = Threads.threadid()
       end

julia> tids'
1×360 adjoint(::Vector{Int64}) with eltype Int64:
 1  1  1  1  1  1  1  1  1  1  2  2  2  2  2  2  …  35  36  36  36  36  36  36  36  36  36  36

julia> tids == sort(tids)
true
```

`ThreadsX` with a batch size of `1` isn't cycling; it runs items one at a time and assigns new work to threads as they complete it, which I think is exactly what you want. `ThreadsX.foreach` is for when you don't want a return value; you can replace `for` loops with `ThreadsX.foreach`, but often you're filling a vector in a loop, in which case `ThreadsX.map` is convenient, taking care of allocating and filling it for you.
There are a lot of other nice convenient functions in there like `ThreadsX.mapreduce` or `ThreadsX.findfirst` that are worth taking a look at.

Depending on your particular task, you can try different `basesize` values and benchmark to see how they do. Being able to control that, and choose from a variety of convenience functions, makes `ThreadsX` quite nice to use for threading.
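If you'd rather avoid a package dependency, the same "each worker takes the smallest remaining element" behaviour can be sketched in Base with a `Channel` used as a shared work queue (the helper name below is my own invention, not an API):

```julia
# Sketch: a Channel is a thread-safe FIFO queue, so whichever worker
# task is free next always takes the smallest remaining element.
function queued_foreach(f, xs; ntasks = Threads.nthreads())
    jobs = Channel{eltype(xs)}(length(xs))
    foreach(x -> put!(jobs, x), xs)
    close(jobs)                      # workers stop once the queue drains
    workers = [Threads.@spawn(foreach(f, jobs)) for _ in 1:ntasks]
    foreach(wait, workers)
end
```

For the original example, this would be `queued_foreach(i -> sleep(3), 1:100)`.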


Why does Julia default to a relatively large batch size?

It minimizes task-creation overhead, and working on contiguous blocks tends to be way more efficient for anything that's relatively fast. Try

```
julia> using BenchmarkTools

julia> x = rand(8192);

julia> contig = @view(x[1:1024]); stride = @view(x[1:8:end]); length(contig) == length(stride)
true

julia> @btime sum($contig)
  74.910 ns (0 allocations: 0 bytes)
514.3849724619506

julia> @btime sum($stride)
  968.320 ns (0 allocations: 0 bytes)
512.9766792839781
```

Consider that memory is transferred in cacheline-sized blocks. On this computer, 1 cacheline is 64 bytes, or 8x `Float64`. (Most computers have 64-byte cachelines, but the M1 has 128-byte.)
That means if we have 8 threads and submit one element at a time to each thread for them to read, you're actually submitting all the memory to each of them! That means you end up using 8x the memory bandwidth!

It gets much worse if youâ€™re writing to the memory.
Because a cacheline is the finest granularity a CPU tracks, when one thread writes to a chunk of this size, that cacheline gets invalidated, and it thus has to basically pause every other thread trying to interact with that cacheline to synchronize, even if they're writing/reading to different elements of that cacheline!
This is called false sharing, a problem avoided in most situations by breaking iterations up into batches, like Julia is doing, letting each thread hopefully work on iterations (mapping to memory…) far apart in the iteration space.
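The batching idea can be sketched in Base (the helper name is mine): give each task one contiguous chunk and a private accumulator, so tasks read disjoint memory regions and only combine their partial results at the end:

```julia
# Sketch: contiguous chunks, one task per chunk; each task computes a
# local sum over its own memory region, avoiding cacheline ping-pong
# between threads, and only the small partial results are combined.
function chunked_sum(x, nchunks = Threads.nthreads())
    chunks = Iterators.partition(eachindex(x), cld(length(x), nchunks))
    tasks = [Threads.@spawn(sum(@view x[idxs])) for idxs in chunks]
    return sum(fetch, tasks)
end
```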

But some workloads are different, very different, from this.


Apologies for the late reply. As you suggested, I tried out `ThreadsX`, but the code below immediately prints everything; there is no 3-second delay. I do not understand why.

```julia
ThreadsX.foreach(1:100, basesize=1) do i
    println(i, " ", Threads.threadid())
    sleep(3)
end
```

Also, it gave me really weird output on a 2-core CPU with 4 Julia threads:

```
2 4
3 2
96 2
4 2
54 2
37 2
93 2
7 2
85 2
35 2
70 2
28 2
17 2
57 2
8 2
15 2
46 2
42 2
66 2
64 2
44 2
36 2
77 2
88 2
52 2
1 3
11 2
32 2
63 2
14 2
39 2
```

Why is there no 3-second delay? How can almost all of it be done by only thread 2?

When running your snippet

```julia
T = ThreadsX.map(_ -> (sleep(0.1); Threads.threadid()), 1:100, basesize=1);
println(T)
```

I obtained a nicer-looking result (threads seem to pick the first available index, except thread 4 is lazy):
`[1, 3, 2, 1, 2, 1, 3, 3, 4, 3, 2, 2, 2, 4, 2, 3, 4, 4, 2, 4, 3, 4, 3, 4, 3, 4, 4, 3, 3, 2, 4, 2, 2, 2, 2, 1, 2, 2, 2, 3, 4, 2, 4, 2, 2, 2, 4, 4, 2, 2, 2, 2, 4, 2, 3, 2, 3, 2, 4, 2, 3, 4, 3, 3, 3, 4, 3, 2, 2, 4, 4, 2, 3, 2, 2, 3, 4, 4, 4, 4, 3, 2, 3, 3, 3, 2, 2, 1, 3, 2, 3, 4, 4, 1, 2, 4, 2, 2, 4, 4]`
In any case, thank you for your help!

When a task sleeps, the thread running that task switches over to a different task.
No sense in just twiddling your thumbs while the task sleeps if there is still work left to be done!

As others have pointed out, Julia uses task-based parallelism. These tasks are the fundamental abstraction.
You can create one manually via `Threads.@spawn do_something()` (or even more manually by explicitly calling `Task(()->do_something())`, but then you'd have to manually schedule it, too…).

Work is divided up into tasks.
These tasks may be scheduled on different threads and run in parallel, but you don't really interact with the threads directly.

When you call `sleep(3)`, you're putting the task to sleep for 3 seconds.
Not the thread it was running on.
When the task goes to sleep, it yields/is no longer scheduled to execute, so the thread that was running the task looks for queued tasks it can pick up.
After 3 seconds (or however long the task slept), it'll get added back to the queue so a thread can pick it back up.

Basically, sleeping doesn't use your CPU, so the number of sleeps you can execute in parallel isn't really limited by the number of cores you have.
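A quick way to see this for yourself (the counts and durations below are arbitrary): spawn many sleeping tasks and time them. They overlap even with a single Julia thread, because each sleeping task yields its thread:

```julia
# 20 tasks each sleep 0.2 s, yet total wall time is roughly 0.2 s,
# not 4 s, because sleeping tasks yield; this works even with `-t 1`.
t = @elapsed begin
    tasks = [Threads.@spawn(sleep(0.2)) for _ in 1:20]
    foreach(wait, tasks)
end
println(t)   # roughly 0.2, far below the 4 s serial total
```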

`Threads.@threads :static` creates only 1 task per thread; hence, when a task goes to sleep, there are no unscheduled tasks to pick up in these examples.
"Real world code" could of course have nested parallelism, so there could be more work to pick up in those cases.


Thank you for your help. Just one more question: carstenbauer compared Julia's parallelism to that of Go.

How do the parallelization capabilities of both languages compare (for optimizing mathematical algorithms on a single large CPU)? Does Go have more options for tweaking (like you did above with `:static` and `basesize`)? (I realize it's off-topic, but still, I thought someone might add a few comments on that.)

I've never used `go`, so I can't really comment there.
I would guess that both languages have a lot of options for tweaking with respect to batch sizes.
All you really need is the `@spawn` primitive for that, plus code, living either in base or in packages, that does the batching for you.
I have heard that `go` has a really good garbage collector. That will certainly help them when multithreading.
It's possible to tweak Julia's GC a bit (look at `--heap-size-hint=` when checking `julia --help`), and it's probably possible to do so in `go` as well.


The similarity between Julia and Go is that they both have tasks that are independent of threads and a scheduler that controls which tasks run on which thread. Go has nothing like `@threads`, so those parameters are not relevant. AFAIK, the only control of Go's scheduler from user code is Gosched, essentially `yield`. Edit: there's also LockOSThread/UnlockOSThread to pin tasks to threads.
