More threads, slower code, even if not spawning them

Maybe I don’t understand how @spawn works, but here is what I am observing: when executing a parallel task with it, performance is significantly worse if more threads are available (i.e. starting Julia with -tX for larger X), even though I am not spawning more tasks.

An example code is below, but essentially what I see is:

  • Start Julia with julia -t32 and execute the code spawning 32 tasks: 216 μs
  • Start Julia with julia -t64 and execute the code spawning 32 tasks: 3.323 ms

How should I understand this? The consequence is that, in a parallel code, even when I know it is not worth spawning more tasks because of limited scalability, performance gets worse merely because more threads are available.

(FWIW, this is in a computer with 128 real cores)

Test code:

using Base.Threads: @spawn
using LinearAlgebra: norm
using StaticArrays
using Test
using BenchmarkTools: @btime

# Parallel code for sum(norm.( x - y for x in x, y in y ))
function sumd(x,y,nbatches;aux=zeros(nbatches))
    @assert length(x)%nbatches == 0
    batchsize = length(x) ÷ nbatches
    aux .= 0.
    @sync for ibatch in 1:nbatches
        ifirst = (ibatch-1)*batchsize + 1
        ilast = ibatch*batchsize
        @spawn begin
            for i in ifirst:ilast
                for j in 1:length(y)
                    @inbounds aux[ibatch] += norm(y[j]-x[i])
                end
            end
        end
    end
    return sum(aux)
end

function test(nbatches)
    x = [ rand(SVector{3,Float64}) for _ in 1:6400 ]
    y = [ rand(SVector{3,Float64}) for _ in 1:10 ]
    @test sumd(x,y,nbatches) ≈ sum(norm.( x - y for x in x, y in y ))
    aux = zeros(nbatches)
    @btime sumd($x,$y,$nbatches;aux=$aux)
end

Here is the step-by-step:

Start Julia with 32 threads, and spawn 32 tasks:

[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t32
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.0 (2021-11-30)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
  216.054 μs (205 allocations: 17.83 KiB)
42508.465422704496

Start Julia with 64 threads, and again spawn 32 tasks:

[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t64
...

julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
  3.323 ms (205 allocations: 17.83 KiB)
40862.74149814549



Is your system hyper-threaded? How many physical cores does it have?

I am no expert, but I have sometimes found using more threads than physical cores to be problematic (oversubscription); and since JULIA_EXCLUSIVE tries to set thread affinities to cores, performance will depend on how exactly that is done.


Yeah, I updated that. 128 physical cores, not hyper-threaded.

Is this two 64-core CPUs? If so, part of the problem might be NUMA.


It seems:

[ 0.077000] smpboot: CPU0: AMD EPYC 7662 64-Core Processor (family: 0x17, model: 0x31, stepping: 0x0)

But, apparently, it is hyper-threaded (though the cluster administrator does not allow running more than 128 tasks). Is there an easy way to avoid using the virtual cores?

edit: I will play with ThreadPinning.jl (https://github.com/carstenbauer/ThreadPinning.jl, "Pinning Julia threads to cores") and see what happens.

I have resorted to pinning threads/cores by hand (using Hwloc) on Linux before; it is not easy to do on macOS, for example. As expected, ThreadPinning.jl only works on Linux.
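
For what it's worth, a quick way to check from within Julia whether hyper-threading is visible, sketched with Hwloc.jl (mentioned above); Sys.CPU_THREADS is from Base:

using Hwloc

# Logical CPUs seen by the OS vs. physical cores reported by hwloc;
# a 2:1 ratio means SMT ("virtual cores") is enabled.
@show Sys.CPU_THREADS
@show Hwloc.num_physical_cores()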


Thanks @Oscar_Smith and @raminammour for the hints. It is Linux.

It seems that it has something to do with that, indeed.

Using the ThreadPinning package I can obtain better results, although they still fluctuate a lot. The best strategy here seems to be pinthreads(:scatter), although from the output I would have thought that :halfcompact would be the one picking the real cores. @carstenbauer: does the thread pinning take into consideration the current use of the cores (by the system or by other users)?

No. The user needs to explicitly specify a pinning strategy, as you did with pinthreads(:scatter). While I can in principle imagine a pinthreads(:unused) option, I mostly wrote ThreadPinning.jl for cluster usage, in which case one typically has an empty compute node anyway.


Note that macOS simply doesn’t support pinning of processes or threads. So it is not just “not easy to do” but in fact impossible. Thus, there is nothing that ThreadPinning.jl can do about it.


I thought I had found something about OSX supporting thread affinity when I looked, but that looks old and experimental, and there is nothing for process binding.

Yeah, in my particular case I have to benchmark the code on a shared computer, so the node is not really empty.

But I don't think I can attribute the fluctuations I see only to competition for resources.

Your package is really nice, thanks.

It’s not relevant to the question, but I’d imagine aux[ibatch] += may be causing false sharing (unless LLVM does something clever by generating two versions of the loop, or TBAA kicks in).


Side question: how can I set the basesize option in @floop to see how it behaves on that machine when changing the "effective" number of threads? I could not find an example in the docs. This is what I tried:

function sumd_floop(x,y,nbatches)
    @floop basesize = div(length(x),nbatches) for i in 1:length(x)
        for j in 1:length(y)
            d = norm(y[j]-x[i])
            @reduce( dsum = 0. + d )
        end
    end
    return dsum
end

You can use @floop ThreadedEx(basesize = div(length(x),nbatches)) for.
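
For concreteness, here is a sketch of the sumd_floop function above rewritten with that executor syntax (sumd_floop_basesize is a made-up name; imports are included so it runs on its own):

using FLoops
using LinearAlgebra: norm

# Same reduction as sumd_floop, but the chunk size is passed through the
# ThreadedEx executor instead of as a bare keyword on @floop.
function sumd_floop_basesize(x, y, nbatches)
    @floop ThreadedEx(basesize = div(length(x), nbatches)) for i in eachindex(x)
        for j in eachindex(y)
            d = norm(y[j] - x[i])
            @reduce(dsum = 0.0 + d)
        end
    end
    return dsum
end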

Avoiding false sharing in your code is also easy:

        @spawn begin
            acc = zero(eltype(aux))
            for i in ifirst:ilast
                for j in 1:length(y)
                    acc += @inbounds norm(y[j]-x[i])
                end
            end
            aux[ibatch] = acc
        end

I’ve added a @threads and a @floop example here. Both are much faster than the @spawn option (no idea why @threads beats @spawn). @floop performs particularly well.

At the same time, all the options suffer from the "more threads available, worse performance" issue, even when the extra threads are not being used.

Last version of the code here: https://github.com/m3g/CellListMapBenchmarks.jl/blob/main/test/test.jl

Result:

[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t32
...
julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
sumd threads:   128.640 μs (161 allocations: 15.33 KiB)
sumd spawn:     1.226 ms (237 allocations: 18.33 KiB)
floops:         94.447 μs (283 allocations: 18.00 KiB)


[lmartine@adano58 old]$ JULIA_EXCLUSIVE=1 julia -t64
...
julia> include("./test.jl")
test (generic function with 1 method)

julia> test(32)
sumd threads:   212.327 μs (321 allocations: 30.59 KiB)
sumd spawn:     3.567 ms (237 allocations: 18.33 KiB)
floops:         123.501 μs (283 allocations: 18.00 KiB)

The terrible performance of @spawn here is somewhat associated with the thread pinning, as shown by:

julia> pinthreads(:halfcompact)

julia> test(32)
sumd threads:   248.985 μs (321 allocations: 30.59 KiB)
sumd spawn:     4.704 ms (237 allocations: 18.33 KiB)
floops:         187.130 μs (283 allocations: 18.00 KiB)

julia> pinthreads(:scatter)

julia> test(32)
sumd threads:   397.252 μs (321 allocations: 30.59 KiB)
sumd spawn:     532.616 μs (237 allocations: 18.33 KiB)
floops:         174.627 μs (286 allocations: 18.09 KiB)

julia> pinthreads(:compact)

julia> test(32)
sumd threads:   208.800 μs (321 allocations: 30.59 KiB)
sumd spawn:     3.610 ms (237 allocations: 18.33 KiB)
floops:         116.478 μs (283 allocations: 18.00 KiB)

In my real application I don't see such a large gap relative to using @threads, though (and again I am tempted to give FLoops another try there; I always struggle with the fact that I need to define custom reduction functions, and I don't understand how that can be done safely).

Hmm… interesting that the spawn version doesn’t perform so well. It looks like a reasonable implementation to me.

I’m always looking for examples to put in the documentation, so let me know if you have some MWE (using @threads or @spawn) representing the pattern you need. 🙂

The problem I have is that the user can provide various types of data, and the function to be executed in parallel is provided as a closure. Something like the sketch after this list, which has the following properties:

  1. The user can provide a function f that will be computed in parallel for each element of the input data x.

  2. The result of f can be mutable or immutable, in fact it can be anything.

  3. Thus, the reduction function cannot be fully generalized. There is a default strategy (exemplified below), but if the user's function and data are particular, they have to implement and provide a custom reduction function.

using Base.Threads: @threads, nthreads

function my_code(f, x, result;
    result_threaded = [ deepcopy(result) for _ in 1:nthreads() ],
    reduce = sum  # default reduction; the user can pass a custom function
)
    @threads for ithread in 1:nthreads()
        for i in ithread:nthreads():length(x)
            result_threaded[ithread] = f(x[i])
        end
    end
    result = reduce(result_threaded)
    return result
end

A typical use case could be that the user is updating a histogram. In this case, for example, one could have:

function add_to_histogram!(x,hist)
    bin = floor(Int, x/0.1) + 1  # assumes x in [0, 1)
    hist[bin] += 1
    return hist
end

function reduce_histogram!(hist,hist_threaded)
    hist .= hist_threaded[1]
    for i in 2:length(hist_threaded)
        hist .+= hist_threaded[i]
    end
    return hist
end

And then the user could call the main function with:

hist = zeros(Int,10)
my_code(
    x -> add_to_histogram!(x,hist),
    x, hist,
    reduce = hist_threaded -> reduce_histogram!(hist, hist_threaded)
)

I don’t know if the idea is clear, but the point is that the user can pass any type of data, using for instance a closure, and has to define a custom reduction function. They may also want to preallocate the threaded output (the result_threaded array), because this can be executed many times inside an outer hot loop.

Thus there is a lot of flexibility there, and I have not been able to figure out whether it can be mapped onto a more specialized and better parallel strategy.

Anyway, any insight is very welcome.

Technically, what you described is what parallel reduce does:

using MicroCollections: OneHotVector
using BangBang.Extras: broadcast_inplace!!
using InitialValues: InitialValue

compute_bin(x) = min(10, floor(Int, x / 0.1) + 1)
f(x) = OneHotVector(compute_bin(x) => 1, 10)
op!!(h1, h2) = broadcast_inplace!!(+, h1, h2)  # similar to `h1 .+= h2`

using Folds
Folds.mapreduce(f, op!!, rand(1000); init = InitialValue(+))

I’ve also discussed how to do histograms this way elsewhere:

The main observation is that add_to_histogram! and (hist1, hist2) -> reduce_histogram!(hist1, [hist2]) are essentially the same function if you have a way to talk about “singleton solution” (e.g., OneHotVector in the example above or SingletonDict in Tutorial: Parallelism · Transducers.jl).
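
To illustrate that observation with Base types only (a sketch, not the MicroCollections-based version above): map each element to a one-entry Dict, and then a single mergewith(+) plays both the role of the per-element update and of the pairwise combine:

# Sequential illustration of the "singleton solution" idea: the same binary
# operation `op` folds a singleton into the accumulator and merges two partial
# histograms, so it is exactly what a parallel mapreduce needs.
compute_bin(x) = min(10, floor(Int, x / 0.1) + 1)
singleton(x) = Dict(compute_bin(x) => 1)      # histogram of a single element
op(h1, h2) = mergewith(+, h1, h2)

hist = mapreduce(singleton, op, rand(1000))   # Dict mapping bin => count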

That said, arguably, coming up with a mechanism like the above may not be so user-friendly. I'm thinking of adding new syntax to FLoops.jl so that we can write something like this:

@floop begin
    @init buf = zeros(Int, 10)  # per basecase initialization
    for x in xs
        bin = max(1, min(10, floor(Int, x)))
        buf[bin] += 1  # intra-basecase reduction (no syntax)
    end
    @combine h .+= buf
end

h :: Vector{Int}  # computed histogram

Ref: https://github.com/JuliaFolds/FLoops.jl/issues/114


When using a pattern like that, I am always unsure whether some customization can be done. For example:

If I happened to need to preallocate this buffer, meaning something like @init buf = pre_buf[threadid()], would that possibly work? (I am not asking specifically about the threadid() use; rather, I would not know how to perform such a preallocation, since I don't see exactly what the macro will be doing with that instruction.)

Also, I can imagine that if I preallocate the buffers, I may go against the rationale of the @floop macro, because I guess it does something smarter than having all the buffers next to each other in the same array to be distributed among threads.

Another pattern I am facing, which I have trouble "translating" to the Folds syntax, is the one below. This is a toy example in which I want to build a list of the numbers smaller than 0.5, but it captures some of the characteristics of the true problem:

using Base.Threads: @threads, nthreads

struct List
    n::Int
    l::Vector{Float64}
end
add_to_list(x,list) = x < 0.5 ? List(list.n+1,push!(list.l,x)) : list

# Serial version
function build_list(x)
    list = List(0,zeros(0))
    for i in eachindex(x)
        list = add_to_list(x[i],list)
    end
    return list
end

# Parallel version
append_lists!(list1,list2) = List(list1.n+list2.n,append!(list1.l,list2.l))

function build_list_threads(x)
    list = List(0,zeros(0))
    list_threaded = [ deepcopy(list) for _ in 1:nthreads() ]
    @threads for ithread in 1:nthreads()
        local_list = list_threaded[ithread]
        for i in ithread:nthreads():length(x)
            local_list = add_to_list(x[i],local_list)
        end
        list_threaded[ithread] = local_list
    end
    # reduce
    for lst in list_threaded
        list = append_lists!(list,lst)
    end
    return list
end
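
For comparison, a possible Folds.jl version of the same thing (a sketch; singleton_list and combine_lists are made-up helper names, and the combine is kept non-mutating so that the shared init value is never modified):

using Folds

# Map each element to a zero- or one-element List, then combine partial Lists;
# this mirrors build_list_threads above, but lets Folds handle the chunking.
singleton_list(x) = x < 0.5 ? List(1, [x]) : List(0, Float64[])
combine_lists(a, b) = List(a.n + b.n, vcat(a.l, b.l))

build_list_folds(x) = Folds.mapreduce(singleton_list, combine_lists, x;
                                      init = List(0, Float64[]))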

Uhm… now that I think about it, this one may not be very different from the above, and with the "new syntax" it could be something like:

list = List(0,zeros(0))
@floop begin
    @init buf = List(0,zeros(0))
    for i in eachindex(x)
        buf = add_to_list(x[i],buf)
    end
    @combine append_lists!(list,buf) # ??? 
end

My doubts there would be roughly the same: 1) could I preallocate the bufs if I were to call this many times? 2) Is there anything special needed for the @combine syntax, since the combination of two List objects is not simply an addition?

My plan for @combine is to re-use @reduce syntax as much as possible. So your example above can be written as something like

@floop begin
    @init buf = List(0,zeros(0))
    for i in eachindex(x)
        buf = add_to_list(x[i],buf)
    end
    @combine list = append_lists!(List(0,zeros(0)), buf)
end

To reuse buffers allocated as buffers = [List(0,zeros(0)) for _ in 1:ntasks], you can do something like

maybe_append_lists!(a::List, b::Union{Nothing,List}) =
    if b === nothing
        a
    else
        append_lists!(a, b)
    end

@floop begin
    @init result = nothing
    for (buf, item) in zip(buffers, x)
        result = add_to_list(item, buf)
    end
    @combine list = maybe_append_lists!(List(0,zeros(0)), result)
end

Or, if you want to avoid manual ziping (which may require chunking x):

function append_lists_then_recycle!(a, b)
    append_lists!(a, b)
    recycle!(handle, b)
    return a
end

@floop begin
    @init buf = get_buffer!(handle)
    for item in x
        buf = add_to_list(item, buf)
    end
    @combine list = append_lists_then_recycle!(List(0,zeros(0)), buf)
end

where get_buffer! and recycle! are thread-safe and handle is an object that keeps the pre-allocated buffers. Maybe it’s better to create a package that implements get_buffer! and recycle! to automate this pattern.
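
For what it's worth, here is a sketch of what such a get_buffer!/recycle! pair could look like, using a Channel as a simple thread-safe pool (all names here are hypothetical, not from an existing package):

# `handle` is just a Channel holding the pre-allocated buffers; take!/put! on a
# Channel are thread-safe, and take! blocks if every buffer is checked out.
function make_buffer_pool(make_buffer, n)
    buf = make_buffer()
    pool = Channel{typeof(buf)}(n)
    put!(pool, buf)
    foreach(_ -> put!(pool, make_buffer()), 2:n)
    return pool
end

get_buffer!(pool::Channel) = take!(pool)
recycle!(pool::Channel, buf) = put!(pool, buf)

# e.g. handle = make_buffer_pool(() -> List(0, Float64[]), Threads.nthreads())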

Also, we won’t need to implement functions/closures like maybe_append_lists! and append_lists_then_recycle!, since I'm planning to provide a @combine() do syntax similar to @reduce() do.
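
For readers who haven't used it, the existing @reduce() do form looks roughly like this (adapted from the FLoops documentation; it computes the maximum of 2v together with its index):

using FLoops

# `ymax` and `imax` are the accumulators; the do-block body is used both to
# fold in a new (y, i) pair and to combine two partial results.
@floop for (i, v) in pairs([0, 1, 3, 2])
    y = 2v
    @reduce() do (ymax = -Inf; y), (imax = 0; i)
        if ymax < y
            ymax = y
            imax = i
        end
    end
end
@show (ymax, imax)   # (6, 3)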

Note also that the new syntax does not add anything new in terms of the functionality. It’s just a different syntax sugar. You can already do this in current FLoops.jl or even just Transducers.jl or Folds.jl.

Do you mean whether the @combine syntax can support arbitrary combining functions like append_lists!? If so, then yes: @combine will support arbitrary functions (monoids) that combine two sub-results.
