Dagger.jl: optimising a two-step matrix-vector multiplication

Hi All,

I am attempting to scale some serial code from my package Diplodocus.jl so that it can run on HPC systems, following suggestions to try distributing the operations with Dagger.jl. The serial problem boils down to a pair of sequential matrix-vector multiplications that evaluate the contraction M[a,b,c] * V[b] * V[c] in Einstein summation notation.
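Spelled out, the two steps are

C[a,b] = M[a,b,c] * V[c]   (reshape M to an (a*b,c) matrix and multiply by V)
D[a] = C[a,b] * V[b]       (reshape the intermediate back to (a,b) and multiply by V again)

which the serial implementation below performs as two in-place mul! calls: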

using LinearAlgebra
using BenchmarkTools

a = 160 # in reality much larger
b = 160 # in reality much larger
c = 160 # in reality much larger

### Serial problem setup

A = rand(Float64,a*b,c)
B = rand(Float64,c)
C = zeros(Float64,a,b)
C_reshape = reshape(C,a*b)
D_serial = zeros(Float64,a) # output has length a (similar(B) only works here because a == c)

function serial_two_step_gemmv!(A::Matrix{T},B::Vector{T},C::Matrix{T},C_reshape::Vector{T},D::Vector{T}) where {T}

    # Performs the operation M[a,b,c] * V[c] * V[b] as two matrix-vector multiplications
    # A is M reshaped to a matrix of size (a*b,c)
    # B is the input vector V
    # C_reshape is the intermediate result M[a,b,c] * V[c], i.e. A*B, a vector of length a*b stored in the same underlying data as C
    # C is that intermediate viewed as a matrix of size (a,b), used for the second multiplication (M[a,b,c]*V[c]) * V[b], i.e. C*B
    # D is the final output vector of length a

    mul!(C_reshape,A,B)
    mul!(D,C,B)

    return D
end

@btime serial_two_step_gemmv!(A,B,C,C_reshape,D_serial) # 215.600 μs (0 allocations: 0 bytes)

My very basic attempt to rewrite this using Dagger.jl is:

using Distributed; addprocs(4)
nworkers = length(workers())
@everywhere using Dagger
@everywhere using LinearAlgebra

c_dist = Int64(c/nworkers)
DA = DArray(A,Blocks(a*b,c_dist)) # split the columns of A (the outermost, c dimension of M) equally among the workers (assuming c is divisible by nworkers)
DD = DArray(zeros(a,nworkers),Blocks(a,1)) # each worker will produce a local (a,1) vector as its local contribution to D
D_dagger = zeros(Float64,a) # output has length a, as in the serial case

function dagger_two_step_gemmv!(DA::DMatrix{T},B::Vector{T},C::Matrix{T},C_reshape::Vector{T},DD::DMatrix{T},D::Vector{T},c_dist::Int64) where {T}

    DAc = DA.chunks
    DDc = DD.chunks

    DAmt, DAnt = size(DAc) # should be 1 x nworkers
    DDmt = size(DDc,2) # should be nworkers

    Dagger.spawn_datadeps() do 

        for w in 1:DDmt

            B_view = @view B[1+(w-1)*c_dist:w*c_dist] # this worker's slice of B along the c dimension
            Dagger.@spawn mul!(InOut(C_reshape),In(DAc[1,w]),In(B_view)) # partial C from this chunk of A
            Dagger.@spawn mul!(InOut(DDc[1,w]),In(C),In(B)) # this chunk's contribution to D
            
        end

    end

    D .= sum(fetch(DD);dims=2)

    return D

end

@btime dagger_two_step_gemmv!($DA,$B,$C,$C_reshape,$DD,$D_dagger,$c_dist) # 6.014 ms (84316 allocations: 5.18 MiB)

D_dagger ≈ D_serial # true

My idea is to distribute the outer dimension of the initial matrix M[a,b,c] (here the columns of A) across all workers (keeping “workers” deliberately general, so that depending on the system they could be threads, CPUs or GPUs). The subsequent operations can then all happen on each worker independently of the others, and finally the partial results are collected. But it's clear that this method isn't faster than the serial code, and it allocates a lot.
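For what it's worth, I believe the splitting itself is mathematically fine. Writing A_w for worker w's column block of A and B_w for the matching slice of B, linearity gives

C = A*B = A_1*B_1 + A_2*B_2 + ... + A_n*B_n   (n = nworkers)

so each worker can compute a partial C from just its own chunk of A, multiply that partial C (reshaped to (a,b)) by the full B to get its contribution to D, and the contributions only need to be summed at the end. That is what the sum over the columns of DD does, and why the result above still matches the serial one.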

The problem is that these subsequent operations require reshaping the intermediate matrix/vector C. I initially tried “distributing” C so that each worker gets a local copy to work with, either via DC = DArray(zeros(a,b*nworkers),Blocks(a,b)) (each worker then works with its own (a,b) block of C) or via DC = Dagger.@shard C. But I couldn’t get either approach to work: either Dagger.jl objected to me trying to reshape these objects, or I got incorrect final results. It seems to me that using @shard should, in theory, allow both a DC and a DC_reshape to be defined in advance so that each worker can work with its own shard without needing to allocate or move any data; a rough sketch of what I am aiming for is below. (It also seems like B could be sharded so that each worker gets its own copy, but I am not sure of the best way to update those shards without allocating or triggering garbage collection.)
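Roughly, the shard version I have in mind looks something like the following (untested sketch, quite possibly misusing the API; local_two_step! and C_shard are just illustrative names). Each worker would own a local C buffer via the shard, do the reshape inside the spawned task, and write its contribution directly into its block of DD:

@everywhere function local_two_step!(Dw, Aw, B_view, B, Cw)
    # Cw is this worker's local (a,b) buffer supplied by the shard; reshape it locally
    Cw_reshape = reshape(Cw, length(Cw))
    mul!(Cw_reshape, Aw, B_view) # partial C from this worker's column block of A
    mul!(Dw, Cw, B)              # this worker's contribution to D
    return nothing
end

C_shard = Dagger.@shard zeros(Float64, a, b) # one local (a,b) scratch buffer per worker

DAc = DA.chunks
DDc = DD.chunks

Dagger.spawn_datadeps() do
    for w in 1:nworkers
        B_view = @view B[1+(w-1)*c_dist:w*c_dist]
        # unclear to me whether C_shard needs an In/Out wrapper here for datadeps to track it
        Dagger.@spawn local_two_step!(InOut(DDc[1,w]), In(DAc[1,w]), In(B_view), In(B), C_shard)
    end
end

D_dagger .= sum(fetch(DD); dims=2) # combine the per-worker contributions as before

Is something along these lines the intended pattern for per-worker scratch buffers, or is there a better way?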

Any help would be greatly appreciated!