Multithreading problem

This is my first time trying to use multithreading for real work. I have an inner loop in my code for doing sinc interpolation of a recorded time history file. A minimum working example is

# Minimum working example for threaded operation of a for loop

using Base.Threads
using Random

@inline h(x) = (cos(π/2 * x/n)) ^2    # Hanning window where n is the number of data points in each direction
@inline wsinc(x) = sinc(x) * h(x)     # windowed sinc function

n = 10
ordinate_time = rand(31)
fpart, ipart = modf(13.78)
ipart = Integer(ipart)
ya = 0.0

for (i, filtervalue) in enumerate(-n+fpart : n)
    index = ipart+1-n+i
    ya += ordinate_time[index] * wsinc(filtervalue)
end

ya

This gives a result. When I try multithreading, I replace the for loop with

@threads for (i, filtervalue) in enumerate(-n+fpart : n)
    index = ipart+1-n+i
    #ya += ordinate_time[index] * wsinc(filtervalue)
    atomic_add!(ya, ordinate_time[index] * wsinc(filtervalue))
end

I don’t think I need the atomic_add, but I tried it because I really don’t know what I am doing wrong here.

The error is

    nested task error: MethodError: no method matching firstindex(::Base.Iterators.Enumerate{StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}}})

This is a known issue, but I don’t think it’s solved yet.


The problem is enumerate, which (like many of Julia’s Base.Iterators constructs) doesn’t currently support indexing. You can work around it with something like this:

nrange = -n+fpart : n
Threads.@threads for i in eachindex(nrange)
    filtervalue = nrange[i]
    [...]
end
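
Note that the indexing workaround alone doesn’t fix the accumulation into ya from several threads (that’s the data race your atomic_add! was aimed at). If you want to stay with Base threading, one option is to give each task its own partial sum and combine them at the end, e.g. something along these lines (untested sketch):

nrange = -n+fpart : n
# one chunk of indices per task, each summed into its own local accumulator (no shared state)
chunks = Iterators.partition(eachindex(nrange), cld(length(nrange), Threads.nthreads()))
tasks = map(chunks) do chunk
    Threads.@spawn begin
        acc = 0.0
        for i in chunk
            filtervalue = nrange[i]
            index = ipart + 1 - n + i
            acc += ordinate_time[index] * wsinc(filtervalue)
        end
        acc
    end
end
ya = sum(fetch.(tasks))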

External libraries like ThreadsX and LoopVectorization tend to have better support for generic threading constructs at the moment, e.g.

using ThreadsX

ya = ThreadsX.sum(enumerate(-n+fpart : n)) do (i, filtervalue)
    index = ipart + 1 - n + i
    return ordinate_time[index] * wsinc(filtervalue)
end

FYI, I wrote A quick introduction to data parallelism in Julia, which covers patterns like this.


A separate performance quibble (maybe not reflected in your actual code, but noted just in case): h(x) depends on the global variable n, which kills performance. Better to pass it explicitly, as h(x, n).
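
That is, something along the lines of:

@inline h(x, n) = (cos(π/2 * x/n))^2    # n passed as an argument instead of read from a global
@inline wsinc(x, n) = sinc(x) * h(x, n)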

You might want to try Lanczos interpolation from Interpolations.jl (it’s just a sinc function windowed with a sinc).

using Interpolations

support = 10
scale = 4
ya = interpolate(ordinate_time, Interpolations.Lanczos{support}(scale))[13.78]

Thank you for pointing me to the ThreadsX library and showing the code snippet; I think this is the first time I have seen the CPU at 100% on my computer. Thanks also for mentioning the performance quibble. I was looking at that and wondering if I should change it, and now I have. And thanks to tkf and all those who have made Julia what it is. I will check out the Lanczos interpolation.

Should I also use the @fastmath macro for additional speed?

Doing some more experimentation, I find that any attempt to parallelize this loop results in a loss of performance. Would I be correct in assuming that this is because the number of elements to be summed is too small (20 elements)?

My minimum working example now is:

# Minimum working example for threaded operation of a for loop
using ThreadsX, Base.Threads
using Random

function mwe()
    @inline h(x,n) = (cos(π/2 * x/n))^2     # Hanning window where n is the number of data points in each direction
    @inline wsinc(x,n) = sinc(x) * h(x,n)   # windowed sinc function

    n = 10
    for i = 1:1000000
        ordinate_time = rand(31)
        fpart, ipart = modf(13.78)
        ipart = Integer(ipart)
        #=
        ya = 0.0
        nrange = -n+fpart : n
        for i in eachindex(nrange)
            filtervalue = nrange[i]
            index = ipart+1-n+i
            ya += ordinate_time[index] * wsinc(filtervalue,n)
        end
        =#

        ya = ThreadsX.sum(enumerate(-n+fpart : n)) do (i, filtervalue)
            index = ipart+1-n+i
            return ordinate_time[index] * wsinc(filtervalue,n)
        end
    end
end

where I comment out the respective blocks of code to try each technique.

The results for ThreadsX are:

ya = ThreadsX.sum(enumerate(-n+fpart : n)) do (i, filtervalue)
    index = ipart+1-n+i
    return ordinate_time[index] * wsinc(filtervalue,n)
end

julia> Threads.nthreads()
12
julia> @btime mwe()
  50.776 s (307802549 allocations: 18.50 GiB)

julia> Threads.nthreads()
1
julia> @btime mwe()
  15.994 s (193000000 allocations: 8.43 GiB)

Using sum instead of ThreadsX.sum gives the following:
julia> @btime mwe()
  3.425 s (82000000 allocations: 1.52 GiB)

Clearly, the most performant version is the one without any parallelism.

Now try using the @threads macro with a for loop:

    Threads.@threads for i in eachindex(nrange)
        filtervalue = nrange[i]
        index = ipart+1-n+i
        ya += ordinate_time[index] * wsinc(filtervalue,n)
    end

julia> Threads.nthreads()
12
julia> @btime mwe()
  18.827 s (145337991 allocations: 7.51 GiB)

julia> Threads.nthreads()
1
julia> @btime mwe()
  4.828 s (90000000 allocations: 2.18 GiB)

Removing the Threads.@threads before the for loop gives:
julia> @btime mwe()
  716.158 ms (1000000 allocations: 320.43 MiB)

Clearly the plain for loop outperforms the sum function, and adding parallelism reduces performance in all cases.

I have an outer for loop that I may be able to use LoopVectorization on. I will try that next.

It’s almost always fastest to apply multithreading to the outer loop. Can you illustrate your outer loop in a MWE?
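
Roughly, the shape would be something like this (a minimal sketch, with a dummy inner_sum standing in for your windowed-sinc sum):

using Base.Threads

inner_sum(i, data) = sum(j -> data[i+j], -3:3)   # stand-in for the actual interpolation kernel

data = rand(1000)
out = zeros(length(data))
@threads for i in 4:length(data)-3   # each iteration writes only its own out[i], so there is no race
    out[i] = inner_sum(i, data)
end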

Here is a MWE.

# Minimum working example for threaded operation of a for loop
# the inner loop is too small for parallel summation to be effective
using ThreadsX, Base.Threads
using Random

function mwe()
    @inline h(x,n) = (cos(π/2 * x/n))^2     # Hanning window where n is the number of data points in each direction
    @inline wsinc(x,n) = sinc(x) * h(x,n)   # windowed sinc function

    t = 1:1000000
    ordinate_time = sinpi.(2*10*t) .+ cospi.(2*14.7*t)
    ordinate_angle = zeros(Float64, length(ordinate_time))
    n = 10
    for i = 15:1000000-15  # so I don't deal with the end conditions
        fpart, ipart = modf(13.78)
        ipart = Integer(ipart)

        ya = 0.0
        nrange = -n+fpart : n
        for j in eachindex(nrange)      # j, to avoid shadowing the outer loop variable i
            filtervalue = nrange[j]
            index = ipart+1-n+j
            ya += ordinate_time[index] * wsinc(filtervalue,n)
        end
        ordinate_angle[i] = ya
        #=
        ya = ThreadsX.sum(enumerate(-n+fpart : n)) do (j, filtervalue)
            index = ipart+1-n+j
            return ordinate_time[index] * wsinc(filtervalue,n)
        end=#
    end
    return ordinate_angle
end

Edit: I added

using LoopVectorization 

@turbo for i = 15:1000000-15  # so I don't deal with the end conditions 

...

and now get the error

┌ Error: Failed to revise C:\Users\jakez\.julia\dev\SignalProcessing\example\MEW parallel sum.jl
│   exception =
│    LoadError: BoundsError: attempt to access 0-element Vector{LoopVectorization.Operation} at index [1]
│    in expression starting at C:\Users\jakez\.julia\dev\SignalProcessing\example\MEW parallel sum.jl:14
└ @ Revise C:\Users\jakez\.julia\packages\Revise\3RMhb\src\packagedef.jl:716

I am not sure what is going on.

After reworking your MWE, here’s what I get:

using Base.Threads, Random, BenchmarkTools, Test

@inline h(x::Float64,n::Int) = (cos(π/2 * x/n))^2
@inline wsinc(x::Float64,n::Int) = sinc(x) * h(x,n)

@inline function calc_ya(n::Int, ipart::Int, fpart::Float64, ordinate_time::Vector{Float64})
    ya = 0.0
    for (j, filtervalue) in enumerate(-n+fpart : n)
        index = ipart+1-n+j
        ya += ordinate_time[index] * wsinc(filtervalue,n)
    end
    return ya
end

function mwe()
    t = 1:1000000
    n = 10
    ordinate_time = sinpi.(2*10*t) .+ cospi.(2*14.7*t)
    ordinate_angle = zeros(Float64, length(ordinate_time))
    fpart, ipart = modf(13.78)
    ipart = Integer(ipart)
    tsteps = 15:length(t)-15
    for i ∈ tsteps
        ordinate_angle[i] = calc_ya(n,ipart,fpart,ordinate_time)
    end
    return ordinate_angle
end

function mwe_threads()
    t = 1:1000000
    n = 10
    ordinate_time = sinpi.(2*10*t) .+ cospi.(2*14.7*t)
    ordinate_angle = zeros(Float64, length(ordinate_time))
    fpart, ipart = modf(13.78)
    ipart = Integer(ipart)
    tsteps = 15:length(t)-15
    @threads for i ∈ tsteps
        ordinate_angle[i] = calc_ya(n,ipart,fpart,ordinate_time)
    end
    return ordinate_angle
end

With 8 threads I get the following performance:

julia> @btime mwe();
  588.646 ms (4 allocations: 15.26 MiB)

julia> @btime mwe_threads();
  108.995 ms (47 allocations: 15.26 MiB)

julia> @test mwe() == mwe_threads()
Test Passed

I hope this helps and that I have understood your MWE correctly. Note that all variables that do not depend on i can be calculated outside the for loop.


Thank you kfrb for this reworked example. I was able to take your modification to the MWE and incorporate it into the main code.

The reason some of those variables were inside the for loop is that in the main code they do depend on i, but thanks for the tip.

I think the main take-away from your rework is to put the inner for loop into its own function.