# Distributed sparse matrix assembly

Hi,
I’m having trouble understanding how `Distributed` works. I guess it’s trivial for someone experienced, but I’m no expert on this. I assume my mistake is related to how data moves between processes, but I don’t see how to solve it from the manual.

My goal is to assemble a matrix using remote workers, where each non-zero element takes a bit of time to compute, but all elements can be computed independently. The caveat (I think) is that these non-zero elements depend on some data that is computed ahead of time (e.g. the basis elements of a Galerkin method). I have already parallelized this using threads, but that saturates quickly, and I would like to use more processors across multiple nodes.

Here’s my naive implementation with the serial execution at the end:

```julia
using Distributed

# note: each process generates its own random data here; the copies actually
# used by the workers are the master's, serialized via remote_do below
@everywhere begin
    using LinearAlgebra
    n = 100
    a = [rand(3000) for i in 1:n]
    b = [rand(3000) for i in 1:n]
    c = randn(3000, 3000)
end

const jobs = RemoteChannel(() -> Channel{Tuple}(nworkers()));

const results = RemoteChannel(() -> Channel{Tuple}(nworkers()));

@everywhere function do_dot(jobs, results, a, b, c)
    while true
        i, j = take!(jobs)
        res = sum((a[i]' * b[j]) * c)
        put!(results, (i, j, res))
    end
end

function count_jobs(n)
    k = 0
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                k += 1
            end
        end
    end
    return k
end

function make_jobs(n)
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                put!(jobs, (i, j))
            end
        end
    end
end

@time begin
    njobs = count_jobs(n)
    @async make_jobs(n)  # feed the jobs channel with "njobs" jobs

    for p in workers()   # start tasks on the workers to process requests in parallel
        remote_do(do_dot, p, jobs, results, a, b, c)
    end

    res = [take!(results) for _ in 1:njobs]
end

@time begin
    res = Vector{Tuple{Int64, Int64, Float64}}(undef, njobs)
    k = 1
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                res[k] = (i, j, sum((a[i]' * b[j]) * c))
                k += 1
            end
        end
    end
    res
end
```

The serial execution takes about 75 s and the parallel one with 16 workers takes about 52 s. That’s better, but considering the number of workers used, far from ideal.

Is there something obvious I’m doing wrong here? Could someone point me to an example of how it’s done right? Or is the communication still the bottleneck here, so the individual jobs should take more time for this approach to be useful?

EDIT: I should say that the parallel version fully utilizes all the cores.

What you probably want to do here is make `results` a `Vector` of length `n`. Then each worker can assemble its part of the result without any inter-process communication, and you merge the parts at the end.
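A minimal sketch of that idea, scaled down (`n = 20`, vectors of length 300) so it runs quickly; `row_entries` is an illustrative name, not from the code above. Each worker computes whole rows of the band and returns them, so the only communication is the final merge:

```julia
using Distributed
addprocs(2)  # illustrative worker count

# generate the data once on the master...
n = 20
a = [rand(300) for _ in 1:n]
b = [rand(300) for _ in 1:n]
c = randn(300, 300)

# ...and copy it to every worker, so all processes see the same data
@everywhere n = $n
@everywhere a = $a
@everywhere b = $b
@everywhere c = $c

# all entries of row i inside the band |i - j| < 20, computed on one worker
@everywhere row_entries(i) = [(i, j, sum((a[i]' * b[j]) * c))
                              for j in max(1, i - 19):min(n, i + 19)]

# one task per row, no channels; merge the per-row results at the end
res = reduce(vcat, pmap(row_entries, 1:n))
```

The merge via `reduce(vcat, ...)` happens on the master after all rows come back, so no `RemoteChannel` is needed at all.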


Did you take precompilation and garbage collection time into account?

Precompilation time is negligible, according to `@time`. Garbage-collection time I don’t know, but if that’s the caveat, how do I avoid it?

Could you give me an example? I don’t see how the `RemoteChannel` could be a vector, as in the serial execution. I think for that to work I would need a `DistributedArray`? Is that the way to go?

EDIT: `DistributedArrays` have no `setindex!` defined, unlike `SharedArrays`, so that’s not an option.
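For completeness, a scaled-down sketch of the `SharedArrays` route (illustrative sizes; a narrower band so that not every entry is filled). Every local worker writes its entries directly into the shared matrix, so there is no result channel:

```julia
using Distributed
addprocs(2)  # illustrative worker count
@everywhere using SharedArrays

n = 20
a = [rand(300) for _ in 1:n]
b = [rand(300) for _ in 1:n]
c = randn(300, 300)
@everywhere a = $a
@everywhere b = $b
@everywhere c = $c

# a SharedArray is writable from every worker on the same host
S = SharedArray{Float64}(n, n)
@sync @distributed for i in 1:n
    for j in max(1, i - 4):min(n, i + 4)  # band |i - j| < 5 for this small example
        S[i, j] = sum((a[i]' * b[j]) * c)
    end
end
```

The catch is that a `SharedArray` is backed by shared memory, so it only works for processes on the same node; since the original goal was to spread across multiple nodes, this is at best a partial solution.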

Should I join (by hand) many jobs into one job, so that I end up with `nworkers()` jobs, to be most efficient?

I don’t think this will work as is. Each non-zero element takes only a little time to compute, and then some more to be communicated; the communication time would kill efficiency. I think chunking would be required.

I just tried this, with similar performance:

```julia
using Distributed

@everywhere begin
    using LinearAlgebra
    n = 100
    a = [rand(3000) for i in 1:n]
    b = [rand(3000) for i in 1:n]
    c = randn(3000, 3000)
end

const jobs = RemoteChannel(() -> Channel{Any}(nworkers()));

const results = RemoteChannel(() -> Channel{Tuple}(nworkers()));

# each job is now a chunk of (i, j) index pairs instead of a single pair
@everywhere function do_dot(jobs, results, a, b, c)
    while true
        ijs = take!(jobs)
        res = [sum((a[i]' * b[j]) * c) for (i, j) in ijs]
        put!(results, (ijs, res))
    end
end

function count_all_indices(n)
    k = 0
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                k += 1
            end
        end
    end
    return k
end

# split the index pairs into np roughly equal chunks
function dist_indices(n, nindices, np)
    ni = nindices ÷ np
    ns = [Tuple{Int, Int}[] for _ in 1:np]

    k, n_ = 1, 1
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                push!(ns[n_], (i, j))
                k += 1
                if (k > ni) && (n_ < np)
                    n_ += 1
                    k = 1
                end
            end
        end
    end
    return ns
end

function make_jobs(indices)
    for ij in indices
        put!(jobs, ij)
    end
end

@time begin
    nindices = count_all_indices(n)
    ijs = dist_indices(n, nindices, nworkers())

    @async make_jobs(ijs)

    for p in workers()  # start tasks on the workers to process requests in parallel
        remote_do(do_dot, p, jobs, results, a, b, c)
    end

    res = [take!(results) for _ in 1:nworkers()]
end
```

Is this example too small to show parallel efficiency?
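For comparison, `pmap` can do this kind of batching itself via its `batch_size` keyword, which replaces the hand-written `dist_indices` and job channels. A scaled-down sketch (illustrative sizes, not the original 100 × 3000 problem):

```julia
using Distributed
addprocs(2)  # illustrative worker count

n = 20
a = [rand(300) for _ in 1:n]
b = [rand(300) for _ in 1:n]
c = randn(300, 300)
@everywhere a = $a
@everywhere b = $b
@everywhere c = $c

# all (i, j) pairs inside the band
pairs = [(i, j) for i in 1:n for j in 1:n if abs(i - j) < 20]

# pmap ships the pairs in batches instead of one at a time,
# amortizing the communication cost over many jobs
vals = pmap(((i, j),) -> sum((a[i]' * b[j]) * c), pairs; batch_size = 50)

res = collect(zip(pairs, vals))
```

Whether this beats the hand-rolled channel version will depend on the per-entry compute time versus the serialization cost; the `batch_size` value here is a guess and would need tuning.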