Multithreading a for loop

using LoopVectorization

# Serial (single-threaded) version: @turbo vectorizes the loop but uses one core.
function collect_nodes_frac_serial(nodes, weights, pos, grid_spacing)
    @turbo for i in eachindex(pos)
        nodes[i] = div(pos[i], grid_spacing)                            # index of the grid cell containing pos[i]
        weights[i] = (pos[i] - nodes[i] * grid_spacing) / grid_spacing  # fractional position within the cell
    end
    return nodes, weights
end
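
For reference, a possible setup for reproducing such a benchmark (the element types, value range, and grid_spacing below are assumptions, not taken from the original post):

using BenchmarkTools

N = 10_220_000
pos = 100 .* rand(N)        # assumed particle positions
grid_spacing = 0.1          # assumed grid spacing
nodes = similar(pos)
weights = similar(pos)

@benchmark collect_nodes_frac_serial($nodes, $weights, $pos, $grid_spacing)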

Benchmarking the above function for length(pos) = 10220000 gives,

BenchmarkTools.Trial: 193 samples with 1 evaluation.
 Range (min … max):  25.483 ms …  28.958 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     25.832 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   25.981 ms ± 504.045 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▂▃▄▅█▄▂    ▂▂                                                 
  ███████▇▇█▇██▇█▄▄▄▁▄▄▄▄▄▄▁▃▃▄▃▃▄▄▄▁▁▃▁▁▃▃▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▃ ▃
  25.5 ms         Histogram: frequency by time         28.1 ms <

 Memory estimate: 32 bytes, allocs estimate: 1.

However, using @tturbo with nthreads = 8, the above function gives,

BenchmarkTools.Trial: 182 samples with 1 evaluation.
 Range (min … max):  27.362 ms …  28.496 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     27.483 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   27.513 ms ± 133.388 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

      ▂ ▃█▂▄▃▄▁ ▃▃ ▁▂                                           
  ▃▃▄▇█▄███████▇██▄██▇▄▇▅▇▃▄▄▆▄▅▇▃▁▃▃▁▃▃▁▃▄▁▁▁▁▁▁▁▅▃▁▃▁▃▁▁▁▁▁▃ ▃
  27.4 ms         Histogram: frequency by time         27.9 ms <

 Memory estimate: 32 bytes, allocs estimate: 1.

Why is multithreading not helping? What is going wrong?

Check the thread count with @show Threads.nthreads().
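
For reference, the thread count is fixed when Julia starts, either with the --threads flag or the JULIA_NUM_THREADS environment variable:

$ julia --threads 8
julia> Threads.nthreads()
8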

I guess that your function spends all its time hitting the memory wall (waiting for reads and writes to main memory). You could try to artificially increase the computational density by adding a costly function inside the loop (exp, sin, log, …).

@show Threads.nthreads() shows 8 threads.

I tried multithreading using @threads (shown below),

function collect_nodes_frac(nodes, weights, pos, grid_spacing, nchunks = Threads.nthreads())
    # Split the index range into contiguous chunks; cld (ceiling division) ensures
    # the trailing partial chunk is not dropped when length(pos) is not an exact
    # multiple of nchunks.
    chunks = collect(Iterators.partition(eachindex(pos), cld(length(pos), nchunks)))
    Threads.@threads for ichunk in eachindex(chunks)
        for i in chunks[ichunk]
            nodes[i] = div(pos[i], grid_spacing)
            weights[i] = (pos[i] - nodes[i] * grid_spacing) / grid_spacing
        end
    end
    return nodes, weights
end

Benchmarking it gives almost the same result,

BenchmarkTools.Trial: 174 samples with 1 evaluation.
 Range (min … max):  27.877 ms … 43.633 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     28.806 ms              ┊ GC (median):    0.00%
 Time  (mean ± σ):   28.869 ms ±  1.643 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▃█▅    ▄▁▃▂                                                  
  ███▅▂▃▅████▆▂▃▁▁▁▁▁▁▂▁▁▁▁▁▁▁▁▁▁▁▁▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▂ ▂
  27.9 ms         Histogram: frequency by time        35.7 ms <

 Memory estimate: 4.17 KiB, allocs estimate: 45.

Try comparing the sequential and parallel versions with

weights[i] = log(exp((pos[i] - nodes[i]*grid_spacing)/grid_spacing))

Using

weights[i] = log(exp((pos[i] - nodes[i]*grid_spacing)/grid_spacing))

Benchmarking the serial version gives:

BenchmarkTools.Trial: 119 samples with 1 evaluation.
 Range (min … max):  41.845 ms …  42.674 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     41.912 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   42.094 ms ± 294.081 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

   ▂▃█▁                                                   ▃     
  ▆████▇▅▆▃▄▄▃▄▁▁▃▁▁▃▁▁▃▁▁▁▃▁▃▁▁▃▁▁▁▁▁▁▁▁▁▁▁▃▁▁▁▁▁▁▁▁▁▁▃▃▆█▇▅▄ ▃
  41.8 ms         Histogram: frequency by time         42.6 ms <

 Memory estimate: 32 bytes, allocs estimate: 1.

Using @tturbo, it is:

BenchmarkTools.Trial: 187 samples with 1 evaluation.
 Range (min … max):  26.574 ms …  27.639 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     26.745 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   26.802 ms ± 193.670 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

      ▇▆▇▄█▂▇▂▂▂ ▃ ▃                                            
  ▅▄▄███████████▇█▇█▆█▅▄▆▄▅▄▁▃▃▁▃▁▁▃▃▄▃▁▃▄▁▁▃▃▅▁▃▁▃▃▁▁▃▁▁▄▁▁▁▃ ▃
  26.6 ms         Histogram: frequency by time         27.4 ms <

 Memory estimate: 32 bytes, allocs estimate: 1.

I understand what is happening now, but isn’t there any way to speed things up for the original case?

Well, this has been the big question in HPC for the last 20 years… You have to increase the temporal and spatial data locality in order to reduce the stress on RAM bandwidth and make use of your cache hierarchy. In your simple loop, there is not enough work to do compared to the amount of reads and writes to memory.
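
As a rough back-of-the-envelope estimate (assuming Float64 data): each iteration reads pos[i] (8 bytes) and writes nodes[i] and weights[i] (16 bytes), so at least 24 bytes of memory traffic for only a handful of arithmetic operations. For 10_220_000 elements that is roughly 245 MB per call, and at ~26 ms per call the loop is already moving data at about 10 GB/s, a sizeable fraction of the memory bandwidth of a typical desktop machine. Extra threads cannot help because the memory bus, not the CPU, is the bottleneck.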

The classical example is matrix*matrix multiplication, which you could implement as a succession of scalar products and obtain awful performance. In this case there is a lot of work to do (N^3) compared to the memory footprint (N^2), and a multilevel block implementation allows you to reach peak performance.
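
To make the contrast concrete, here is a rough sketch of the two extremes for square matrices (in practice you would just call LinearAlgebra.mul! or BLAS, which implement multi-level blocking and much more):

# Naive triple loop: every C[i,j] is a scalar product, so the rows of A and
# columns of B are streamed from main memory over and over -> memory-bound.
function matmul_naive!(C, A, B)
    n = size(A, 1)
    for j in 1:n, i in 1:n
        s = zero(eltype(C))
        for k in 1:n
            s += A[i, k] * B[k, j]
        end
        C[i, j] = s
    end
    return C
end

# One level of blocking: work on bs x bs tiles that fit in cache, so every
# tile of A and B that is loaded gets reused ~bs times before being evicted.
function matmul_blocked!(C, A, B; bs = 64)
    n = size(A, 1)
    fill!(C, zero(eltype(C)))
    for jj in 1:bs:n, kk in 1:bs:n, ii in 1:bs:n
        for j in jj:min(jj + bs - 1, n), k in kk:min(kk + bs - 1, n)
            b = B[k, j]
            @inbounds for i in ii:min(ii + bs - 1, n)
                C[i, j] += A[i, k] * b
            end
        end
    end
    return C
end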

#1 You have to increase the ratio (work to do)/(RAM data read+write), called arithmetic intensity or computational density in the roofline model.
The most common way of doing so is to fuse operations and avoid intermediate reads and writes.
For example, replace

@. x = y + z   # 2R + 1W
@. x += w      # 2R + 1W

with

@. x = y + z + w   # 3R + 1W

In this dummy example, fusing saves one full read and one full write of x, i.e. it increases the temporal data locality.
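
Applied to the original loop, the same idea means consuming nodes[i] and weights[i] right where they are computed instead of storing them for a later pass. A minimal sketch, assuming (hypothetically) that what is ultimately needed is just a reduction over the weights:

# Hypothetical fused kernel: each weight is used as soon as it is computed,
# so the nodes/weights arrays are never written out to main memory at all.
function total_weight(pos, grid_spacing)
    s = zero(eltype(pos))
    @turbo for i in eachindex(pos)
        node = div(pos[i], grid_spacing)
        s += (pos[i] - node * grid_spacing) / grid_spacing   # 1 read, 0 writes per element
    end
    return s
end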

#2 If you operate on a machine with several memory channels, initializing the data in parallel (using the same threads) may help to ensure that the different pieces of data are not all stored on a single physical memory chip.
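
A minimal sketch of such a parallel ("first touch") initialization, using @threads with a static schedule so every thread initializes the same contiguous slice it will later process (the initial values below are placeholders):

function init_first_touch!(pos, nodes, weights)
    # Each thread writes its own contiguous slice first, so the corresponding
    # memory pages end up allocated close to the core that will use them.
    Threads.@threads :static for i in eachindex(pos)
        pos[i] = rand()      # placeholder; replace with the real positions
        nodes[i] = 0.0
        weights[i] = 0.0
    end
    return nothing
end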


@LaurentPlagne Thanks!
