# Multithreading on entries sparse matrix

In my work I usually have to perform some operations on sparse matrices that involve mostly checking which entries are still important and remove those that are not. I thought that I could use multithreading for that, but my attempts didnβt produce a noticeable improvement. Therefore I am looking for a way to improve the performance of this use-case.

To give a simple example, imagine you have a sparse N x N matrix C, two vectors a and b of length N, and I want to make a new matrix with the entries of C that fulfill C[i,j] β₯ a[i] + b[j]. My serial implementation looks like this:

function keep_entries(C, a, b)
n, m = size(C)
I = Int[]
J = Int[]
V = Float64[]
rows = rowvals(C)
Cval = nonzeros(C)
for j in 1:m
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
push!(I, i)
push!(J, j)
push!(V, Cval[r])
end
end
end
K = sparse(I, J, V, n, m)
return K
end

(I am using this answer to iterate over the columns of C).

Then, my approach using Threads.@threads consists in the following: divide the columns of C into as many chunks as available threads, then send each to work on one portion of C, and concatenate the results. The code looks like this:

m, n = size(C)

Is = [Int[] for i in 1:Nchunks]
Js = [Int[] for i in 1:Nchunks]
Vs = [Float64[] for i in 1:Nchunks]

chunk_size = nΓ·Nchunks
j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk
rows = rowvals(C)
Cval = nonzeros(C)

for j in j_chunks[k]
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
push!(Is[k], i)
push!(Js[k], j)
push!(Vs[k], Cval[r])
end
end
end
end
I = vcat(Is...)
J = vcat(Js...)
V = vcat(Vs...)
return sparse(I, J, V, m, n)
end

Starting julia with 4 threads I get the following results:

julia> N = 100000;
julia> C = sprand(N, N, 20/N); # On average 20 non-zero entries per column
julia> a = 0.5.*rand(N); b = 0.5.*rand(N); # On average 50% of entries satisfy `C[i,j] β₯ a[i] + b[j]`, 50% not
julia> @benchmark keep_entries(\$C, \$a, \$b)
BenchmarkTools.Trial: 54 samples with 1 evaluation.
Range (min β¦ max):  86.917 ms β¦ 107.341 ms  β GC (min β¦ max): 0.00% β¦ 0.00%
Time  (median):     91.397 ms               β GC (median):    0.00%
Time  (mean Β± Ο):   92.826 ms Β±   4.475 ms  β GC (mean Β± Ο):  0.37% Β± 0.97%

ββ ββ    β                      β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
86.9 ms         Histogram: frequency by time          104 ms <

Memory estimate: 59.77 MiB, allocs estimate: 77.

BenchmarkTools.Trial: 64 samples with 1 evaluation.
Range (min β¦ max):  68.283 ms β¦ 91.087 ms  β GC (min β¦ max): 0.00% β¦ 0.00%
Time  (median):     78.294 ms              β GC (median):    0.00%
Time  (mean Β± Ο):   78.989 ms Β±  4.517 ms  β GC (mean Β± Ο):  0.16% Β± 0.32%

βββ β β   ββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
68.3 ms         Histogram: frequency by time        89.2 ms <

Memory estimate: 91.64 MiB, allocs estimate: 270.

So the improvement is pathetic. Raising the number of threads has a marginal effect on performance. Besides, the CPU activity looks like this:

which shows that the multi-threaded code is not taking advantage of all the coresβ power.

Some things I have tried and that didnβt improve the performance:

1. Checking on vcat running time. For these vectors it runs in ~ 4 ms, so doesnβt seem to be the bottleneck.
2. Checking on vcat type instability. I have tried annotating the output of vcat with the correct type, and replacing vcat with a self-made function (type stable and approximately the same running time), but the results are basically identical.

Any ideas/help making this run like a proper parallel algorithm are more than welcome!

Profiling shows that most of the work is done in return sparse(I, J, V, m, n). So you are basically distributing the smaller part of the workload (est. 40% vs. 60% in the call to sparse).

Edit: just for the sake of completeness: the assembled MWE

using BenchmarkTools, Profile
using SparseArrays

function keep_entries(C, a, b)
n, m = size(C)
I = Int[]
J = Int[]
V = Float64[]
rows = rowvals(C)
Cval = nonzeros(C)
for j in 1:m
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
push!(I, i)
push!(J, j)
push!(V, Cval[r])
end
end
end
K = sparse(I, J, V, n, m)
return K
end

m, n = size(C)

Is = [Int[] for i in 1:Nchunks]
Js = [Int[] for i in 1:Nchunks]
Vs = [Float64[] for i in 1:Nchunks]

chunk_size = nΓ·Nchunks
j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk
rows = rowvals(C)
Cval = nonzeros(C)

for j in j_chunks[k]
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
push!(Is[k], i)
push!(Js[k], j)
push!(Vs[k], Cval[r])
end
end
end
end
I = vcat(Is...)
J = vcat(Js...)
V = vcat(Vs...)
return sparse(I, J, V, m, n)
end

function test_serial(C, a, b)
ans = keep_entries(C[], a[], b[])
end

function test_parallel(C, a, b)
end

N = 100000;
C = sprand(N, N, 20/N); # On average 20 non-zero entries per column
a = 0.5.*rand(N); b = 0.5.*rand(N); # On average 50% of entries satisfy `C[i,j] β₯ a[i] + b[j]`, 50% not

#= ans_serial = nothing
@btime (\$ans_serial = test_serial(Ref(C), Ref(a), Ref(b)))
ans_parallel = nothing
@btime (\$ans_parallel = test_parallel(Ref(C), Ref(a), Ref(b)))
@assert ans_serial == ans_parallel =#

Profile.clear()
ans_serial = test_serial(Ref(C), Ref(a), Ref(b))
@profile (ans_serial = test_serial(Ref(C), Ref(a), Ref(b)))
# Profile.print()
Juno.profiler()

Profile.clear()
ans_parallel = test_parallel(Ref(C), Ref(a), Ref(b))
@profile (ans_parallel = test_parallel(Ref(C), Ref(a), Ref(b)))
# Profile.print()
Juno.profiler()
Oh, thanks a lot, I hadnβt noticed this! I guess that I might have to use directly the constructor of SparseMatrixCSC to avoid the overhead in translating from I, J, V vectors to the actual matrix. I will write here again if I manage to do so.

The βin-placeβ version should be something like:

function keep_entries2(C, a, b)
n, m = size(C)
colptr = Int[1]
sizehint!(colptr, length(C.colptr))
rowval = Int[]
nzval = Float64[]
rows = rowvals(C)
Cval = nonzeros(C)
cnt = 1
for j in 1:m
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
cnt += 1
push!(rowval, i)
push!(nzval, Cval[r])
end
end
push!(colptr, cnt)
end
return SparseMatrixCSC(m, n,colptr, rowval, nzval)
end
Hi Kristoffer,

I tried the exercise left for the reader;)

using BenchmarkTools, Profile
using SparseArrays

function keep_entries1(C, a, b)
n, m = size(C)
rows = rowvals(C)
Cval = nonzeros(C)

I = Int[]
J = Int[]
V = Float64[]
for j in 1:m
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
push!(I, i)
push!(J, j)
push!(V, Cval[r])
end
end
end
K = sparse(I, J, V, n, m)
return K
end

function keep_entries2(C, a, b)
n, m = size(C)
rows = rowvals(C)
Cval = nonzeros(C)

colptr = Int[1]
sizehint!(colptr, length(C.colptr))
rowval = Int[]
nzval = Float64[]
cnt = 1
for j in 1:m
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
cnt += 1
push!(rowval, i)
push!(nzval, Cval[r])
end
end
push!(colptr, cnt)
end
return SparseMatrixCSC(m, n,colptr, rowval, nzval)
end

m, n = size(C)
rows = rowvals(C)
Cval = nonzeros(C)

chunk_size = nΓ·Nchunks
j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk

Is = [Int[] for i in 1:Nchunks]
Js = [Int[] for i in 1:Nchunks]
Vs = [Float64[] for i in 1:Nchunks]

for j in j_chunks[k]
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
push!(Is[k], i)
push!(Js[k], j)
push!(Vs[k], Cval[r])
end
end
end
end
I = vcat(Is...)
J = vcat(Js...)
V = vcat(Vs...)
return sparse(I, J, V, m, n)
end

m, n = size(C)
rows = rowvals(C)
Cval = nonzeros(C)

chunk_size = nΓ·Nchunks
j_chunks = [(k-1)*chunk_size+1:k*chunk_size for k in 1:Nchunks]
j_chunks[end] = (Nchunks-1)*chunk_size+1:n # Adjust last chunk

_colptrs = [Int[1] for i in 1:Nchunks]
for i in 1:Nchunks
sizehint!(_colptrs[i], length(C.colptr)Γ·Nchunks)
end
_rowvals = [Int[] for i in 1:Nchunks]
_nzvals = [Float64[] for i in 1:Nchunks]

cnt = 1
for j in j_chunks[k]
for r in nzrange(C, j)
i = rows[r]
if Cval[r] β₯ a[i] + b[j]
cnt += 1
push!(_rowvals[k], i)
push!(_nzvals[k], Cval[r])
end
end
push!(_colptrs[k], cnt)
end
end
for k in 2:Nchunks
_colptrs[k] .+= _colptrs[k-1][end]
end
colptr = vcat(_colptrs...)
rowval = vcat(_rowvals...)
nzval = vcat(_nzvals...)
return SparseMatrixCSC(m, n,colptr, rowval, nzval)
end

function test_serial1(C, a, b)
ans = keep_entries1(C[], a[], b[])
end

function test_serial2(C, a, b)
ans = keep_entries2(C[], a[], b[])
end

function test_parallel1(C, a, b)
end

function test_parallel2(C, a, b)
end

N = 1000000;
C = sprand(N, N, 20/N); # On average 20 non-zero entries per column
a = 0.5.*rand(N); b = 0.5.*rand(N); # On average 50% of entries satisfy `C[i,j] β₯ a[i] + b[j]`, 50% not

ans_serial1 = nothing
@btime (\$ans_serial1 = test_serial1(Ref(C), Ref(a), Ref(b)))
ans_serial2 = nothing
@btime (\$ans_serial2 = test_serial2(Ref(C), Ref(a), Ref(b)))
@assert ans_serial2 == ans_serial1

ans_parallel1 = nothing
@btime (\$ans_parallel1 = test_parallel1(Ref(C), Ref(a), Ref(b)))
@assert ans_serial1 == ans_parallel1

ans_parallel2 = nothing
@btime (\$ans_parallel2 = test_parallel2(Ref(C), Ref(a), Ref(b)))
@assert ans_serial2 == ans_parallel2

This shows some speedup for the threaded version on my box:

2.713 s (93 allocations: 715.01 MiB)
595.080 ms (54 allocations: 265.63 MiB)
2.289 s (443 allocations: 862.86 MiB)
150.528 ms (317 allocations: 379.46 MiB)
Great, thanks a lot! I was working out a very similar implementation and was also getting around a 2.5x speedup wrto the serial version using 4 threads (and a lot more wrto my original one :)) so I think that this is more than enough for now. I am very happy to see this solved Thanks again also to @kristoffer.carlsson!