Threading race condition when pushing to arrays

I’m trying to populate a sparse matrix in a multithreaded loop, along the lines of the code below:

using SparseArrays

function build_matrix(fun::Function, ::Type{T}, m, n) where T
    # Allocate thread-local arrays, which we will later concatenate.
    I = [Int[] for i in 1:Threads.nthreads()]
    J = [Int[] for i in 1:Threads.nthreads()]
    V = [T[] for i in 1:Threads.nthreads()]

    Threads.@threads for i = 1:m
        tid = Threads.threadid()
        for j = 1:n
            v = fun(i,j)
            iszero(v) && continue

            push!(I[tid], i)
            push!(J[tid], j)
            push!(V[tid], v)
        end
    end

    sparse(reduce(vcat, I),
           reduce(vcat, J),
           reduce(vcat, V), m, n)
end

This always works in serial calculations, and sometimes when running with many threads, but in the multithreaded case I very often get the error message

ERROR: LoadError: ArgumentError: the first three arguments' lengths must match,
length(I) (=332928) == length(J) (= 332927) == length(V) (= 332925)

where the numbers vary, but are typically close to one another.

What can be the cause of this? I cannot really see why the code should be thread-unsafe. I’m thinking that there may be some data reshuffling in memory as the β€œthread-local” arrays grow, but I thought the vectors of vectors would only hold pointers to the vectors actually storing the data.

I’m sorry that I’m unable to provide a reproducible MWE; I will see if I can cook something up. I’ve seen this behaviour on two different machines (one Intel(R) Xeon(R) CPU E5-2640 v4 @ 2.40GHz and one AMD Ryzen 9 3950X 16-Core Processor).

I have seen this since at least Julia 1.8, maybe already earlier, and it is still present on 1.9.

Does it still break if you use Threads.@threads :static for i = 1:m instead?
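
A minimal sketch of that change, assuming the rest of the function stays exactly as above (only the loop header differs):

# With the :static schedule each chunk of iterations is pinned to a fixed
# thread, so threadid() cannot change in the middle of an iteration.
Threads.@threads :static for i = 1:m
    tid = Threads.threadid()   # stable under :static scheduling
    # ... same inner loop over j as in the original function ...
end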

The likely issue here is task migration: a task started by @threads can be moved to and resumed on a different thread, so the assumption that tid stays constant throughout an iteration is wrong.

One way of solving this is to use a thread-safe data structure such as a Channel.

    Is = Channel{Vector{Int}}(m)
    Js = Channel{Vector{Int}}(m)
    Vs = Channel{Vector{T}}(m)
    Threads.@threads for i = 1:m
        I = Int[]
        J = Int[]
        V = T[]
        for j = 1:n
            v = fun(i,j)
            iszero(v) && continue

            push!(I, i)
            push!(J, j)
            push!(V, v)
        end
        put!(Is, I)
        put!(Js, J)
        put!(Vs, V)
    end
    close(Is); close(Js); close(Vs)
    sparse(reduce(vcat, Is),
           reduce(vcat, Js),
           reduce(vcat, Vs), m, n)

Note the close operation; without it, iterating over the channels’ elements would hang your program.
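
As a small illustration (not from the original post) of why the close is needed: a buffered Channel can be drained by iterating over it once it has been closed, whereas iterating over an open channel blocks while it waits for more items.

    ch = Channel{Int}(3)     # buffered channel with capacity 3
    put!(ch, 1); put!(ch, 2)
    close(ch)                # mark the channel as finished
    collect(ch)              # returns [1, 2]; without close(ch) this would block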


How much slower is this?

using FLoops
using BangBang # for `append!!`
using MicroCollections  # for `EmptyVector` and `SingletonVector`
using SparseArrays

function build_matrix(fun::Function, ::Type{T}, m, n) where T
    # Allocate thread-local arrays, which we will later concatenate.
    I = [Int[] for i in 1:Threads.nthreads()]
    J = [Int[] for i in 1:Threads.nthreads()]
    V = [T[] for i in 1:Threads.nthreads()]

    Threads.@threads for i = 1:m
        tid = Threads.threadid()
        for j = 1:n
            v = fun(i,j)
            iszero(v) && continue

            push!(I[tid], i)
            push!(J[tid], j)
            push!(V[tid], v)
        end
    end

    sparse(reduce(vcat, I),
           reduce(vcat, J),
           reduce(vcat, V), m, n)
end


function build_matrix_floop(fun::Function, ::Type{T}, m, n) where T
    # Allocate thread-local arrays, which we will later concatenate.
    @floop for i = 1:m
        I = Int[]
        J = Int[]
        V = T[]
        for j = 1:n
            v = fun(i,j)
            iszero(v) && continue

            push!(I, i)
            push!(J, j)
            push!(V, v)
        end
        @reduce(Is = append!!(EmptyVector(), I))
        @reduce(Js = append!!(EmptyVector(), J))
        @reduce(Vs = append!!(EmptyVector(), V))
    end
    sparse(Is, Js, Vs, m, n)
end

function build_matrix_channels(fun::Function, ::Type{T}, m, n) where T
    Is = Channel{Vector{Int}}(m)
    Js = Channel{Vector{Int}}(m)
    Vs = Channel{Vector{T}}(m)
    Threads.@threads for i = 1:m
        I = Int[]
        J = Int[]
        V = T[]
        for j = 1:n
            v = fun(i,j)
            iszero(v) && continue

            push!(I, i)
            push!(J, j)
            push!(V, v)
        end
        put!(Is, I)
        put!(Js, J)
        put!(Vs, V)
    end
    close(Is); close(Js); close(Vs)
    sparse(reduce(vcat, Is),
           reduce(vcat, Js),
           reduce(vcat, Vs), m, n)
end

Without threads:

julia> @benchmark build_matrix((i,j)->i*j, Int, 10, 10)
BenchmarkTools.Trial: 10000 samples with 6 evaluations.
 Range (min … max):  5.053 ΞΌs … 357.837 ΞΌs  β”Š GC (min … max): 0.00% … 95.01%
 Time  (median):     5.547 ΞΌs               β”Š GC (median):    0.00%
 Time  (mean Β± Οƒ):   6.406 ΞΌs Β±  13.773 ΞΌs  β”Š GC (mean Β± Οƒ):  8.91% Β±  4.07%

   β–‚β–„β–‡β–ˆβ–ˆβ–ˆβ–‡β–†β–…β–„β–‚             ▁▁▁▂▁▂▂▂▂▂▁ ▁                      β–‚
  β–†β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–‡β–ˆβ–‡β–‡β–‡β–‡β–ˆβ–ˆβ–‡β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–‡β–‡β–‡β–‡β–‡β–‡β–…β–…β–…β–†β–…β–„β–ƒβ–ƒβ–ƒβ–…β–„β–…β–ƒ β–ˆ
  5.05 ΞΌs      Histogram: log(frequency) by time      9.69 ΞΌs <

 Memory estimate: 13.36 KiB, allocs estimate: 38.
julia> @benchmark build_matrix_floop((i,j)->i*j, Int, 10, 10)
BenchmarkTools.Trial: 10000 samples with 6 evaluations.
 Range (min … max):  6.175 ΞΌs … 345.855 ΞΌs  β”Š GC (min … max):  0.00% … 96.24%
 Time  (median):     6.743 ΞΌs               β”Š GC (median):     0.00%
 Time  (mean Β± Οƒ):   7.963 ΞΌs Β±  18.642 ΞΌs  β”Š GC (mean Β± Οƒ):  13.00% Β±  5.43%

   β–‚β–„β–†β–ˆβ–ˆβ–ˆβ–ˆβ–‡β–†β–…β–ƒβ–ƒβ–‚β– ▁                                           β–‚
  β–‡β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–‡β–†β–†β–‡β–†β–†β–„β–†β–„β–ƒβ–„β–„β–ƒβ–„β–…β–…β–ƒβ–…β–ƒβ–„β–„β–…β–…β–…β–†β–‡β–†β–‡β–‡β–‡β–‡β–†β–†β–…β–…β–†β–‡β–‡β–‡ β–ˆ
  6.18 ΞΌs      Histogram: log(frequency) by time      11.3 ΞΌs <

 Memory estimate: 24.22 KiB, allocs estimate: 120.
julia> @benchmark build_matrix_channels((i,j)->i*j, Int, 10, 10)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  10.680 ΞΌs …  2.335 ms  β”Š GC (min … max):  0.00% … 98.02%
 Time  (median):     12.430 ΞΌs              β”Š GC (median):     0.00%
 Time  (mean Β± Οƒ):   14.909 ΞΌs Β± 65.073 ΞΌs  β”Š GC (mean Β± Οƒ):  12.14% Β±  2.76%

    β–ƒβ–…β–‡β–‡β–ˆβ–ˆβ–ˆβ–ˆβ–‡β–†β–†β–…β–„β–„β–ƒβ–β–β–     ▁ ▁▂▁▂▁▁▂▁▁▂▁▁▁                    β–ƒ
  β–ƒβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–‡β–†β–‡β–‡β–…β–†β–…β–„β–†β–…β–ƒβ–…β–…β–„ β–ˆ
  10.7 ΞΌs      Histogram: log(frequency) by time      22.4 ΞΌs <

 Memory estimate: 36.42 KiB, allocs estimate: 203.

I haven’t had time to look at why, and of course this would need an actually useful test.
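
For what it’s worth, a minimal sketch of such a test (the test function f and the dimensions below are made up): since sparse combines the (I, J, V) triplets independently of their order, the threaded variants should match a serial reference even though the rows are processed in an arbitrary order.

using SparseArrays, Test

f(i, j) = (i + j) % 3 == 0 ? 0 : i * j   # hypothetical test function
m, n = 50, 40

# Serial reference built with a plain double loop.
Iref, Jref, Vref = Int[], Int[], Int[]
for i in 1:m, j in 1:n
    v = f(i, j)
    iszero(v) || (push!(Iref, i); push!(Jref, j); push!(Vref, v))
end
Aref = sparse(Iref, Jref, Vref, m, n)

# The threaded variants should agree with the reference.
@test build_matrix_channels(f, Int, m, n) == Aref
@test build_matrix_floop(f, Int, m, n) == Aref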


Not that I have anything useful to add, but welcome back!


It seems the Channel-based approach works, so marking this as solved. Thank you!