Dagger.jl: optimising a two-step matrix-vector multiplication

Hi All,

I am attempting to scale some serial code used in my package Diplodocus.jl to be compatible with HPC systems, following suggestions to try distributing operations using Dagger.jl. My problem (in serial) is a pair of sequential matrix-vector multiplications which computes D_a = M_abc * V_b * V_c in Einstein summation notation.
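Written out, with M reshaped into an (a*b, c) matrix A, the two steps are

C[a,b] = sum_c M[a,b,c] * V[c]   # first matrix-vector product, A * V
D[a]   = sum_b C[a,b] * V[b]     # second matrix-vector product, C * V

In Julia: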

using LinearAlgebra
using BenchmarkTools

a = 160 # in reality much larger
b = 160 # in reality much larger
c = 160 # in reality much larger

### Serial problem setup

A = rand(Float64,a*b,c)
B = rand(Float64,c)
C = zeros(Float64,a,b)
C_reshape = reshape(C,a*b)
D_serial = similar(B)

function serial_two_step_gemmv!(A::Matrix{T},B::Vector{T},C::Matrix{T},C_reshape::Vector{T},D::Vector{T}) where {T}

    # Performs the operation M[a,b,c] * V[c] * V[b] as two matrix-vector multiplications
    # input A is the reshaped M to a matrix of size (a*b,c)
    # B is the input vector V
    # C_reshape is the intermediate result of (M[a,b,c] * V[c]) i.e. A*B which is a vector of size (a*b) but stored/reshaped in the same underlying data as C
    # C is a matrix of size (a,b) used for the second matrix-vector multiplication ((M[a,b,c]*V[c]) * V[b]) i.e. C*B 
    # D is the final output vector of size (a) 

    mul!(C_reshape,A,B)
    mul!(D,C,B)

    return D
end

@btime serial_two_step_gemmv!(A,B,C,C_reshape,D_serial) # 215.600 μs (0 allocations: 0 bytes)

My very basic attempt to rewrite this using Dagger.jl is:

using Distributed; addprocs(4)
nworkers = length(workers())
@everywhere using Dagger
@everywhere using LinearAlgebra

c_dist = c ÷ nworkers # number of columns of A per worker
DA = DArray(A,Blocks(a*b,c_dist)) # distribute the outer most dimension of A equally among workers (assuming c is divisible by nworkers)
DD = DArray(zeros(a,nworkers),Blocks(a,1)) # each worker will produce a local (a,1) vector as its local contribution to D
D_dagger = similar(B)

function dagger_two_step_gemmv!(DA::DMatrix{T},B::Vector{T},C::Matrix{T},C_reshape::Vector{T},DD::DMatrix{T},D::Vector{T},c_dist::Int64) where {T}

    DAc = DA.chunks
    DDc = DD.chunks

    DAmt, DAnt = size(DAc) # chunk grid of DA, should be 1 x nworkers
    DDnt = size(DDc,2) # number of column chunks of DD, should be nworkers

    Dagger.spawn_datadeps() do 

        for w in 1:DDnt

            B_view = @view B[1+(w-1)*c_dist:w*c_dist] # worker w's slice of B
            Dagger.@spawn mul!(InOut(C_reshape),In(DAc[1,w]),In(B_view)) # partial C from worker w's block of A (written through C's reshape)
            Dagger.@spawn mul!(InOut(DDc[1,w]),In(C),In(B)) # column w of DD = (partial C) * B
            
        end

    end

    D .= sum(fetch(DD);dims=2) # gather the per-worker columns of DD and sum them

    return D

end

@btime dagger_two_step_gemmv!($DA,$B,$C,$C_reshape,$DD,$D_dagger,$c_dist) # 6.014 ms (84316 allocations: 5.18 MiB)

D_dagger ≈ D_serial # true

My idea is to distribute the outer dimension of the initial matrix Mabc (here A) across all workers (keeping “workers” very general, so that depending on the system they could be threads, CPUs or GPUs). The subsequent operations can then all occur on each worker independently of the others, and finally the results are collected. But it's clear that this method isn't faster than the serial code, and it allocates a lot.
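The reduction at the end is valid because the second product is linear in C: splitting A into column blocks A_w and B into matching slices B_w, the full C is sum_w of the reshaped A_w * B_w, so D = sum_w C_w * B. A quick serial check of this (plain arrays, no Dagger; D_check and C_w are just illustrative names):

D_check = zeros(a)
C_w = zeros(a,b) # per-"worker" scratch buffer
for w in 1:nworkers
    cols = 1+(w-1)*c_dist:w*c_dist
    mul!(reshape(C_w,a*b),view(A,:,cols),view(B,cols)) # partial C from block w
    mul!(D_check,C_w,B,true,true) # accumulate block w's contribution: D_check += C_w * B
end
D_check ≈ D_serial # should be true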

The problem is that these subsequent operations require reshaping an intermediate matrix/vector C. I initially tried “distributing” C so that each worker gets a local copy to work with, using either DC = DArray(zeros(a,b*nworkers),Blocks(a,b)) (so that each worker works with a local (a,b) block of C) or DC = Dagger.@shard C. But I couldn't get either approach to work: either Dagger.jl objected to me reshaping these objects, or I got incorrect final results. It seems to me that using @shard should, in theory, allow both a DC and a DC_reshape to be defined in advance so that each worker can work with its own shard without needing to allocate or move any data. (It also makes sense that B could be sharded so each worker gets a copy, but I am not sure of the best way to update these shards without allocating/garbage collection.)
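For concreteness, the closest I have come to the setup I have in mind is to give each worker its own (a,b) block of C via the DArray form above, fuse both multiplications into a single task per worker, and do the reshape inside the task, where the chunk has already been materialised as a plain Array (so I am never reshaping a distributed object). A sketch (local_two_step! is just an illustrative name, and I am not certain this is the intended pattern):

@everywhere function local_two_step!(Dw,Cw,Aw,Bv,B)
    # Cw arrives here as a plain (a,b) Array chunk, so reshaping it locally is fine
    mul!(reshape(Cw,length(Cw)),Aw,Bv) # partial C for this worker
    mul!(Dw,Cw,B) # this worker's column of DD
    return nothing
end

DC = DArray(zeros(a,b*nworkers),Blocks(a,b)) # a private (a,b) block of C per worker
DCc = DC.chunks

Dagger.spawn_datadeps() do
    for w in 1:nworkers
        B_view = @view B[1+(w-1)*c_dist:w*c_dist]
        Dagger.@spawn local_two_step!(InOut(DD.chunks[1,w]),InOut(DCc[1,w]),In(DA.chunks[1,w]),In(B_view),In(B))
    end
end

This would also remove the InOut(C_reshape) dependency, which I suspect is serialising all the tasks in my attempt above, since every task claims read/write access to the same C_reshape.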

Any help would be greatly appreciated!


If it helps, I drew a diagram of how I (naively) imagine this distributed process being implemented.

This is just a simple case showing two workers, but hopefully you can see how this would easily scale. In my head, the very large matrix A is distributed among the workers in advance, and each worker is also allocated an array C (with Cr being its reshape) whose data it can modify locally via the first mul. The resulting vectors D1 and D2 are then collected and summed to give the new B vector, and the process repeats. So the only data transferred between the workers and the “main process” are the small vectors B and D. The only issue is that I cannot get this setup to work, particularly how to implement the C arrays and their reshapes.
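In pseudo-Julia, the outer iteration I have in mind would look something like this (niters is a placeholder, and it assumes a == c so that D can be fed straight back in as the next B):

for iter in 1:niters
    dagger_two_step_gemmv!(DA,B,C,C_reshape,DD,D_dagger,c_dist) # distributed two-step product
    B .= D_dagger # only the small vectors B and D move between the workers and the main process
end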