Distributed sparse matrix assembly

Hi,
I’m having trouble understanding how Distributed works. I guess it’s trivial for someone experienced, but I’m no expert on this. I assume the mistake is related to the movement of data, but I don’t see how to solve it from the manual.

My goal is to assemble a matrix using remote workers, where each non-zero element takes a bit of time to compute, but all elements can be computed independently. The caveat (I think) is that computing these non-zero elements uses some data that is computed ahead of time (e.g. the basis elements of a Galerkin method). I have already parallelized this using threads, but that quickly saturates, and I would like to use more processors across multiple nodes.
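
For reference, my threaded version looks roughly like this (a sketch using the same toy kernel as in the example below):

using LinearAlgebra

n = 100
a = [rand(3000) for i = 1:n]
b = [rand(3000) for i = 1:n]
c = randn(3000, 3000)

# collect the banded index pairs once, then fill the values in parallel
idx = [(i, j) for i in 1:n, j in 1:n if abs(i - j) < 20]
vals = Vector{Float64}(undef, length(idx))
Threads.@threads for k in eachindex(idx)
	i, j = idx[k]
	vals[k] = sum((a[i]' * b[j]) * c)
end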

Here’s my naive implementation with the serial execution at the end:

using Distributed
addprocs(16)

@everywhere begin # precomputed data, replicated on every worker
	using LinearAlgebra
	n=100
	a = [rand(3000) for i=1:n]
	b = [rand(3000) for i=1:n]
	c = randn(3000,3000)
end

const jobs = RemoteChannel(()->Channel{Tuple}(nworkers())); # (i,j) index pairs handed out to the workers

const results = RemoteChannel(()->Channel{Tuple}(nworkers())); # (i,j,value) triples coming back

@everywhere function do_dot(jobs, results, a,b,c)
	while true # worker loop: take an index pair, compute the entry, send it back
		i,j = take!(jobs)
		res = sum((a[i]'*b[j])*c)
		put!(results, (i, j, res))
	end
end
function count_jobs(n)
	k=0
	for i in 1:n
		for j in 1:n
			if abs(i-j) < 20
				k+=1
			end
		end
	end
	return k
end
function make_jobs(n)
	for i in 1:n
		for j in 1:n
			if abs(i-j) < 20
				put!(jobs, (i,j))
			end
		end
	end
end

@time begin
	njobs = count_jobs(n)
	@async make_jobs(n) # feed the jobs channel with all njobs index pairs

	for p in workers() # start tasks on the workers to process requests in parallel
		remote_do(do_dot, p, jobs, results, a, b, c)
	end

	res = [take!(results) for _ in 1:njobs]
end


@time begin
	res = Vector{Tuple{Int64, Int64, Float64}}(undef,njobs)
	k=1
	for i in 1:n
		for j in 1:n
			if abs(i-j)<20
				res[k] = (i,j,sum((a[i]'*b[j])*c))
				k+=1
			end
		end
	end
	res
end

The serial execution takes about 75 s and the parallel one with 16 workers takes about 52 s. That’s better, but considering the number of workers used, far from ideal.

Is there something obvious I’m doing wrong here? Could someone point me to an example of how it’s done right? Or is communication still the bottleneck here, and would the individual jobs have to take more time to make this approach useful?

EDIT: I should say that the parallel version fully utilizes all the cores.

What you probably want to do here is make results a Vector of length n. Then you can assemble all of the parts of the result without any inter-worker communication and merge the results at the end.
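
For example, something along these lines, where each worker assembles complete rows and the master only merges them at the end (assemble_row is a name I made up here, and I use pmap instead of channels):

using Distributed
addprocs(4)

@everywhere begin
	using LinearAlgebra
	n = 100
	a = [rand(3000) for i = 1:n]
	b = [rand(3000) for i = 1:n]
	c = randn(3000, 3000)

	# everything for a fixed row i is computed on one worker,
	# with no communication until the row is done
	function assemble_row(i)
		[(i, j, sum((a[i]' * b[j]) * c)) for j in max(1, i - 19):min(n, i + 19)]
	end
end

rows = pmap(assemble_row, 1:n) # one message per row instead of one per entry
res = reduce(vcat, rows)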

Did you take precompilation and garbage collection time into account?
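
On recent Julia versions @time reports both; timing a call twice separates JIT compilation from the run itself, e.g. (f here is just a stand-in for your timed block):

f() = sum((a[1]' * b[1]) * c) # stand-in for one entry of the assembly
@time f() # first call: includes JIT compilation of f
@time f() # second call: the "% gc time" field now shows the GC share of the run alone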

Precompilation time is negligible according to @time. Garbage-collection time I don’t know about, but if that’s the caveat, how do I avoid it?

Could you give me an example? I don’t see how the RemoteChannel could be a vector, as in the serial execution. I think for that to work I would need a DistributedArray? Is that the way to go?

EDIT: DistributedArrays have no setindex! defined, unlike SharedArrays, so that’s not an option.
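
(A SharedArray would support setindex!, roughly as sketched below, but it relies on shared memory, so it cannot span multiple nodes, which is what I’m after.)

using Distributed
addprocs(4)

@everywhere begin
	using LinearAlgebra, SharedArrays
	n = 100
	a = [rand(3000) for i = 1:n]
	b = [rand(3000) for i = 1:n]
	c = randn(3000, 3000)
end

# every local worker can write into the shared buffer directly
S = SharedArray{Float64}(n, n)
@sync @distributed for i in 1:n
	for j in max(1, i - 19):min(n, i + 19)
		S[i, j] = sum((a[i]' * b[j]) * c)
	end
end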

Should I join (by hand) many jobs into one job, so that I end up with nworkers() jobs, to be most efficient?

I don’t think this will work as-is. Each non-zero element may take only a little time to compute, and then some more to be communicated; the communication time would kill efficiency. I think chunking is required.
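
One way to get the chunking without hand-rolling it might be pmap with its batch_size keyword, which groups many small jobs into one message per round trip:

using Distributed
addprocs(4)

@everywhere begin
	using LinearAlgebra
	n = 100
	a = [rand(3000) for i = 1:n]
	b = [rand(3000) for i = 1:n]
	c = randn(3000, 3000)
end

idx = [(i, j) for i in 1:n, j in 1:n if abs(i - j) < 20]

# batch_size controls how many (i,j) pairs travel in a single message
res = pmap(idx; batch_size = 100) do (i, j)
	(i, j, sum((a[i]' * b[j]) * c))
end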

I just tried chunking by hand, with similar performance:

using Distributed
addprocs(16)

@everywhere begin
	using LinearAlgebra
	n=100
	a = [rand(3000) for i=1:n]
	b = [rand(3000) for i=1:n]
	c = randn(3000,3000)
end

const jobs = RemoteChannel(()->Channel{Any}(nworkers())); # now carries whole chunks of (i,j) pairs

const results = RemoteChannel(()->Channel{Tuple}(nworkers()));

@everywhere function do_dot(jobs, results, a,b,c)
	while true # worker loop: take a whole chunk of pairs, compute all entries, send them back
		ijs = take!(jobs)
		res = [sum((a[i]'*b[j])*c) for (i,j) in ijs]
		put!(results, (ijs, res))
	end
end

function count_all_indices(n)
	k=0
	for i in 1:n
		for j in 1:n
			if abs(i-j) < 20
				k+=1
			end
		end
	end
	return k
end

function dist_indices(n,nindices,np)
	ni = nindices÷np # target chunk size; the last chunk absorbs the remainder
	ns = [Tuple{Int,Int}[] for i in 1:np]

	k,n_ = 1,1

	for i in 1:n
		for j in 1:n
			if abs(i-j) < 20
				push!(ns[n_],(i,j))
				k+=1
				if (k>ni) && (n_<np)
					n_+=1
					k=1
				end
			end
		end
	end
	return ns
end

function make_jobs(indices)
	for ij in indices
		put!(jobs, ij)
	end
end

@time begin
	nindices = count_all_indices(n)
	ijs = dist_indices(n,nindices, nworkers())

	@async make_jobs(ijs)

	for p in workers() # start tasks on the workers; remote_do is fire-and-forget, so there is nothing for @sync to wait on
		remote_do(do_dot, p, jobs, results, a, b, c)
	end

	res = [take!(results) for _ in 1:nworkers()]
end

Is this example too small to show parallel efficiency?