Multithreaded MatVec


#1

Just out of curiosity, has anyone written up a sparse matrix-vector product using the new multithreading stuff? The At_mul_B operation works great because all the writes are disjoint between threads. But I was curious about the best strategy to implement the A_mul_B operation.

The code I’ve gotten at the moment is: (Which is very rough, just wanted something to start with!)

using Iterators
#simple_partition(n::Int, k::Int) = push!(map(first, partition(1:n,floor(Int,n/k))),n+1)
function simple_partition(n::Int,k::Int)
  rval = ones(Int,k+1)
  m,r = divrem(n,k)
  fill!(rval, m)
  rval[1] = 1
  for i = 2:k+1
    rval[i] += r .> (i-2)
  end
  cumsum!(rval,rval)
end

import Base.LinAlg.At_mul_B, Base.LinAlg.At_mul_B!, Base.A_mul_B, Base.LinAlg.A_mul_B!

"""
A type to encapsulate the multithreaded matrix-vector product operation.

Usage
-----
~~~~
A = sprandn(10^5,10^5,100/10^5); # create a fairly dense sparse matrix.
x = randn(10^5);
M = MultithreadedMatVec(A, simple_partition(A.n, Threads.nthreads()));
y = M.'*x;
norm(y - A.'*x)
~~~~
"""
mutable struct MultithreadedMatVec{T,I} <: AbstractArray{T,2}
  A::SparseMatrixCSC{T,I}
  regions::Vector{Int}

  MultithreadedMatVec(A::SparseMatrixCSC) = MultithreadedMatVec(A,Threads.nthreads())
  MultithreadedMatVec(A::SparseMatrixCSC, k::Int) = MultithreadedMatVec(A, simple_partition(A.n,k))
  function MultithreadedMatVec(A::SparseMatrixCSC{T,I}, regions::Vector{Int}) where {T,I}
    new{T,I}(A, regions)
  end
end

import Base.size
size(M::MultithreadedMatVec,inputs...) = size(M.A,inputs...)

# Julia's built in multiplication operations are called with
# A_mul_B!
# Ac_mul_B!
# At_mul_B!
# which take in
# α::Number, A::SparseMatrixCSC, B::StridedVecOrMat, β::Number, C::StridedVecOrMat
# and compute
# βC += αA B
# βC += αA^* B
# βC += αA^T B
# respectively
# look in base/sparse/linalg.jl
# for their implementations

""" Run the internal loop """
function internal_loop_transmult(C,B,nzv,rv,colptr,i1,i2,α::Number)
  for k=1:size(C,2)
    @inbounds for col=i1:i2
      tmp = zero(eltype(C))
      for j=colptr[col]:(colptr[col+1]-1)
        tmp += nzv[j]*B[rv[j],k]
      end
      C[col,k] += α*tmp
    end
  end
  return
end

"""
we are going to make these work with MuilthreadedMatVec types
"""
function At_mul_B!(α::Number, M::MultithreadedMatVec, B::StridedVecOrMat, β::Number, C::StridedVecOrMat)
  M.A.m == size(B,1) || throw(DimensionMismatch())
  M.A.n == size(C,1) || throw(DimensionMismatch())
  size(B,2) == size(C,2) || throw(DimensionMismatch())
  nzv = M.A.nzval
  rv = M.A.rowval
  colptr = M.A.colptr
  if β != 1
    β != 0 ? scale!(C, β) : fill!(C, zero(eltype(C)))
  end
  # this is the parallel construction
  Threads.@threads for t=1:(length(M.regions)-1)
    internal_loop_transmult(C,B,nzv,rv,colptr,M.regions[t],M.regions[t+1]-1,α)
  end
  C
end

function At_mul_B(M::MultithreadedMatVec{TA,S}, x::StridedVector{Tx}) where {TA,S,Tx}
  T = promote_type(TA,Tx)
  At_mul_B!(one(T), M, x, zero(T), similar(x, T, M.A.n))
end

function internal_loop_mult(C,B,nzv,rv,colptr,i1,i2,α::Number)
  for k=1:size(C,2)
    @inbounds for col=i1:i2
      αxj = α*B[col,k]
      for j=colptr[col]:(colptr[col+1]-1)
        # need to be done atomically... (ideally...)
        Threads.@atomic C[rv[j],k] += nzv[j]*αxj
      end
    end
  end
  return
end

function A_mul_B!(α::Number, M::MultithreadedMatVec, B::StridedVecOrMat, β::Number, C::StridedVecOrMat)
  M.A.n == size(B,1) || throw(DimensionMismatch())
  M.A.m == size(C,1) || throw(DimensionMismatch())
  size(B,2) == size(C,2) || throw(DimensionMismatch())
  nzv = M.A.nzval
  rv = M.A.rowval
  colptr = M.A.colptr
  if β != 1
    β != 0 ? scale!(C, β) : fill!(C, zero(eltype(C)))
  end
  # this is the parallel construction
  Threads.@threads for t=1:(length(M.regions)-1)
    internal_loop_mult(C,B,nzv,rv,colptr,M.regions[t],M.regions[t+1]-1,α)
  end
  C
end

import Base.eltype
eltype(M::MultithreadedMatVec{T,I}) where {T,I} = T

function A_mul_B!(C::StridedVecOrMat, M::MultithreadedMatVec, B::StridedVecOrMat)
  T = promote_type(eltype(M), eltype(B))
  A_mul_B!(one(T), M, B, zero(T), C)
end

The A_mul_B doesn’t work because the “Threads.@atomic” macro doesn’t exist. Removing this would run, but likely would give the wrong answer depending on your multithreading environment.

There’s the whole Threads.atomic* set of operations, which could be adapted for this task in various ways. I was just curious if anyone had actually worked this out in Julia and looked at any performance results. (I’m sure this has been done before many times in other languages :))


#2

Just to be clear, If you want performance, you should not use atomics for computation. Atomics are used for thread synchronization and if you simply replace += with atomic_add! you are guaranteed a slow down on the order of 100x.


#3

I appreciate that atomics have overhead. :slight_smile:

I guess my point is that we can atomics in OpenMP and get better performance.


#include <inttypes.h>

// compile with
// g++ ompmatvec.cc -o ompmatvec.so -shared -Wall -fopenmp -fPIC -O3 -march=native
// assembly with
// g++ ompmatvec.cc -o ompmatvec.s -shared -Wall -fopenmp -fPIC -O3 -S -march=native

extern "C" {
int64_t parallel_matvec(int64_t m, int64_t n, int64_t* colptr, int64_t* rv, double* nzv, double* b, double* c, int k)
{
  #pragma omp parallel shared(b,c) num_threads(k)
  {
    #pragma omp for
    for (int64_t i = 0; i < n; ++i) { // each column
      double bval = b[i];
      for (int64_t j = colptr[i]-1; j < colptr[i+1]-1; ++j) {
        #pragma omp atomic
        c[rv[j]-1] += nzv[j]*bval;
      }
    }
  }
  return 1;
}
} // extern C

and then

import Base.LinAlg.A_mul_B!

"""
A type to encapsulate the multithreaded matrix-vector product operation.
Via OMP

"""
mutable struct OMPMatVec{T,I} <: AbstractArray{T,2}
  A::SparseMatrixCSC{T,I}
  k::Int

  function OMPMatVec(A::SparseMatrixCSC{T,I}, k::Int) where {T,I}
    new{T,I}(A, k)
  end
end

import Base.size
size(M::OMPMatVec,inputs...) = size(M.A,inputs...)

function A_mul_B!(α::Number, M::OMPMatVec, B::StridedVecOrMat, β::Number, C::StridedVecOrMat)
  @assert α == 1
  @assert size(C,2) == 1
  M.A.n == size(B,1) || throw(DimensionMismatch())
  M.A.m == size(C,1) || throw(DimensionMismatch())
  size(B,2) == size(C,2) || throw(DimensionMismatch())
  nzv = M.A.nzval
  rv = M.A.rowval
  colptr = M.A.colptr
  ccall((:parallel_matvec, "./ompmatvec.so"), Int64,
    (Int64, Int64, Ptr{Int64}, Ptr{Int64}, Ptr{Float64}, Ptr{Float64}, Ptr{Float64}, Int64),
    M.A.m, M.A.n, colptr, rv, nzv, B, C, M.k)
  C
end

import Base.eltype
eltype(M::OMPMatVec{T,I}) where {T,I} = T

function A_mul_B!(C::StridedVecOrMat, M::OMPMatVec, B::StridedVecOrMat)
  T = promote_type(eltype(M), eltype(B))
  A_mul_B!(one(T), M, B, zero(T), C)
end


#include("ompmatvec.jl")
n = 2*10^6
A = sprandn(n,n,100/n); # create a fairly dense sparse matrix.
x = randn(n);
M = OMPMatVec(A, 1);
y = M*x;
@show norm(y - A*x)
println("Ax")
@time A*x;
@time A*x;
@time A*x;
println("Mx")
@time M*x;
@time M*x;
@time M*x;
M = OMPMatVec(A, 2);
println("Mx - 2")
@time M*x;
@time M*x;
@time M*x;
M = OMPMatVec(A, 4);
M = OMPMatVec(A, 4);
println("Mx - 4")
@time M*x;
@time M*x;
@time M*x;
M = OMPMatVec(A, 6);
println("Mx - 6")
@time M*x;
@time M*x;
@time M*x;
M = OMPMatVec(A, 12);
println("Mx - 12")
@time M*x;
@time M*x;
@time M*x;
;

The disassembly shows a

lock cmpxchgq   %rsi, (%rcx)

as the likely synchronization function.

Here’s what I see when I run this.

julia> include("ompmatvec.jl")
norm(y - A * x) = 0.0
Ax
  1.308998 seconds (86 allocations: 15.265 MiB)
  1.408404 seconds (6 allocations: 15.259 MiB, 3.59% gc time)
  1.370603 seconds (6 allocations: 15.259 MiB)
Mx
  2.927870 seconds (7 allocations: 15.259 MiB, 0.06% gc time)
  2.877898 seconds (7 allocations: 15.259 MiB)
  3.001519 seconds (7 allocations: 15.259 MiB, 0.04% gc time)
Mx - 2
  1.619115 seconds (7 allocations: 15.259 MiB)
  1.560433 seconds (7 allocations: 15.259 MiB, 0.02% gc time)
  1.457196 seconds (7 allocations: 15.259 MiB)
Mx - 4
  1.182852 seconds (7 allocations: 15.259 MiB, 0.02% gc time)
  0.822991 seconds (7 allocations: 15.259 MiB)
  0.873766 seconds (7 allocations: 15.259 MiB, 0.09% gc time)
Mx - 6
  0.605149 seconds (7 allocations: 15.259 MiB)
  0.559747 seconds (7 allocations: 15.259 MiB, 0.04% gc time)
  0.526836 seconds (7 allocations: 15.259 MiB)
Mx - 12
  0.466170 seconds (7 allocations: 15.259 MiB, 0.05% gc time)
  0.452362 seconds (7 allocations: 15.259 MiB)
  0.493106 seconds (7 allocations: 15.259 MiB, 0.14% gc time)

This is on a
Intel® Xeon® CPU E5-1650 v3 @ 3.50GHz


#4

It seems that the code is mostly memory stall limited. (63% cache miss measured on a kabylake laptop). That’s actually quite impressive since a full GC gives similar cache miss rate… I assume the existing high memory access cost combined with low conflicting rate reduces the slow down caused by atomic access. That said, >2x slow down is still a lot with a single thread. I would be really surprised if this is the most efficient way to do multithread sparse matrix multiplication but I’m not sure (I certainly don’t know any).

The omp atomic seems to be really restrictive so having an exact equivalent in julia will be unlikely. It also seems that in this case it won’t by you too much other than simple transformation (replace memory access with atomic version). We currently don’t have a way to do atomic access in normal objects and there’s plan to add those.


#5

Turns out the code in atomics.jl can be adapted for this purpose to give a pure Julia solution. This is posted in case it helps anyone in the future. This could – in principle – be adapted to function akin to the atomic instruction in OpenMP. Alternatively, it’d be nice to have some build in atomic functions that would operate on a raw pointer type – which is essentially what these do.

using Iterators
#simple_partition(n::Int, k::Int) = push!(map(first, partition(1:n,floor(Int,n/k))),n+1)
function simple_partition(n::Int,k::Int)
  rval = ones(Int,k+1)
  m,r = divrem(n,k)
  fill!(rval, m)
  rval[1] = 1
  for i = 2:k+1
    rval[i] += r .> (i-2)
  end
  cumsum!(rval,rval)
end

import Base.LinAlg.At_mul_B, Base.LinAlg.At_mul_B!, Base.LinAlg.A_mul_B!

"""
A type to encapsulate the multithreaded matrix-vector product operation.

Usage
-----
~~~~
A = sprandn(10^5,10^5,100/10^5); # create a fairly dense sparse matrix.
x = randn(10^5);
M = MultithreadedMatVec(A, simple_partition(A.n, Threads.nthreads()));
y = M.'*x;
norm(y - A.'*x)
~~~~
"""
mutable struct MultithreadedMatVec{T,I} <: AbstractArray{T,2}
  A::SparseMatrixCSC{T,I}
  regions::Vector{Int}

  MultithreadedMatVec(A::SparseMatrixCSC) = MultithreadedMatVec(A,Threads.nthreads())
  MultithreadedMatVec(A::SparseMatrixCSC, k::Int) = MultithreadedMatVec(A, simple_partition(A.n,k))
  function MultithreadedMatVec(A::SparseMatrixCSC{T,I}, regions::Vector{Int}) where {T,I}
    new{T,I}(A, regions)
  end
end

import Base.size
size(M::MultithreadedMatVec,inputs...) = size(M.A,inputs...)

# Julia's built in multiplication operations are called with
# A_mul_B!
# Ac_mul_B!
# At_mul_B!
# which take in
# α::Number, A::SparseMatrixCSC, B::StridedVecOrMat, β::Number, C::StridedVecOrMat
# and compute
# βC += αA B
# βC += αA^* B
# βC += αA^T B
# respectively
# look in base/sparse/linalg.jl
# for their implementations

""" Run the internal loop """
function internal_loop_transmult(C,B,nzv,rv,colptr,i1,i2,α::Number)
  for k=1:size(C,2)
    @inbounds for col=i1:i2
      tmp = zero(eltype(C))
      for j=colptr[col]:(colptr[col+1]-1)
        tmp += nzv[j]*B[rv[j],k]
      end
      C[col,k] += α*tmp
    end
  end
  return
end

"""
we are going to make these work with MuilthreadedMatVec types
"""
function At_mul_B!(α::Number, M::MultithreadedMatVec, B::StridedVecOrMat, β::Number, C::StridedVecOrMat)
  M.A.m == size(B,1) || throw(DimensionMismatch())
  M.A.n == size(C,1) || throw(DimensionMismatch())
  size(B,2) == size(C,2) || throw(DimensionMismatch())
  nzv = M.A.nzval
  rv = M.A.rowval
  colptr = M.A.colptr
  if β != 1
    β != 0 ? scale!(C, β) : fill!(C, zero(eltype(C)))
  end
  # this is the parallel construction
  Threads.@threads for t=1:(length(M.regions)-1)
    internal_loop_transmult(C,B,nzv,rv,colptr,M.regions[t],M.regions[t+1]-1,α)
  end
  C
end

function At_mul_B(M::MultithreadedMatVec{TA,S}, x::StridedVector{Tx}) where {TA,S,Tx}
  T = promote_type(TA,Tx)
  At_mul_B!(one(T), M, x, zero(T), similar(x, T, M.A.n))
end

module MyAtomics

lt = "double" # llvmtype
ilt = "i64" # llvmtype of int representation

import Base.Sys: ARCH, WORD_SIZE

@inline function atomic_cas!(x::Array{Float64}, ind::Int, oldval::Float64, newval::Float64)
    xptr = Base.unsafe_convert(Ptr{Float64}, x)
    xptr += 8*(ind-1)
    # on new versions of Julia, this should be
    # %iptr = inttoptr i$WORD_SIZE %0 to $ilt*
    Base.llvmcall( """%iptr = bitcast $lt* %0 to $ilt*
                      %icmp = bitcast $lt %1 to $ilt
                      %inew = bitcast $lt %2 to $ilt
                      %irs = cmpxchg $ilt* %iptr, $ilt %icmp, $ilt %inew acq_rel acquire
                      %irv = extractvalue { $ilt, i1 } %irs, 0
                      %rv = bitcast $ilt %irv to $lt
                      ret $lt %rv
                """,
    Float64,  # return type
    Tuple{Ptr{Float64}, Float64, Float64},  # argument types
    xptr, oldval, newval # arguments
    )
end

@inline function atomic_add!(x::Array{Float64}, ind::Int, val::Float64)
  IT = Int64
  xptr = Base.unsafe_convert(Ptr{Float64}, x)
  xptr += 8*(ind-1)

  while true
    oldval = x[ind]
    newval = oldval + val

    #old2 = atomic_cas!(x, ind, oldval, newval)
    # inline this call and optimize out some stuff
    old2 = Base.llvmcall( """%iptr = bitcast $lt* %0 to $ilt*
                      %icmp = bitcast $lt %1 to $ilt
                      %inew = bitcast $lt %2 to $ilt
                      %irs = cmpxchg $ilt* %iptr, $ilt %icmp, $ilt %inew acq_rel acquire
                      %irv = extractvalue { $ilt, i1 } %irs, 0
                      %rv = bitcast $ilt %irv to $lt
                      ret $lt %rv
                """,
    Float64,  # return type
    Tuple{Ptr{Float64}, Float64, Float64},  # argument types
    xptr, oldval, newval # arguments
    )

    if reinterpret(IT,oldval) == reinterpret(IT,old2)
      return newval
    end
  end
end

function internal_loop_mult(C,B,nzv,rv,colptr,i1,i2,α::Number)
  for k=1:size(C,2)
    koffset = size(C,1)*(k-1)
    @inbounds for col=i1:i2
      αxj = α*B[col,k]
      for j=colptr[col]:(colptr[col+1]-1)
        # need to be done atomically...
        #Threads.atomic_add!(C[rv[j],k], nzv[j]*αxj)
        MyAtomics.atomic_add!(C,koffset+rv[j],nzv[j]*αxj)
      end
    end
  end
  return
end

function A_mul_B!(α::Number, M::MultithreadedMatVec, B::StridedVecOrMat, β::Number, C::StridedVecOrMat)
  M.A.n == size(B,1) || throw(DimensionMismatch())
  M.A.m == size(C,1) || throw(DimensionMismatch())
  size(B,2) == size(C,2) || throw(DimensionMismatch())
  nzv = M.A.nzval
  rv = M.A.rowval
  colptr = M.A.colptr
  if β != 1
    β != 0 ? scale!(C, β) : fill!(C, zero(eltype(C)))
  end
  # this is the parallel construction
  Threads.@threads for t=1:(length(M.regions)-1)
    internal_loop_mult(C,B,nzv,rv,colptr,M.regions[t],M.regions[t+1]-1,α)
  end
  C
end

import Base.eltype
eltype(M::MultithreadedMatVec{T,I}) where {T,I} = T

function A_mul_B!(C::StridedVecOrMat, M::MultithreadedMatVec, B::StridedVecOrMat)
  T = promote_type(eltype(M), eltype(B))
  A_mul_B!(one(T), M, B, zero(T), C)
end


function test_perf()
  n = 2*10^6
  A = sprandn(n,n,100/n); # create a fairly dense sparse matrix.
  x = randn(n);
  M = MultithreadedMatVec(A, simple_partition(n, 1));
  @show norm(M*x - A*x)
  println("Ax")
  @time A*x;
  @time A*x;
  @time A*x;
  println("Mx")
  @time M*x;
  @time M*x;
  @time M*x;
  M = MultithreadedMatVec(A, simple_partition(n, 2));
  println("Mx - 2")
  @show norm(M*x - A*x)
  @time M*x;
  @time M*x;
  @time M*x;
  M = MultithreadedMatVec(A, simple_partition(n, 4));
  println("Mx - 4")
  @show norm(M*x - A*x)
  @time M*x;
  @time M*x;
  @time M*x;
  M = MultithreadedMatVec(A, simple_partition(n, 6));
  println("Mx - 6")
  @show norm(M*x - A*x)
  @time M*x;
  @time M*x;
  @time M*x;
  M = MultithreadedMatVec(A, simple_partition(n, 12));
  println("Mx - 12")
  @show norm(M*x - A*x)
  @time M*x;
  @time M*x;
  @time M*x;
  return 0
end

Using the same machine, I get

julia> include("mt-matvec-v2.jl"); test_perf()

WARNING: deprecated syntax "inner constructor MultithreadedMatVec(...) around /home/dgleich/Dropbox/research/2017/09-22-multithreaded-matvec-julia/mt-matvec-v2.jl:33".
Use "MultithreadedMatVec{T,I}(...) where {T,I}" instead.

WARNING: deprecated syntax "inner constructor MultithreadedMatVec(...) around /home/dgleich/Dropbox/research/2017/09-22-multithreaded-matvec-julia/mt-matvec-v2.jl:34".
Use "MultithreadedMatVec{T,I}(...) where {T,I}" instead.
norm(M * x - A * x) = 0.0
Ax
  1.266161 seconds (2 allocations: 15.259 MiB)
  1.262458 seconds (2 allocations: 15.259 MiB, 0.05% gc time)
  1.276305 seconds (2 allocations: 15.259 MiB)
Mx
  3.286811 seconds (14 allocations: 15.259 MiB, 0.23% gc time)
  3.261025 seconds (14 allocations: 15.259 MiB)
  3.289697 seconds (14 allocations: 15.259 MiB, 1.55% gc time)
Mx - 2
norm(M * x - A * x) = 5.779191123395817e-12
  2.050135 seconds (20 allocations: 15.259 MiB)
  2.037912 seconds (20 allocations: 15.259 MiB)
  2.117788 seconds (17 allocations: 15.259 MiB)
Mx - 4
norm(M * x - A * x) = 6.089783770138437e-12
  1.406513 seconds (31 allocations: 15.259 MiB)
  1.443651 seconds (26 allocations: 15.259 MiB, 3.58% gc time)
  1.413482 seconds (23 allocations: 15.259 MiB)
Mx - 6
norm(M * x - A * x) = 6.246273089788179e-12
  1.107459 seconds (44 allocations: 15.260 MiB)
  1.099101 seconds (23 allocations: 15.259 MiB)
  1.115828 seconds (25 allocations: 15.259 MiB)
Mx - 12
norm(M * x - A * x) = 6.257800652340715e-12
  0.498733 seconds (80 allocations: 15.260 MiB)
  0.509687 seconds (28 allocations: 15.259 MiB)
  0.483376 seconds (27 allocations: 15.259 MiB)
0

The result is slightly slower than the C++/OpenMP version. I think the llvm call might have some slightly unnecessary overhead and you could implement a tighter loop for that operation.

Still not great, 2x faster with 6 procs (12 threads). It’d be more interesting to look at how this performed on a wider subset of the UF Sparse Matrix collection. With and without metis pre-processing.

For the record, a better strategy for performance is to transpose the matrix first. On 12 threads (6 cores), I can do the same matvec in 0.35 seconds.