Multithreading on sparse matrix entries

In my work I often have to perform operations on sparse matrices that mostly involve checking which entries are still important and removing those that are not. I thought I could use multithreading for that, but my attempts didn't produce a noticeable improvement, so I am looking for a way to improve the performance of this use case.

To give a simple example, suppose I have a sparse N x N matrix C and two vectors a and b of length N, and I want to build a new matrix containing only the entries of C that fulfill C[i,j] ≥ a[i] + b[j]. My serial implementation looks like this:

function keep_entries(C, a, b)
    n, m = size(C)
    I = Int[]
    J = Int[]
    V = Float64[]
    rows = rowvals(C)   # row indices of the stored entries
    Cval = nonzeros(C)  # values of the stored entries
    for j in 1:m
        for r in nzrange(C, j)  # stored entries of column j
            i = rows[r]
            if Cval[r] ≥ a[i] + b[j]
                push!(I, i)
                push!(J, j)
                push!(V, Cval[r])
            end
        end
    end
    K = sparse(I, J, V, n, m)
    return K
end

(I am using this answer to iterate over the columns of C).

Then, my approach using Threads.@threads is the following: divide the columns of C into as many chunks as there are available threads, let each thread work on one portion of C, and concatenate the results. The code looks like this:

function keep_entries_threaded(C, a, b)
    m, n = size(C)
    
    Nchunks = Threads.nthreads()
    Is = [Int[] for i in 1:Nchunks]
    Js = [Int[] for i in 1:Nchunks]
    Vs = [Float64[] for i in 1:Nchunks]
    
    chunk_size = n ÷ Nchunks
    j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
    j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk
    rows = rowvals(C) 
    Cval = nonzeros(C)

    Threads.@threads for k in 1:Nchunks
        for j in j_chunks[k]
            for r in nzrange(C, j)
                i = rows[r]
                if Cval[r] ≥ a[i] + b[j]
                    push!(Is[k], i)
                    push!(Js[k], j)
                    push!(Vs[k], Cval[r])
                end
            end
        end
    end
    I = vcat(Is...)
    J = vcat(Js...)
    V = vcat(Vs...)
    return sparse(I, J, V, m, n)
end

Starting Julia with 4 threads, I get the following results:

julia> N = 100000;
julia> C = sprand(N, N, 20/N); # On average 20 non-zero entries per column
julia> a = 0.5.*rand(N); b = 0.5.*rand(N); # On average 50% of entries satisfy `C[i,j] ≥ a[i] + b[j]`, 50% not
julia> @benchmark keep_entries($C, $a, $b)
BenchmarkTools.Trial: 54 samples with 1 evaluation.
 Range (min … max):  86.917 ms … 107.341 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     91.397 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   92.826 ms ±   4.475 ms  ┊ GC (mean ± σ):  0.37% ± 0.97%

           ▂█ ▅▅    ▂                      ▂
  ▅▅▁▅█▅▅▅███▁██▅▁▁███▁██▅▁▅▁▁▁█▁▁▁▁█▅▁▁▅▁▁█▁▁▁▅▁▁▁▁▁▁▁█▁▁▁▁▁▅ ▁
  86.9 ms         Histogram: frequency by time          104 ms <

 Memory estimate: 59.77 MiB, allocs estimate: 77.

julia> @benchmark keep_entries_threaded($C, $a, $b)
BenchmarkTools.Trial: 64 samples with 1 evaluation.
 Range (min … max):  68.283 ms … 91.087 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     78.294 ms              ┊ GC (median):    0.00%
 Time  (mean ± σ):   78.989 ms ±  4.517 ms  ┊ GC (mean ± σ):  0.16% ± 0.32%

                       █▂▂ ▂ ▂   ▅▂
  ▅▁▁▁▅▁▁▁▅▁▁▁▁▁█▅▅▅▁█████▅████▅███▅▅▁▅▅█▁▅▅▅▅▅▅▅▁▁█▁▅▅▁▁▁▅▁▅ ▁
  68.3 ms         Histogram: frequency by time        89.2 ms <

 Memory estimate: 91.64 MiB, allocs estimate: 270.

So the improvement is pathetic. Raising the number of threads has only a marginal effect on performance. Besides, the CPU activity looks like this:

[screenshot of per-core CPU usage during the threaded run]

which shows that the multi-threaded code is not taking advantage of all the cores' power.

Some things I have tried that didn't improve the performance:

  1. Checking the vcat running time. For these vectors it runs in ~4 ms, so it doesn't seem to be the bottleneck.
  2. Checking for type instability in vcat. I have tried annotating the output of vcat with the correct type, and replacing vcat with a self-made function (type stable, with approximately the same running time), but the results are basically identical (see the sketch after this list).
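
(For reference, such a type-stable concatenation helper might look like the sketch below; the name concat_chunks is made up and only illustrates the idea of preallocating the output instead of splatting into vcat.)

# Sketch of a type-stable alternative to vcat(Is...): preallocate the output
# and copy each chunk into place.
function concat_chunks(chunks::Vector{Vector{T}}) where {T}
    out = Vector{T}(undef, sum(length, chunks))
    offset = 0
    for c in chunks
        copyto!(out, offset + 1, c, 1, length(c))
        offset += length(c)
    end
    return out
end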

Any ideas/help making this run like a proper parallel algorithm are more than welcome! :slight_smile:

Profiling shows that most of the work is done in return sparse(I, J, V, m, n). So you are basically distributing the smaller part of the workload (est. 40% vs. 60% in the call to sparse).
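
One rough, self-contained way to check this (reusing the sizes from above) is to build the I, J, V triplets once, as in keep_entries but without the final call, and then time the assembly step alone; for example:

using BenchmarkTools, SparseArrays

N = 100000
C = sprand(N, N, 20/N)
a = 0.5 .* rand(N); b = 0.5 .* rand(N)

# Same filtering loop as keep_entries, minus the sparse() call at the end
I, J, V = Int[], Int[], Float64[]
rows = rowvals(C); Cval = nonzeros(C)
for j in 1:N, r in nzrange(C, j)
    i = rows[r]
    if Cval[r] ≥ a[i] + b[j]
        push!(I, i); push!(J, j); push!(V, Cval[r])
    end
end

@btime sparse($I, $J, $V, $N, $N);  # time of the assembly step alone, to compare against the full keep_entries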

Edit: just for the sake of completeness: the assembled MWE

using BenchmarkTools, Profile
using SparseArrays

function keep_entries(C, a, b)
    n, m = size(C)
    I = Int[]
    J = Int[]
    V = Float64[]
    rows = rowvals(C)
    Cval = nonzeros(C)
    for j in 1:m
        for r in nzrange(C, j)
            i = rows[r]
            if Cval[r] ≥ a[i] + b[j]
                push!(I, i)
                push!(J, j)
                push!(V, Cval[r])
            end
        end
    end
    K = sparse(I, J, V, n, m)
    return K
end

function keep_entries_threaded(C, a, b)
    m, n = size(C)

    Nchunks = Threads.nthreads()
    Is = [Int[] for i in 1:Nchunks]
    Js = [Int[] for i in 1:Nchunks]
    Vs = [Float64[] for i in 1:Nchunks]

    chunk_size = n ÷ Nchunks
    j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
    j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk
    rows = rowvals(C)
    Cval = nonzeros(C)

    Threads.@threads for k in 1:Nchunks
        for j in j_chunks[k]
            for r in nzrange(C, j)
                i = rows[r]
                if Cval[r] ≥ a[i] + b[j]
                    push!(Is[k], i)
                    push!(Js[k], j)
                    push!(Vs[k], Cval[r])
                end
            end
        end
    end
    I = vcat(Is...)
    J = vcat(Js...)
    V = vcat(Vs...)
    return sparse(I, J, V, m, n)
end

function test_serial(C, a, b)
    ans = keep_entries(C[], a[], b[])
end

function test_parallel(C, a, b)
    ans = keep_entries_threaded(C[], a[], b[])
end

N = 100000;
C = sprand(N, N, 20/N); # On average 20 non-zero entries per column
a = 0.5.*rand(N); b = 0.5.*rand(N); # On average 50% of entries satisfy `C[i,j] ≥ a[i] + b[j]`, 50% not

#= ans_serial = nothing
@btime ($ans_serial = test_serial(Ref(C), Ref(a), Ref(b)))
ans_parallel = nothing
@btime ($ans_parallel = test_parallel(Ref(C), Ref(a), Ref(b)))
@assert ans_serial == ans_parallel =#

Profile.clear()
ans_serial = test_serial(Ref(C), Ref(a), Ref(b))
@profile (ans_serial = test_serial(Ref(C), Ref(a), Ref(b)))
# Profile.print()
Juno.profiler()

Profile.clear()
ans_parallel = test_parallel(Ref(C), Ref(a), Ref(b))
@profile (ans_parallel = test_parallel(Ref(C), Ref(a), Ref(b)))
# Profile.print()
Juno.profiler()

Oh, thanks a lot, I hadn't noticed this! I guess I might have to use the SparseMatrixCSC constructor directly to avoid the overhead of translating the I, J, V vectors into the actual matrix. I will write here again if I manage to do so.
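
For reference, the direct constructor just takes the three CSC arrays, where colptr[j]:colptr[j+1]-1 indexes the stored entries of column j; a tiny illustrative example (made-up values):

using SparseArrays

# SparseMatrixCSC(nrows, ncols, colptr, rowval, nzval):
# column 1 owns stored entries 1:1, column 2 owns stored entries 2:2
A = SparseMatrixCSC(2, 2, [1, 2, 3], [1, 2], [10.0, 20.0])
A == sparse([1, 2], [1, 2], [10.0, 20.0], 2, 2)  # true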

The "in-place" version should be something like:

function keep_entries2(C, a, b)
    n, m = size(C)
    colptr = Int[1]
    sizehint!(colptr, length(C.colptr))
    rowval = Int[]
    nzval = Float64[]
    rows = rowvals(C) 
    Cval = nonzeros(C)
    cnt = 1
    for j in 1:m
        for r in nzrange(C, j)
            i = rows[r]
            if Cval[r] ≥ a[i] + b[j]
                cnt += 1
                push!(rowval, i)
                push!(nzval, Cval[r])
            end
        end
        push!(colptr, cnt)
    end
    return SparseMatrixCSC(n, m, colptr, rowval, nzval)  # n rows, m columns
end

Hi Kristoffer,

I tried the exercise left for the reader ;)

using BenchmarkTools, Profile
using SparseArrays

function keep_entries1(C, a, b)
    n, m = size(C)
    rows = rowvals(C)
    Cval = nonzeros(C)

    I = Int[]
    J = Int[]
    V = Float64[]
    for j in 1:m
        for r in nzrange(C, j)
            i = rows[r]
            if Cval[r] ≥ a[i] + b[j]
                push!(I, i)
                push!(J, j)
                push!(V, Cval[r])
            end
        end
    end
    K = sparse(I, J, V, n, m)
    return K
end

function keep_entries2(C, a, b)
    n, m = size(C)
    rows = rowvals(C)
    Cval = nonzeros(C)

    colptr = Int[1]
    sizehint!(colptr, length(C.colptr))
    rowval = Int[]
    nzval = Float64[]
    cnt = 1
    for j in 1:m
        for r in nzrange(C, j)
            i = rows[r]
            if Cval[r] ≥ a[i] + b[j]
                cnt += 1
                push!(rowval, i)
                push!(nzval, Cval[r])
            end
        end
        push!(colptr, cnt)
    end
    return SparseMatrixCSC(n, m, colptr, rowval, nzval)  # n rows, m columns
end

function keep_entries1_threaded(C, a, b)
    m, n = size(C)
    rows = rowvals(C)
    Cval = nonzeros(C)

    Nchunks = Threads.nthreads()
    chunk_size = n ÷ Nchunks
    j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
    j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk

    Is = [Int[] for i in 1:Nchunks]
    Js = [Int[] for i in 1:Nchunks]
    Vs = [Float64[] for i in 1:Nchunks]

    Threads.@threads for k in 1:Nchunks
        for j in j_chunks[k]
            for r in nzrange(C, j)
                i = rows[r]
                if Cval[r] ≥ a[i] + b[j]
                    push!(Is[k], i)
                    push!(Js[k], j)
                    push!(Vs[k], Cval[r])
                end
            end
        end
    end
    I = vcat(Is...)
    J = vcat(Js...)
    V = vcat(Vs...)
    return sparse(I, J, V, m, n)
end

function keep_entries2_threaded(C, a, b)
    m, n = size(C)
    rows = rowvals(C)
    Cval = nonzeros(C)

    Nchunks = Threads.nthreads()
    chunk_size = n ÷ Nchunks
    j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
    j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk

    _colptrs = [Int[1] for i in 1:Nchunks]
    for i in 1:Nchunks
        sizehint!(_colptrs[i], length(C.colptr) ÷ Nchunks)
    end
    _rowvals = [Int[] for i in 1:Nchunks]
    _nzvals = [Float64[] for i in 1:Nchunks]

    Threads.@threads for k in 1:Nchunks
        cnt = 1
        for j in j_chunks[k]
            for r in nzrange(C, j)
                i = rows[r]
                    if Cval[r] ≥ a[i] + b[j]
                    cnt += 1
                    push!(_rowvals[k], i)
                    push!(_nzvals[k], Cval[r])
                end
            end
            push!(_colptrs[k], cnt)
        end
    end
    for k in 2:Nchunks
        _colptrs[k] .+= _colptrs[k-1][end] .- 1  # shift by the number of entries stored in earlier chunks
        popfirst!(_colptrs[k])  # drop the leading pointer that duplicates the previous chunk's last entry
    end
    colptr = vcat(_colptrs...)
    rowval = vcat(_rowvals...)
    nzval = vcat(_nzvals...)
    return SparseMatrixCSC(m, n, colptr, rowval, nzval)
end

function test_serial1(C, a, b)
    ans = keep_entries1(C[], a[], b[])
end

function test_serial2(C, a, b)
    ans = keep_entries2(C[], a[], b[])
end

function test_parallel1(C, a, b)
    ans = keep_entries1_threaded(C[], a[], b[])
end

function test_parallel2(C, a, b)
    ans = keep_entries2_threaded(C[], a[], b[])
end

N = 1000000;
C = sprand(N, N, 20/N); # On average 20 non-zero entries per column
a = 0.5.*rand(N); b = 0.5.*rand(N); # On average 50% of entries satisfy `C[i,j] ≥ a[i] + b[j]`, 50% not

ans_serial1 = nothing
@btime ($ans_serial1 = test_serial1(Ref(C), Ref(a), Ref(b)))
ans_serial2 = nothing
@btime ($ans_serial2 = test_serial2(Ref(C), Ref(a), Ref(b)))
@assert ans_serial2 == ans_serial1

ans_parallel1 = nothing
@btime ($ans_parallel1 = test_parallel1(Ref(C), Ref(a), Ref(b)))
@assert ans_serial1 == ans_parallel1

ans_parallel2 = nothing
@btime ($ans_parallel2 = test_parallel2(Ref(C), Ref(a), Ref(b)))
@assert ans_serial2 == ans_parallel2

This shows some speedup for the threaded version on my box:

  2.713 s (93 allocations: 715.01 MiB)
  595.080 ms (54 allocations: 265.63 MiB)
  2.289 s (443 allocations: 862.86 MiB)
  150.528 ms (317 allocations: 379.46 MiB)

Great, thanks a lot! I was working out a very similar implementation and was also getting around a 2.5x speedup with respect to the serial version using 4 threads (and a lot more compared to my original one :)), so I think this is more than enough for now. I am very happy to see this solved :slight_smile: Thanks again also to @kristoffer.carlsson!