# More threads, slower code, even if not spawning them

Maybe I don’t understand how `@spawn` works, but here is what I am observing: when executing a parallel task with it, the performance is significantly worse if more threads are available (i.e. starting Julia with `-tX` with a larger `X`), even if I am not spawning more tasks.

Example code is below, but essentially this is what I see:

• Start Julia with `julia -t32` and execute the code spawning `32` tasks: 216 μs
• Start Julia with `julia -t64` and execute the code spawning `32` tasks: 3.323 ms

How should I understand this? The consequence is that, in parallel code, even if I know it is not worth spawning more tasks because of limited scalability, performance gets worse just because more threads are available.

(FWIW, this is on a computer with 128 real cores.)

Test code:

```julia
using Base.Threads: @spawn
using LinearAlgebra: norm
using StaticArrays
using Test
using BenchmarkTools  # for @btime

# Parallel code for sum(norm.( x - y for x in x, y in y ))
function sumd(x,y,nbatches;aux=zeros(nbatches))
    @assert length(x)%nbatches == 0
    batchsize = length(x) ÷ nbatches
    aux .= 0.
    @sync for ibatch in 1:nbatches
        ifirst = (ibatch-1)*batchsize + 1
        ilast = ibatch*batchsize
        @spawn begin
            for i in ifirst:ilast
                for j in 1:length(y)
                    @inbounds aux[ibatch] += norm(y[j]-x[i])
                end
            end
        end
    end
    return sum(aux)
end

function test(nbatches)
    x = [ rand(SVector{3,Float64}) for _ in 1:6400 ]
    y = [ rand(SVector{3,Float64}) for _ in 1:10 ]
    @test sumd(x,y,nbatches) ≈ sum(norm.( x - y for x in x, y in y ))
    aux = zeros(nbatches)
    @btime sumd($x,$y,$nbatches;aux=$aux)
end
```

Here is the step-by-step:

## Start Julia with `32` threads, and spawn 32 tasks:

```
[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t32
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.0 (2021-11-30)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
216.054 μs (205 allocations: 17.83 KiB)
42508.465422704496
```

## Start Julia with `64` threads, and spawn 32 tasks:

```
[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t64
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.0 (2021-11-30)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
3.323 ms (205 allocations: 17.83 KiB)
40862.74149814549
```
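One way to see where the 32 spawned tasks actually run under `-t32` versus `-t64` is to record `threadid()` inside each task. This is a minimal sketch using only `Base.Threads`; `task_threads` is a hypothetical helper name. Nothing ties a spawned task to a particular thread, so with more threads available the same 32 tasks may be spread over more threads (and, depending on how the OS places those threads, over more cores or sockets).

```julia
using Base.Threads: @spawn, nthreads, threadid

# Spawn `ntasks` tasks and record the id of the thread each one ran on.
# The scheduler decides which thread picks up each task.
function task_threads(ntasks)
    ids = zeros(Int, ntasks)
    @sync for k in 1:ntasks
        @spawn ids[k] = threadid()
    end
    return ids
end

println("threads available: ", nthreads())
println("distinct threads used by 32 tasks: ", length(unique(task_threads(32))))
```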

Is your system hyper-threaded? What is the number of physical cores?

I am no expert, but I have sometimes found more threads than physical cores to be problematic (oversubscription); especially since `JULIA_EXCLUSIVE` tries to set thread affinities to cores, performance will depend on how exactly that is done.


Yeah, I updated that. 128 physical cores, not hyper-threaded.

Is this two 64-core CPUs? If so, part of the problem might be NUMA.


It seems:

`[ 0.077000] smpboot: CPU0: AMD EPYC 7662 64-Core Processor (family: 0x17, model: 0x31, stepping: 0x0)`

But, apparently, it is hyper-threaded (though the cluster administrator does not allow running more than 128 tasks). Is it easy to avoid using the virtual cores?
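On Linux, the kernel lists the hyper-thread siblings of each logical CPU in `/sys/devices/system/cpu/cpuN/topology/thread_siblings_list`. A minimal, Linux-only sketch (the helper names are hypothetical) that picks one logical CPU per physical core; those ids are the candidates to pin threads to if one wants to stay off the virtual cores.

```julia
# Parse a Linux CPU list such as "0,64" or "0-3,8" into a vector of CPU ids.
function parse_cpulist(s::AbstractString)
    ids = Int[]
    for part in split(strip(s), ',')
        isempty(part) && continue
        if occursin('-', part)
            a, b = parse.(Int, split(part, '-'))
            append!(ids, a:b)
        else
            push!(ids, parse(Int, part))
        end
    end
    return ids
end

# One logical CPU per physical core: the lowest id among each core's hyper-thread
# siblings. Returns an empty vector if the sysfs directory is missing (e.g. on macOS).
function one_cpu_per_core(root = "/sys/devices/system/cpu")
    isdir(root) || return Int[]
    reps = Set{Int}()
    for d in readdir(root)
        occursin(r"^cpu\d+$", d) || continue
        f = joinpath(root, d, "topology", "thread_siblings_list")
        isfile(f) || continue
        push!(reps, minimum(parse_cpulist(read(f, String))))
    end
    return sort!(collect(reps))
end

cores = one_cpu_per_core()
println(length(cores), " physical cores found; first CPU ids: ", first(cores, 4))
```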

Edit: I will play with ThreadPinning.jl (carstenbauer/ThreadPinning.jl on GitHub, “Pinning Julia threads at runtime”) and see what happens.

I have resorted to pinning threads to cores by hand (using Hwloc) on Linux before; it is not easy to do on macOS, for example. As expected, `ThreadPinning.jl` only works on Linux.


Thanks @Oscar_Smith and @raminammour for the hints. It is Linux.

It seems that it has something to do with that, indeed.

Using the `ThreadPinning` package I can obtain better results, although they still fluctuate a lot. The best strategy here seems to be `pinthreads(:scatter)`, although from the output I would have expected `:halfcompact` to pick the physical cores. @carstenbauer: does the thread pinning take into consideration how the cores are being used (by the system or by others)?

No. The user needs to explicitly specify a pinning strategy, as you did with `pinthreads(:scatter)`. While I can in principle imagine a `pinthreads(:unused)` option, I mostly wrote ThreadPinning.jl for cluster usage in which case one typically has an empty compute node anyways.
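To make the difference between placement strategies concrete, here is a deliberately simplified model of “compact” versus “scatter” placement. It is not ThreadPinning.jl’s implementation (real strategies also deal with hyper-threads and the machine’s actual CPU numbering); the function names and the socket-by-socket numbering are assumptions. Which placement is faster depends on whether a workload benefits more from sharing one socket’s caches (compact) or from the combined caches and memory bandwidth of both sockets (scatter).

```julia
# Assumed layout: `nsockets` sockets with `ncores` cores each, numbered socket by
# socket (socket 0: CPUs 0:ncores-1, socket 1: ncores:2ncores-1, ...).

# "compact": fill CPUs in order, so the first threads all share one socket.
compact_cpu(t, nsockets, ncores) = t - 1

# "scatter": round-robin over sockets, so consecutive threads alternate sockets.
function scatter_cpu(t, nsockets, ncores)
    socket = (t - 1) % nsockets   # which socket thread t goes to
    slot = (t - 1) ÷ nsockets     # position of thread t within that socket
    return socket * ncores + slot
end

# 32 threads on 2 sockets × 64 cores: first few CPU ids under each policy.
println("compact: ", [compact_cpu(t, 2, 64) for t in 1:6])
println("scatter: ", [scatter_cpu(t, 2, 64) for t in 1:6])
```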


Note that macOS simply doesn’t support pinning of processes or threads. So it is not just “not easy to do” but in fact impossible. Thus, there is nothing that ThreadPinning.jl can do about it.


I thought I had found something about OSX supporting thread affinity when I looked, but that looks old and experimental, and there is nothing for process binding.

Yeah, in my particular case I have to benchmark the code in a shared computer, so it is not really empty.

But the fluctuations I see I don’t think I can attribute only to the competing resources.

Your package is really nice, thanks.

It’s not relevant to the question, but I’d imagine `aux[ibatch] +=` may be causing false sharing (unless LLVM does something clever by generating two versions of the loop or TBAA kicks in)
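A quick way to test the false-sharing hypothesis is to pad the shared accumulator array so that each task’s slot sits on its own cache line (the other usual fix is a task-local accumulator, which avoids the shared writes altogether). A minimal sketch; `padded_sum` and `PAD` are hypothetical names, and the 64-byte cache line is an assumption that is typical for x86-64. Two slots exactly 64 bytes apart can never share a 64-byte line, regardless of the array’s alignment.

```julia
using Base.Threads: @spawn

# Stride between task slots: 8 Float64 = 64 bytes (assumed cache-line size).
const PAD = 8

function padded_sum(f, xs, ntasks)
    chunks = collect(Iterators.partition(eachindex(xs), cld(length(xs), ntasks)))
    acc = zeros(PAD * length(chunks))
    @sync for (k, idx) in enumerate(chunks)
        @spawn for i in idx
            # slot k is PAD elements away from its neighbors
            @inbounds acc[PAD * (k - 1) + 1] += f(xs[i])
        end
    end
    return sum(acc)   # padding entries stay zero, so they don't change the sum
end

padded_sum(abs2, collect(1.0:1000.0), 4)
```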


Side question: how can I set the `basesize` option in `@floop`, to see how it behaves on that machine when changing the “effective” number of threads? I could not find an example in the docs. This is what I tried:

```julia
function sumd_floop(x,y,nbatches)
    @floop basesize = div(length(x),nbatches) for i in 1:length(x)
        for j in 1:length(y)
            d = norm(y[j]-x[i])
            @reduce( dsum = 0. + d )
        end
    end
    return dsum
end
```

You can use `@floop ThreadedEx(basesize = div(length(x),nbatches)) for`.
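To get a feel for what `basesize` controls, here is a stdlib-only analog of the divide-and-conquer strategy that, as far as I understand, `ThreadedEx` uses: the input is split recursively, and pieces of at most `basesize` elements are reduced sequentially. This is a hand-written sketch, not FLoops’ code, and `dc_mapreduce` is a hypothetical name. A smaller `basesize` means more, smaller tasks.

```julia
using Base.Threads: @spawn

# Divide-and-conquer parallel mapreduce over a non-empty indexable `xs`
# (requires basesize >= 1): halve the index range, spawning a task for the right
# half, until a piece has at most `basesize` elements; reduce that piece serially.
function dc_mapreduce(f, op, xs, lo = firstindex(xs), hi = lastindex(xs); basesize::Int)
    if hi - lo + 1 <= basesize
        acc = f(xs[lo])
        for i in lo+1:hi
            acc = op(acc, f(xs[i]))
        end
        return acc
    end
    mid = (lo + hi) ÷ 2
    right = @spawn dc_mapreduce(f, op, xs, mid + 1, hi; basesize = basesize)
    left = dc_mapreduce(f, op, xs, lo, mid; basesize = basesize)
    return op(left, fetch(right))
end

xs = collect(1:6400)
dc_mapreduce(abs2, +, xs; basesize = 6400 ÷ 32)   # 32 leaf pieces of 200 elements
```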

Avoiding false sharing in your code is also easy:

```julia
        @spawn begin
            acc = zero(eltype(aux))
            for i in ifirst:ilast
                for j in 1:length(y)
                    acc += @inbounds norm(y[j]-x[i])
                end
            end
            aux[ibatch] = acc
        end
```

I’ve added a `@threads` and a `@floop` example here. Both are much faster than the `@spawn` option (I have no idea why `@threads` is faster than `@spawn`). `@floop` performs particularly well.

At the same time, all the options suffer from the “more threads available, less performance” problem, even if the additional threads are not being used.

Latest version of the code: `test.jl` in the m3g/CellListMapBenchmarks.jl repository on GitHub.
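The `@threads` version lives in the linked repository and is not reproduced here. For comparison, a minimal `@threads` variant of `sumd` might look like the sketch below (`sumd_threads` is a hypothetical name, and this is not the repository’s code): the batching is the same, but `@threads` distributes the batches over threads and each batch accumulates into a local variable before a single write to `aux`.

```julia
using Base.Threads: @threads
using LinearAlgebra: norm

# Same batching as `sumd`; `@threads` splits 1:nbatches among the threads.
function sumd_threads(x, y, nbatches; aux = zeros(nbatches))
    @assert length(x) % nbatches == 0
    batchsize = length(x) ÷ nbatches
    @threads for ibatch in 1:nbatches
        acc = 0.0   # local accumulator: no shared writes inside the hot loop
        for i in (ibatch - 1) * batchsize + 1 : ibatch * batchsize
            for j in eachindex(y)
                @inbounds acc += norm(y[j] - x[i])
            end
        end
        aux[ibatch] = acc
    end
    return sum(aux)
end
```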

Result:

```
[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t32
...
julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
sumd threads:   128.640 μs (161 allocations: 15.33 KiB)
sumd spawn:     1.226 ms (237 allocations: 18.33 KiB)
floops:         94.447 μs (283 allocations: 18.00 KiB)

...
julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
sumd threads:   212.327 μs (321 allocations: 30.59 KiB)
sumd spawn:     3.567 ms (237 allocations: 18.33 KiB)
floops:         123.501 μs (283 allocations: 18.00 KiB)
```

The terrible performance of `@spawn` here seems to be partly related to thread pinning; after `pinthreads(:halfcompact)`, repeated runs fluctuate a lot:

```
julia> pinthreads(:halfcompact)

julia> test(32)
sumd threads:   248.985 μs (321 allocations: 30.59 KiB)
sumd spawn:     4.704 ms (237 allocations: 18.33 KiB)
floops:         187.130 μs (283 allocations: 18.00 KiB)

julia> test(32)
sumd threads:   397.252 μs (321 allocations: 30.59 KiB)
sumd spawn:     532.616 μs (237 allocations: 18.33 KiB)
floops:         174.627 μs (286 allocations: 18.09 KiB)

julia> test(32)
sumd threads:   208.800 μs (321 allocations: 30.59 KiB)
sumd spawn:     3.610 ms (237 allocations: 18.33 KiB)
floops:         116.478 μs (283 allocations: 18.00 KiB)
```

In my real application, though, I don’t see such a large gap relative to `@threads` (and again I am tempted to give FLoops another try there; I always struggle with the fact that I need to define custom reduction functions, and I don’t understand how that can be done safely).

Hmm… Interesting that the spawn version doesn’t perform so well. It looks like a reasonable implementation to me.

I’m always looking for examples to put in the documentation. So let me know if you have some MWE (using `@threads` or `@spawn`) representing the pattern you need.

The problem I have is that the user can provide various types of data, and the function to be executed in parallel is provided as a closure. Something like the code below, which has the following properties:

1. The user can provide a function `f` that will be computed in parallel for each element of the input data `x`.

2. The result of `f` can be mutable or immutable; in fact, it can be anything.

3. Thus, the reduction function cannot be generalized. There is a default strategy (exemplified below), but if the user function and data are particular, he/she has to implement and provide a custom reduction function.

``````function my_code(f, x, result;
)
end
end
return result
end
``````

A typical use case could be that the user is updating a histogram. In this case, for example, one could have:

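```julia
function add_to_histogram!(x,hist)
    bin = min(10, floor(Int, x/0.1) + 1)  # integer bin index for x in [0, 1]
    hist[bin] += 1
    return hist
end

function reduce_histogram(hist, hist_threaded)
    for h in hist_threaded
        hist .+= h
    end
    return hist
end
```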

And then the user could call the main function with:

```julia
hist = zeros(Int,10)
my_code(
    add_to_histogram!,
    x, hist,
    reduce = reduce_histogram
)
```

I don’t know if the idea is clear, but the point is that the user can pass any type of data, for instance using a closure, and has to define a custom reduction function. The user may also want to preallocate the threaded output (the `results_threaded` array), because this can be executed many times inside an outer hot loop.

Thus there is a lot of flexibility there, and I could not figure out whether it can be mapped into a more specialized and better parallel strategy.
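For concreteness, here is a hypothetical, stdlib-only sketch of that pattern (it is not the original `my_code`, whose body is not shown here). `f(xi, r)` updates a per-task result, the per-task results live in a `partials` vector that the caller may preallocate (playing the role of `results_threaded`), and a user-supplied `reduce` combines them. Tasks are indexed by chunk rather than by `threadid()`, and storing `r` back covers both mutable and immutable results.

```julia
using Base.Threads: @spawn, nthreads

# f(xi, r)                 -> updated per-task result (mutated in place or a new value)
# reduce(result, partials) -> final result built from the per-task results
# If `partials` is preallocated and reused, the caller must reset it before each call.
function my_code_sketch(f, x, result;
                        partials = [zero(result) for _ in 1:nthreads()],
                        reduce = default_reduce!)
    # one contiguous chunk of indices per entry of `partials` (x must be non-empty)
    chunks = Iterators.partition(eachindex(x), cld(length(x), length(partials)))
    @sync for (k, idx) in enumerate(chunks)
        @spawn begin
            r = partials[k]
            for i in idx
                r = f(x[i], r)
            end
            partials[k] = r   # required when f returns a new (immutable) value
        end
    end
    return reduce(result, partials)
end

# Default combination for array results: accumulate every partial into `result`.
function default_reduce!(result, partials)
    for p in partials
        result .+= p
    end
    return result
end

# Mutable result: a 10-bin histogram of values in [0, 1).
add_to_histogram!(xi, hist) = (hist[min(10, floor(Int, xi / 0.1) + 1)] += 1; hist)
hist = my_code_sketch(add_to_histogram!, rand(1000), zeros(Int, 10))

# Immutable result: count values below 0.5, with a custom reduction.
count_small(xi, n) = xi < 0.5 ? n + 1 : n
nsmall = my_code_sketch(count_small, [0.1, 0.9, 0.2, 0.7], 0;
                        reduce = (r, ps) -> r + sum(ps))
```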

Anyway, any insight is very welcome.

Technically, what you described is what parallel `reduce` does:

```julia
using MicroCollections: OneHotVector
using InitialValues: InitialValue
using BangBang: broadcast_inplace!!
using Folds

compute_bin(x) = min(10, floor(Int, x / 0.1) + 1)
f(x) = OneHotVector(compute_bin(x) => 1, 10)
op!!(h1, h2) = broadcast_inplace!!(+, h1, h2)  # similar to `h1 .+= h2`

Folds.mapreduce(f, op!!, rand(1000); init = InitialValue(+))
```

I’ve also discussed how to compute histograms this way elsewhere.

The main observation is that `add_to_histogram!` and `(hist1, hist2) -> reduce_histogram!(hist1, [hist2])` are essentially the same function if you have a way to talk about a “singleton solution” (e.g., `OneHotVector` in the example above, or `SingletonDict` in the Transducers.jl tutorial “Parallelism”).
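The equivalence can be checked with Base alone. In this serial sketch, `singleton_hist` stands in for `OneHotVector` and `combine_hist!` for `broadcast_inplace!!` (all names here are hypothetical): folding elements one at a time with an incremental update gives the same histogram as mapping every element to a singleton histogram and combining. Because the combining function is associative, the same `mapreduce` can also be evaluated in parallel chunks, which is what `Folds.mapreduce` does.

```julia
compute_bin(x) = min(10, floor(Int, x / 0.1) + 1)

# "Singleton solution": the histogram of a single value.
function singleton_hist(x)
    h = zeros(Int, 10)
    h[compute_bin(x)] = 1
    return h
end

# Incremental update, one element at a time (accumulator first, as foldl expects).
push_hist!(hist, x) = (hist[compute_bin(x)] += 1; hist)

# Combine two partial histograms, in place into the first one.
combine_hist!(h1, h2) = (h1 .+= h2; h1)

xs = rand(1000)
h_incremental = foldl(push_hist!, xs; init = zeros(Int, 10))
h_combined = mapreduce(singleton_hist, combine_hist!, xs)
```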

That said, arguably, coming up with a mechanism like the one above may not be very user-friendly. I’m thinking of adding new syntax to FLoops.jl so that we can write something like this:

```julia
@floop begin
    @init buf = zeros(Int, 10)  # per basecase initialization
    for x in xs
        bin = max(1, min(10, floor(Int, x)))
        buf[bin] += 1  # intra-basecase reduction (no syntax)
    end
    @combine h .+= buf
end

h :: Vector{Int}  # computed histogram
```
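Until such syntax exists, the same structure can be written by hand. A stdlib-only sketch (`histogram_chunks` is a hypothetical name) in which each chunk plays the role of a base case: its buffer is created inside the task (the `@init` step), filled serially, and the per-chunk buffers are combined at the end (the `@combine h .+= buf` step).

```julia
using Base.Threads: @spawn, nthreads

function histogram_chunks(xs; nchunks = nthreads())
    chunks = Iterators.partition(xs, cld(length(xs), nchunks))   # xs must be non-empty
    tasks = map(chunks) do chunk
        @spawn begin
            buf = zeros(Int, 10)                      # per-chunk initialization
            for x in chunk
                bin = max(1, min(10, floor(Int, x)))
                buf[bin] += 1                         # intra-chunk reduction
            end
            buf
        end
    end
    h = zeros(Int, 10)
    for t in tasks
        h .+= fetch(t)                                # combine step
    end
    return h
end

histogram_chunks(10 .* rand(1000))
```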

When using a pattern like that, I am always unsure if some customization can be done. For example:

If I happened to need to preallocate this buffer, e.g. `@init buf = pre_buf[threadid()]`, would that work? (I am not asking specifically about `threadid()`, but I would not know how to perform such preallocation, since I don’t see exactly what the macro does with that instruction.)

Also, I imagine that if I preallocate the buffer I may be going against the rationale of the `@floop` macro, because I guess it does something smarter than having all buffers next to each other in the same array, distributed among threads.

Another pattern I am facing which I have trouble trying to “translate” to the Folds syntax is something like the one below. This is a toy example in which I want to build a list of the numbers smaller than 0.5, but it captures some of the characteristics of the true problem:

```julia
using Base.Threads: @threads, nthreads

struct List
    n::Int
    l::Vector{Float64}
end
add_to_list(x,list) = x < 0.5 ? List(list.n+1,push!(list.l,x)) : list

# Serial version
function build_list(x)
    list = List(0,zeros(0))
    for i in eachindex(x)
        list = add_to_list(x[i],list)
    end
    return list
end

# Parallel version
append_lists!(list1,list2) = List(list1.n+list2.n,append!(list1.l,list2.l))

list = List(0,zeros(0))
end
end
# reduce
list = append_lists!(list,lst)
end
return list
end

``````
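A stdlib-only sketch of the same parallel pattern without indexing by `threadid()`: each task folds `add_to_list` over one contiguous chunk into its own `List`, and the partial lists are combined with `append_lists!` in chunk order, so the result matches the serial `build_list`. `build_list_parallel` and the chunking scheme are assumptions, not the original code.

```julia
using Base.Threads: @spawn, nthreads

struct List
    n::Int
    l::Vector{Float64}
end
add_to_list(x, list) = x < 0.5 ? List(list.n + 1, push!(list.l, x)) : list
append_lists!(list1, list2) = List(list1.n + list2.n, append!(list1.l, list2.l))

# Each task builds its own List from one chunk (x must be non-empty); the partial
# lists are then appended in chunk order, preserving the element order of `x`.
function build_list_parallel(x; ntasks = nthreads())
    chunks = Iterators.partition(x, cld(length(x), ntasks))
    tasks = map(chunks) do chunk
        @spawn foldl((list, xi) -> add_to_list(xi, list), chunk; init = List(0, Float64[]))
    end
    return foldl(append_lists!, fetch.(tasks); init = List(0, Float64[]))
end

x = rand(1000)
list = build_list_parallel(x)
```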

Hmm… now that I think about it, this one may not be very different from the one above, and with the “new syntax” it could be something like:

```julia
list = List(0,zeros(0))
@floop begin
    @init buf = List(0,zeros(0))
    for i in eachindex(x)
        buf = add_to_list(x[i],buf)
    end
    @combine append_lists!(list,buf) # ???
end
```

My doubts there would be roughly the same: 1) could I preallocate the `buf`s if I were to call this many times? 2) Is anything special needed for the `@combine` syntax, since the combination of two `List` objects is not simply an addition?

My plan for `@combine` is to re-use `@reduce` syntax as much as possible. So your example above can be written as something like

```julia
@floop begin
    @init buf = List(0,zeros(0))
    for i in eachindex(x)
        buf = add_to_list(x[i], buf)
    end
    @combine list = append_lists!(List(0,zeros(0)), buf)
end
```

To reuse buffers allocated as `buffers = [List(0,zeros(0)) for _ in 1:ntasks]`, you can do something like

```julia
maybe_append_lists!(a::List, b::Union{Nothing,List}) =
    if b === nothing
        a
    else
        append_lists!(a, b)
    end

@floop begin
    @init result = nothing
    for (buf, item) in zip(buffers, x)
    end
    @combine list = maybe_append_lists!(List(0,zeros(0)), result)
end
```

Or, if you want to avoid manual `zip`ping (which may require chunking `x`):

```julia
function append_lists_then_recycle!(a, b)
    c = append_lists!(a, b)  # `List` is immutable: keep the returned value (updated `n`)
    recycle!(handle, b)
    return c
end

@floop begin
    @init buf = get_buffer!(handle)
    for item in x
        buf = add_to_list(item, buf)
    end
    @combine list = append_lists_then_recycle!(List(0,zeros(0)), buf)
end
```

where `get_buffer!` and `recycle!` are thread-safe and `handle` is an object that keeps the pre-allocated buffers. Maybe it’s better to create a package that implements `get_buffer!` and `recycle!` to automate this pattern.
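A minimal sketch of what `get_buffer!` and `recycle!` could look like, using a `Channel` as the thread-safe container of preallocated buffers (`BufferPool` is a hypothetical name; this is not an existing package API). Checking buffers out of a pool, rather than indexing a preallocated array by `threadid()`, stays correct even if a task yields or runs on a different thread than expected.

```julia
using Base.Threads: @spawn

# Pool of preallocated buffers backed by a Channel; take!/put! are safe to call
# from concurrent tasks, so checkout and return need no extra locking.
struct BufferPool{T}
    ch::Channel{T}
end

function BufferPool(make, n::Integer)
    bufs = [make() for _ in 1:n]
    ch = Channel{eltype(bufs)}(n)
    foreach(b -> put!(ch, b), bufs)
    return BufferPool(ch)
end

get_buffer!(pool::BufferPool) = take!(pool.ch)        # waits if every buffer is in use
recycle!(pool::BufferPool, buf) = put!(pool.ch, buf)  # hand the buffer back

# Usage: 10 chunks share 4 preallocated histogram buffers.
pool = BufferPool(() -> zeros(Int, 10), 4)
xs = 10 .* rand(1000)
hist = zeros(Int, 10)
hist_lock = ReentrantLock()
@sync for chunk in Iterators.partition(xs, 100)
    @spawn begin
        buf = get_buffer!(pool)
        fill!(buf, 0)                     # buffers are reused, so reset first
        for x in chunk
            buf[max(1, min(10, floor(Int, x)))] += 1
        end
        lock(hist_lock) do
            hist .+= buf                  # combine into the shared result
        end
        recycle!(pool, buf)
    end
end
```

The lock serializes only the combine step; with a handful of chunks that is usually small compared with the per-chunk work.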

Also, we don’t need to implement functions/closures like `maybe_append_lists!` and `append_lists_then_recycle!` since I’m planning to provide `@combine() do` syntax similar to `@reduce() do`.

Note also that the new syntax does not add anything new in terms of functionality. It’s just syntactic sugar. You can already do this in current FLoops.jl, or even just with Transducers.jl or Folds.jl.

Do you mean if the `@combine` syntax can support arbitrary combining functions like `append_lists!`? If so, then yes, `@combine` will support arbitrary functions (monoids) that combine two sub-results.
