Multithreading workflow - an alternative to :static scheduling

As stated in the docs (Documentation for Threads.@threads):

:static scheduling exists for supporting transition of code written before Julia 1.3.
In newly written library functions, :static scheduling is discouraged because the
functions using this option cannot be called from arbitrary worker threads.

To avoid race conditions, I usually divide the work manually among the threads, and each thread then writes to its own column of a result matrix. With this workflow, :static scheduling is needed to obtain correct results.

Consider the following example:

using Base.Threads

function scheduling_comparison()
    n = 100
    x = 0.01 .* collect(1:n)
    indices = rand(1:n, n*1000)
    owned_indices = defaultdist(length(indices), nthreads())
    result_serial = serial_solution(x, indices, n)
    result_static = static_scheduling(x, indices, owned_indices, n)
    result_dynamic = dynamic_scheduling(x, indices, owned_indices, n)
    @show result_serial ≈ result_static
    @show result_serial ≈ result_dynamic
    return nothing
end

some_big_calculation(x) = sin(x) - exp(x) * x^2

function serial_solution(x, indices, n)
    result = zeros(n)
    for index in indices
        result[index] += some_big_calculation(x[index])
    end
    return result
end

function static_scheduling(x, indices, owned_indices, n)
    result_matrix = zeros(n, nthreads())
    @threads :static for _ in 1:nthreads()
        for index_pos in owned_indices[threadid()]
            index = indices[index_pos]
            result_matrix[index, threadid()] += some_big_calculation(x[index])
        end
    end
    result = zeros(n)
    @threads for i in 1:n
        result[i] = sum(@view result_matrix[i, :])
    end
    return result
end

function dynamic_scheduling(x, indices, owned_indices, n)
    result_matrix = zeros(n, nthreads())
    @threads :dynamic for _ in 1:nthreads()
        for index_pos in owned_indices[threadid()]
            index = indices[index_pos]
            result_matrix[index, threadid()] += some_big_calculation(x[index])
        end
    end
    result = zeros(n)
    @threads for i in 1:n
        result[i] = sum(@view result_matrix[i, :])
    end
    return result
end

function defaultdist(sz::Int, nc::Int)
    # Split the positions 1:sz into nc contiguous chunks of (nearly) equal size.
    if sz >= nc
        chunk_size = div(sz, nc)
        remainder = rem(sz, nc)
        # sidx[i] is the first position of chunk i; the first `remainder`
        # chunks receive one extra element each.
        sidx = zeros(Int64, nc+1)
        for i = 1:(nc+1)
            sidx[i] += (i-1)*chunk_size + 1
            if i <= remainder
                sidx[i] += i-1
            else
                sidx[i] += remainder
            end
        end
        grid = fill(1:0, nc)
        for i = 1:nc
            grid[i] = sidx[i]:sidx[i+1]-1
        end
        return grid
    else
        # Fewer positions than chunks: the first sz chunks get one position each,
        # the rest stay empty (1:0 rather than 0:0, which would index out of bounds).
        sidx = [1:(sz+1);]
        grid = fill(1:0, nc)
        for i = 1:sz
            grid[i] = sidx[i]:sidx[i+1]-1
        end
        return grid
    end
end
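
For illustration (this example is mine, not from the original code), defaultdist splits the positions 1:sz into nc contiguous chunks, spreading the remainder over the first chunks:

@assert defaultdist(10, 3) == [1:4, 5:7, 8:10]   # chunk sizes 4, 3, 3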

As you can see, :dynamic scheduling sometimes produces wrong results:

julia> scheduling_comparison()
result_serial ≈ result_static = true
result_serial ≈ result_dynamic = false

julia> scheduling_comparison()
result_serial ≈ result_static = true
result_serial ≈ result_dynamic = true

The problem with :dynamic scheduling is that neither a one-to-one mapping between iterations and threads nor a constant value of threadid() within an iteration is guaranteed, so some chunks of owned_indices may be read more than once while others are never used in the calculation.
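
To make the mismatch visible, here is a minimal sketch (the helper iteration_thread_map is only for illustration, not part of the code above). With :dynamic scheduling, the recorded thread ids can contain duplicates, meaning two iterations picked the same chunk of owned_indices while the chunk belonging to the missing id was never touched:

using Base.Threads

function iteration_thread_map()
    ids = zeros(Int, nthreads())
    @threads :dynamic for i in 1:nthreads()
        ids[i] = threadid()   # with :dynamic, ids[i] == i is not guaranteed
    end
    return ids                # duplicate entries: two iterations shared a thread
end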

Since I am in the process of preparing my first Julia package:
What is the official way to handle this when it is important that threadid() has a constant value within each iteration?

Any suggestions, comments or help with my workflow would be greatly appreciated!

@threads :dynamic for tid in 1:nthreads()
    for index_pos in owned_indices[tid]
        index = indices[index_pos]
        result_matrix[index, tid] += some_big_calculation(x[index])
    end
end

Well, sometimes you do not see the wood for the trees…
Thank you for your answer, @Elrod! Indexing with the loop variable tid instead of threadid() gives each iteration its own column, no matter which thread ends up running it.

In case somebody is interested, here are the performance differences (8 threads, Julia 1.8.1, Intel i9 on macOS Monterey):

using BenchmarkTools

function scheduling_benchmarks()
    n = 100000
    x = 0.01 .* collect(1:n)
    indices = rand(1:n, n*1000)
    owned_indices = defaultdist(length(indices), nthreads())
    @btime result_serial = serial_solution($x, $indices, $n)
    @btime result_static = static_scheduling($x, $indices, $owned_indices, $n)
    @btime result_dynamic = dynamic_scheduling($x, $indices, $owned_indices, $n)
    return nothing
end
julia> scheduling_benchmarks()
  3.058 s (2 allocations: 781.30 KiB)
  532.266 ms (102 allocations: 6.87 MiB)
  540.981 ms (103 allocations: 6.87 MiB)
Complete code
using Base.Threads
using BenchmarkTools

function scheduling_comparison()
    n = 100
    x = 0.01 .* collect(1:n)
    indices = rand(1:n, n*1000)
    owned_indices = defaultdist(length(indices), nthreads())
    result_serial = serial_solution(x, indices, n)
    result_static = static_scheduling(x, indices, owned_indices, n)
    result_dynamic = dynamic_scheduling(x, indices, owned_indices, n)
    @show result_serial ≈ result_static
    @show result_serial ≈ result_dynamic
    return nothing
end

function scheduling_benchmarks()
    n = 100000
    x = 0.01 .* collect(1:n)
    indices = rand(1:n, n*1000)
    owned_indices = defaultdist(length(indices), nthreads())
    @btime result_serial = serial_solution($x, $indices, $n)
    @btime result_static = static_scheduling($x, $indices, $owned_indices, $n)
    @btime result_dynamic = dynamic_scheduling($x, $indices, $owned_indices, $n)
    return nothing
end

some_big_calculation(x) = sin(x) - exp(x) * x^2

function serial_solution(x, indices, n)
    result = zeros(n)
    for index in indices
        result[index] += some_big_calculation(x[index])
    end
    return result
end

function dynamic_scheduling(x, indices, owned_indices, n)
    result_matrix = zeros(n, nthreads())
    @threads :dynamic for tid in 1:nthreads()
        for index_pos in owned_indices[tid]
            index = indices[index_pos]
            result_matrix[index, tid] += some_big_calculation(x[index])
        end
    end
    result = zeros(n)
    @threads for i in 1:n
        result[i] = sum(@view result_matrix[i, :])
    end
    return result
end

function static_scheduling(x, indices, owned_indices, n)
    result_matrix = zeros(n, nthreads())
    @threads :static for tid in 1:nthreads()
        for index_pos in owned_indices[tid]
            index = indices[index_pos]
            result_matrix[index, tid] += some_big_calculation(x[index])
        end
    end
    result = zeros(n)
    @threads for i in 1:n
        result[i] = sum(@view result_matrix[i, :])
    end
    return result
end

function defaultdist(sz::Int, nc::Int)
    # Split the positions 1:sz into nc contiguous chunks of (nearly) equal size.
    if sz >= nc
        chunk_size = div(sz, nc)
        remainder = rem(sz, nc)
        # sidx[i] is the first position of chunk i; the first `remainder`
        # chunks receive one extra element each.
        sidx = zeros(Int64, nc+1)
        for i = 1:(nc+1)
            sidx[i] += (i-1)*chunk_size + 1
            if i <= remainder
                sidx[i] += i-1
            else
                sidx[i] += remainder
            end
        end
        grid = fill(1:0, nc)
        for i = 1:nc
            grid[i] = sidx[i]:sidx[i+1]-1
        end
        return grid
    else
        # Fewer positions than chunks: the first sz chunks get one position each,
        # the rest stay empty (1:0 rather than 0:0, which would index out of bounds).
        sidx = [1:(sz+1);]
        grid = fill(1:0, nc)
        for i = 1:sz
            grid[i] = sidx[i]:sidx[i+1]-1
        end
        return grid
    end
end
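
For completeness, here is a task-based sketch that avoids threadid() altogether (task_based is an illustrative name, not part of the code above): each Threads.@spawn'ed task accumulates into its own local buffer, so the result does not depend on which thread runs which chunk, under either scheduler.

using Base.Threads

function task_based(x, indices, owned_indices, n)
    tasks = map(owned_indices) do chunk
        Threads.@spawn begin
            local_result = zeros(n)              # buffer owned by this task only
            for index_pos in chunk
                index = indices[index_pos]
                local_result[index] += some_big_calculation(x[index])
            end
            local_result
        end
    end
    return sum(fetch, tasks)                     # elementwise sum of the per-task buffers
end

It is called like the other variants, e.g. task_based(x, indices, owned_indices, n), and should match the serial result in the same way as the :static version.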