I am trying to write the parallel prefix scan as a multithreaded CPU function. Since the number of processors is small relative to the length of the array It wouldn’t be fruitful to use Hillis-Steele or Belloch scan.
Instead for n
processors my algorithm divides the array into n
parts and then spawns a task to each thread. Storing each part’s sum to a different array and then performing a prefix scan on this array. Finally broadcasting back these sums to n - 1
parts which need the sum of previous parts.
So, in theory for 2 processors the new algorithm should take \frac{t}{2} + \frac{t}{4} =\frac{3t}{4}.
And for 6 processors (my case), \frac{t}{6} + \frac{5t}{36} = \frac{11t}{36}
function scan_naive!(op, output::Vector{T}, input::Vector{T}) where T <: Number
output[1] = input[1]
for i = 2:length(input)
@inbounds output[i] = op(output[i - 1], input[i])
end
output
end
using Plots
sizes = 1 .<< collect(10:29)
function timer(f, op, len)
A = ones(Int32, len)
B = similar(A)
x = Base.@elapsed f(op, B, A)
y = Base.@elapsed f(op, B, A)
z = Base.@elapsed f(op, B, A)
min(x, y, z) # Unstable results without multiple trials for multithreaded version
end
t = [timer(scan_naive!, +, i) for i in sizes]
plot(sizes, t, title="Benchmark for prefix scan", scale=:log10, marker = :circle, legend=:bottomright)
function threaded_scan!(op, output::Vector{T}, input::Vector{T}) where T <: Number
segment_length = length(input) ÷ nthreads()
sums = Array{T}(undef, nthreads())
@threads for i = 1:nthreads()
low = 1 + (threadid() - 1)*segment_length
high = threadid()*segment_length
threadid() == nthreads() && (high = length(input))
@inbounds output[low] = input[low]
for j in (low + 1):high
@inbounds output[j] = op(input[j], output[j - 1])
end
@inbounds sums[threadid()] = output[high]
end
scan_naive!(op, sums, sums)
@threads for i in (segment_length + 1):length(input)
segment = (i - 1) ÷ segment_length
i >= nthreads()*segment_length && (segment = nthreads() - 1)
@inbounds output[i] = op(sums[segment], output[i])
end
output
end
t2 = [timer(threaded_scan!, +, i) for i in sizes]
plot(sizes, [t, t2], title="Benchmark for prefix scan", scale=:log10, marker = :circle, legend = :bottomright)
This result was surprising to me as I expected the multithreaded version to overtake the single threaded version by the end which is an array of size 2^{28} \approx 5\cdot10^8.
Then I made a small optimization which is that I use only n - 1
processors in the latter stage to broadcast partial sums which will remove the overhead of computing the segment and kick in SIMD instructions.
function threaded_scan2!(op, output::Vector{T}, input::Vector{T}) where T <: Number
segment_length = length(input) ÷ nthreads()
sums = Array{T}(undef, nthreads())
@threads for i = 1:nthreads()
low = 1 + (threadid() - 1)*segment_length
high = threadid()*segment_length
threadid() == nthreads() && (high = length(input))
@inbounds output[low] = input[low]
for j in (low + 1):high
@inbounds output[j] = input[j] + output[j - 1]
end
@inbounds sums[threadid()] = output[high]
end
scan_naive!(op, sums, sums)
tasks = []
for i in 2:nthreads()
low = 1 + (i- 1)*segment_length
high = i*segment_length
v = view(output, low:high)
tsk = Threads.@spawn @inbounds v .+= sums[i - 1] # Only does addition, `op` is useless here
push!(tasks, tsk)
end
wait.(tasks)
output
end
This appears to be better but still not beating the single threaded version. Since this is a memory intensive kernel maybe that’s the bottleneck that’s slowing this down, is there a way to confirm this suspicion ?