Understanding @turbo behavior to use it right

using LoopVectorization

s = ones(5)
t = 2*ones(Int8,5)
@turbo for i = 1:5
    a = t[i]
    s[a] += t[i]
end

The code above gives s = [1.0, 3.0, 1.0, 1.0, 1.0].
If I remove the @turbo macro, I get the correct result, [1.0, 11.0, 1.0, 1.0, 1.0] (each of the 5 iterations adds 2 to s[2], so s[2] should end up as 1.0 + 5*2 = 11.0). I don’t understand what is happening.

You can think of @turbo as a kind of parallelization: you are trying to update s[2] concurrently, which leads to the wrong result. The parallelization does not occur across processors, but within a single processor, using the capability of modern processors to execute multiple similar operations at the same time (SIMD). Effectively, all the SIMD lanes read the old value of s[2], each adds 2, and the conflicting stores collapse into a single write, which is why you get 3.0 instead of 11.0. You should not use SIMD for this kind of construct.

Thanks, I understand. But if I take a scalar variable, say s (as shown below), I get the correct result. Why does a scalar work but not an element of a vector?

s = 0
t = 2*ones(Int8,5)
@turbo for i = 1:5
    a = t[i]
    s += t[i]
end

Also, if I do want to run the above process in parallel, how can it be done?

That’s a good question, and I don’t really know the exact answer. But since s is a scalar (an immutable variable), the SIMD operation knows that it can reduce (sum the terms) in any order and, at the end, combine everything to obtain the final result. Note that in the original code a can change if the contents of the t vector change, and that changes which element of s you would be updating; I think that is too complicated for the compiler to understand what you want.

The example is too simple, and I think suggestions based on it will not be really useful for understanding SIMD or parallelism in general. The most elementary way to avoid the concurrency problem is to give each thread its own copy of the data and reduce the results at the end, but that is about threading, not SIMD. Maybe if you explore a slightly more meaningful example, better overall approaches can be suggested for getting what you want.
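As a rough sketch of that per-thread-copy idea, applied to the s/t example above (the function name and the chunking are just for illustration; this uses Threads.@threads, not SIMD):

function accumulate_threaded(t, s)
    nt = Threads.nthreads()
    partials = [zero(s) for _ in 1:nt]   # one private accumulator per thread, so no concurrent updates
    Threads.@threads for tid in 1:nt
        for i in tid:nt:length(t)        # each thread gets a disjoint set of iterations
            a = t[i]
            partials[tid][a] += t[i]
        end
    end
    return s .+ reduce(+, partials)      # combine the private copies at the end
end

accumulate_threaded(2*ones(Int8, 5), ones(5))  # should give [1.0, 11.0, 1.0, 1.0, 1.0]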

Yes.
@turbo makes optimistic aliasing assumptions.
If an index depends on a loop, it assumes the address is different for every iteration.
If an index does not depend on a loop, it assumes the address is the same for every iteration.

In other words, because you have s[a] and a depends on loop i, it assumes every iteration of i will result in a different address of s, and thus these can be performed in parallel.

In the scalar case, it knows you’re accumulating to the same number across iterations, and thus does that correctly.
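For example, a plain scalar reduction inside a function is handled fine by @turbo (a minimal sketch, not from the thread):

using LoopVectorization

function turbosum(t)
    s = zero(eltype(t))
    @turbo for i in eachindex(t)
        s += t[i]    # scalar accumulator: @turbo is free to reassociate the partial sums
    end
    return s
end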

I’m prioritizing correctness in the rewrite, so it should handle it correctly.
But making that case fast is difficult. I’d have to think about it more.
With the rewrite, it’d need you to add @simd ivdep to get the current behavior.


So, I have to use a function like the one below, which counts the occurrences of each value in an array using given weights instead of raw counts.
The function below does the job; however, for the simulation the length of the input array will be in the millions, so I’d need to parallelise this in the end.

function bincount(array, weights, minlength)
    # Sums, for each integer value in `array`, the corresponding entries of `weights`.
    # minlength: number of bins (distinct values that can appear in `array`).
    ret = zeros(Float64, minlength)
    for index = 1:length(array)
        i = array[index]
        ret[i+1] += weights[index]
    end
    return ret
end
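For reference, a tiny worked call with made-up values (not from the simulation):

bincount([0, 1, 1, 2], [1.0, 0.5, 0.5, 2.0], 3)   # == [1.0, 1.0, 2.0]; the two 1s contribute 0.5 + 0.5 to the second bin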

I can’t use @tturbo in the function above due to the problem discussed. What would be a good way to deal with this?

function bincount!(x, array, weights)
     for index = eachindex(array)
        i = array[index]
        x[i+1] += weights[index]
    end
end
function bincount_threads(array, weights, minlength)
     l = (minlength+13) & -8
     nt = Threads.nthreads()
     acc = zeros(Float64, l, nt)
     N = length(array)
     Threads.@threads for tid = 1:nt
         lb = ((tid-1) * N ÷ nt) + 1
         ub = (tid * N) ÷ nt
         bincount!(@view(acc[:,tid]), @view(array[lb:ub]), @view(weights[lb:ub]))
     end
     vec(sum(@view(acc[1:minlength,:]), dims=2))
end

something like this should work
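One practical note: Threads.@threads only helps if Julia was started with more than one thread, e.g. with julia -t 8 or the JULIA_NUM_THREADS environment variable; you can check with

Threads.nthreads()   # number of threads available to the current session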


For the sake of didactics, here is a simpler version, yet it turned out to be slower than the serial one here:

julia> array = rand(1:100, 10^6); weights = rand(10^6); minlength=101;

julia> @btime bincount($array, $weights, $minlength);
  713.524 μs (1 allocation: 896 bytes)

julia> function bincount_parallel(array, weights, minlength; nchunks=Threads.nthreads())
           rets = [ zeros(Float64, minlength) for _ in 1:nchunks ]
           Threads.@threads for ichunk in 1:nchunks
               for index = ichunk:nchunks:length(array) # simple chunk splitter
                   i = array[index]
                   rets[ichunk][i+1] += weights[index]
               end
           end
           return sum(rets)
       end
bincount_parallel (generic function with 1 method)

julia> bincount_parallel(array, weights, minlength) ≈ bincount(array, weights, minlength)
true

julia> @btime bincount_parallel($array, $weights, $minlength);
  951.981 μs (97 allocations: 26.78 KiB)

I didn’t test Chris’s function, which by construction is much better than mine, but anyway this is a problem that is hard to parallelize effectively, as it is essentially bound by memory accesses in random order. If you could compute the weights on the fly, and cheaply, it would probably be much faster.

@Sagar_123
This is a version which is easier to understand and not that bad:

julia> function bincount_parallel(array, weights, minlength; nchunks=Threads.nthreads())
           rets = [ zeros(eltype(weights), minlength) for _ in 1:nchunks ]
           chunks = collect(Iterators.partition(eachindex(array), div(length(array),nchunks-1)))
           Threads.@threads for ichunk in 1:nchunks
               for index in chunks[ichunk]
                   i = array[index]
                   rets[ichunk][i+1] += weights[index]
               end
           end
           return sum(rets)
       end
bincount_parallel (generic function with 1 method)

julia> bincount_parallel(array, weights, minlength) ≈ bincount(array, weights, minlength)
true

julia> @btime bincount_parallel($array, $weights, $minlength);
  258.866 μs (98 allocations: 27.05 KiB)

julia> @btime bincount_threads($array, $weights, $minlength);  # Chris version
  216.041 μs (84 allocations: 17.80 KiB)

The main conceptual difference is that the chunks are now contiguous in memory, by the use of Iterators.partition.
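For illustration (with made-up sizes), Iterators.partition splits an index range into contiguous blocks:

collect(Iterators.partition(1:10, 4))   # three contiguous blocks covering 1:4, 5:8 and 9:10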

julia> bincount_parallel(array, weights, minlength) ≈ bincount(array, weights, minlength) ≈ bincount_threads(array, weights, minlength) ≈ bincount_polyester(array, weights, minlength)
true

julia> @benchmark bincount_threads($array, $weights, $minlength)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  198.277 μs …   3.803 ms  ┊ GC (min … max): 0.00% … 77.31%
 Time  (median):     375.698 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   422.580 μs ± 162.117 μs  ┊ GC (mean ± σ):  0.34% ±  2.01%

   198 μs           Histogram: frequency by time          929 μs <

 Memory estimate: 26.72 KiB, allocs estimate: 120.

julia> @benchmark bincount_polyester($array, $weights, $minlength)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  109.269 μs …  42.170 ms  ┊ GC (min … max): 0.00% … 2.54%
 Time  (median):     240.683 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   266.718 μs ± 729.224 μs  ┊ GC (mean ± σ):  0.12% ± 0.04%

   109 μs           Histogram: frequency by time          511 μs <

 Memory estimate: 15.97 KiB, allocs estimate: 11.

julia> @benchmark bincount_parallel($array, $weights, $minlength)
BenchmarkTools.Trial: 4280 samples with 1 evaluation.
 Range (min … max):  838.059 μs …   2.953 ms  ┊ GC (min … max): 0.00% … 57.71%
 Time  (median):       1.149 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):     1.161 ms ± 104.605 μs  ┊ GC (mean ± σ):  0.10% ±  1.56%

   838 μs           Histogram: frequency by time         1.59 ms <

 Memory estimate: 41.28 KiB, allocs estimate: 145.

julia> @benchmark bincount($array, $weights, $minlength)
BenchmarkTools.Trial: 5438 samples with 1 evaluation.
 Range (min … max):  843.576 μs …  1.291 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     913.180 μs              ┊ GC (median):    0.00%
 Time  (mean ± σ):   913.291 μs ± 18.513 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

   844 μs          Histogram: frequency by time          949 μs <

 Memory estimate: 896 bytes, allocs estimate: 1.

The polyester one is using Polyester.@batch instead.
This was with 18 threads on an 18-core desktop.

julia> versioninfo()
Julia Version 1.9.0-DEV.1331
Commit 6f8e24c672* (2022-09-12 16:52 UTC)
Platform Info:
  OS: Linux (x86_64-redhat-linux)
  CPU: 36 × Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-14.0.5 (ORCJIT, skylake-avx512)
  Threads: 18 on 36 virtual cores

@Elrod @lmiq Thank you, this is really helpful and interesting. Can you suggest some resources for learning about parallelisation in different scenarios, as I’m new to parallel computing?
My work involves particle simulation (PIC methods in particular), where the same calculations are performed at every time step, but each step depends on the previous one, so I can only parallelise the calculations within each step.

In general, in cases like this, your best bet is to parallelize each time step (or multiple independent simulations) rather than trying to parallelize over timesteps.

I’m not used to the specificities of PIC methods, but in the kind of particle simulations I work with (molecular simulations) the dominant cost is that of computing interparticle forces, which naively scales with the square of the number of particles. That computation is what one has to accelerate and parallelize. The updates of positions and velocities, which depend on successive steps, are relatively cheap.
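In code, that usually means keeping the time loop serial and parallelizing the per-particle work inside each step. A schematic sketch (the names and the toy force are placeholders, not a real PIC or MD kernel):

function simulate!(positions, velocities, forces, dt, nsteps)
    for step in 1:nsteps                                   # serial: each step depends on the previous one
        Threads.@threads for p in eachindex(positions)     # parallel within a step
            forces[p] = -positions[p]                      # placeholder force; the expensive part goes here
        end
        Threads.@threads for p in eachindex(positions)
            velocities[p] += dt * forces[p]
            positions[p]  += dt * velocities[p]
        end
    end
    return positions
end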

Side note: may I ask what the intention was here?

It should’ve been 13. The motivation is to avoid false sharing.
That is, a cacheline is the smallest unit of memory that can be treated independently by many parts of the system. To avoid synchronization between cores, we need each thread to work on separate cachelines.
On x64 CPUs, cachelines are 64 bytes.
On M1 CPUs, they’re 128 bytes.
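If you want to check the cacheline size on your machine, most Linux systems expose it in sysfs (a sketch; the exact path may vary):

parse(Int, readchomp("/sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size"))   # typically 64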

julia> function bincount_polyester(array, weights, minlength, offset = 13)
            l = (minlength+offset) & -8
            nt = Threads.nthreads()
            acc = zeros(Float64, l, nt)
            N = length(array)
            @batch for tid = 1:nt
                lb = ((tid-1) * N ÷ nt) + 1
                ub = (tid * N) ÷ nt
                bincount!(@view(acc[:,tid]), @view(array[lb:ub]), @view(weights[lb:ub]))
            end
            vec(sum(@view(acc[1:minlength,:]), dims=2))
       end
bincount_polyester (generic function with 2 methods)

julia> array = rand(1:100, 10^6); weights = rand(10^6); minlength=101;

julia> @benchmark bincount_polyester($array, $weights, $minlength, 7)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  112.959 μs …  43.887 ms  ┊ GC (min … max): 0.00% … 2.50%
 Time  (median):     265.806 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   290.294 μs ± 758.469 μs  ┊ GC (mean ± σ):  0.11% ± 0.04%

   113 μs           Histogram: frequency by time          516 μs <

 Memory estimate: 15.97 KiB, allocs estimate: 11.

julia> @benchmark bincount_polyester($array, $weights, $minlength, 13)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):   67.563 μs …  43.869 ms  ┊ GC (min … max): 0.00% … 2.70%
 Time  (median):     150.542 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   170.990 μs ± 620.456 μs  ┊ GC (mean ± σ):  0.14% ± 0.04%

   67.6 μs          Histogram: frequency by time          346 μs <

 Memory estimate: 17.09 KiB, allocs estimate: 11.

Why 13?

Julia arrays are 16-byte aligned by default (not 64-byte), and I’m assuming 64 byte cachelines here.
Given 16 byte alignment, that means our arrays’ alignment mod 64 could be 0, 16, 32, or 48.
Dividing everything by 16 so we have 0, 1, 2, or 3…

julia> reshape(0:39, 4, 10)
4Γ—10 reshape(::UnitRange{Int64}, 4, 10) with eltype Int64:
 0  4   8  12  16  20  24  28  32  36
 1  5   9  13  17  21  25  29  33  37
 2  6  10  14  18  22  26  30  34  38
 3  7  11  15  19  23  27  31  35  39

This matrix represents the linear memory addresses divided by the minimum alignment (16), so 0 is 0, 1 is 16, 2 is 32, 3 is 48…
Each column represents a separate cacheline.

Thus, for good performance, we can’t have any two threads ever writing to the same column.
Imagine a column of a matrix snaking its way across linear indices above. E.g., if you had 12 Float64 per column, that’d be 6 * 16 bytes long (i.e. 12 * 8 bytes).
So if you started at 0, the first column would occupy 0:5, the second 6:11, the third 12:17.
Obviously, many of these are sharing the same cacheline.
So we need to add some padding.

If we could guarantee that it starts at 0, (minlength + 7) & -8 is all we’d need, as that is enough to guarantee that each column of our real matrix starts at the same row in the above alignment matrix as the one before it.
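For concreteness, with the minlength = 101 used earlier:

(101 + 7) & -8   # == 104; 104 Float64s = 832 bytes, a whole number of 64-byte cachelines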

But if we create a random matrix, it’ll start at 0, 1, 2, or 3. I got 13 by just adding 3x 16 bytes (i.e. 6x Float64) to push us to the next column if we were offset.
For example

julia> a(n) = (n + 13) & -8;   # padded column length, in Float64s, for a requested length n

julia> hcat(1:16, a.(1:16))'
2×16 adjoint(::Matrix{Int64}) with eltype Int64:
 1  2   3   4   5   6   7   8   9  10  11  12  13  14  15  16
 8  8  16  16  16  16  16  16  16  16  24  24  24  24  24  24

So if we have 3 Float64 per column, we’ll need 16 padded Float64, i.e. 8 of the 16-byte rows above, per column.
Translating that to the previous matrix, each successive column will start in the same row of the matrix, but 2 columns over.
So if our first column starts at 3, it’ll end inside 4, and the second one starts at 11.

This is probably excessive padding, as we could probably start at an earlier row…
But it’d probably be easier at that point to just over allocate a vector, create a view that is 64-byte aligned so we can start at 0, and then use (n + 7) & -8.
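A sketch of that last idea (over-allocate, then take a 64-byte-aligned view; it assumes Float64 accumulators and 64-byte cachelines, and the helper name is made up):

function aligned_accumulator(minlength, nt)
    l = (minlength + 7) & -8                         # whole cachelines per thread column
    buf = zeros(Float64, l * nt + 8)                 # one spare cacheline of slack for alignment
    off = (8 - (UInt(pointer(buf)) >> 3) % 8) % 8    # Float64s to skip to reach a 64-byte boundary
    return reshape(@view(buf[off+1:off+l*nt]), l, nt)
end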


Wow, thank you very much. I’ll pin this post for future reference 🙂

Final note: by using Polyester.@batch instead of Threads.@threads, the performance of the “simpler” function, in which the chunks are divided by Iterators.partition, is good:

julia> using Polyester 

julia> function bincount_parallel(array, weights, minlength; nchunks=Threads.nthreads())
           rets = [ zeros(eltype(weights), minlength) for _ in 1:nchunks ]
           chunks = collect(Iterators.partition(eachindex(array), div(length(array),max(1,nchunks-1))))
           @batch for ichunk in 1:nchunks
               for index in chunks[ichunk]
                   i = array[index]
                   rets[ichunk][i+1] += weights[index]
               end
           end
           return sum(rets)
       end
bincount_parallel (generic function with 1 method)

julia> @benchmark bincount_parallel($array, $weights, $minlength)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  139.890 μs …  40.057 ms  ┊ GC (min … max): 0.00% … 2.21%
 Time  (median):     152.571 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   172.081 μs ± 794.746 μs  ┊ GC (mean ± σ):  0.20% ± 0.04%

   140 μs           Histogram: frequency by time          202 μs <

 Memory estimate: 20.58 KiB, allocs estimate: 26.

julia> @benchmark bincount_polyester($array, $weights, $minlength, 7)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  138.576 μs …  40.513 ms  ┊ GC (min … max): 0.00% … 3.06%
 Time  (median):     153.326 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   163.667 μs ± 569.397 μs  ┊ GC (mean ± σ):  0.15% ± 0.04%

   139 μs           Histogram: frequency by time          198 μs <

 Memory estimate: 11.09 KiB, allocs estimate: 11.

In this specific case, the fact that Iterators.partition will feed chunks far from each other to the batches probably avoids most false sharing.

The difference is enormous relative to the version where the elements handled by different threads are next to each other in memory, so that is actually a nice false-sharing example:

julia> function bincount_parallel(array, weights, minlength; nchunks=Threads.nthreads())
           rets = [ zeros(eltype(weights), minlength) for _ in 1:nchunks ]
           @batch for ichunk in 1:nchunks
               for index in ichunk:nchunks:length(array)
                   i = array[index]
                   rets[ichunk][i+1] += weights[index]
               end
           end
           return sum(rets)
       end
bincount_parallel (generic function with 1 method)

julia> @benchmark bincount_parallel($array, $weights, $minlength)
BenchmarkTools.Trial: 3186 samples with 1 evaluation.
 Range (min … max):  1.408 ms …  40.304 ms  ┊ GC (min … max): 0.00% … 2.16%
 Time  (median):     1.545 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   1.558 ms ± 689.603 μs  ┊ GC (mean ± σ):  0.02% ± 0.04%

   1.41 ms         Histogram: frequency by time        1.83 ms <

 Memory estimate: 20.33 KiB, allocs estimate: 25.