Hi,
I’m having trouble understanding how Distributed
works. I guess it’s something trivial for someone experienced, but I’m no expert on this. I assume my mistake is related to data movement, but I don’t see how to solve it from the manual.
My goal is to assemble a matrix using remote workers, where each non-zero element takes a bit of time to compute but can be computed independently. The caveat (I think) is that these non-zero elements use some data that is computed ahead of time (e.g. the basis elements of a Galerkin method). I have already parallelized this using threads, but that quickly saturates, and I would like to use more processors across multiple nodes.
Here’s my naive implementation, with the serial execution at the end:
using Distributed
addprocs(16)
@everywhere begin
    using LinearAlgebra
    n = 100
    a = [rand(3000) for i = 1:n]
    b = [rand(3000) for i = 1:n]
    c = randn(3000, 3000)
end
const jobs = RemoteChannel(()->Channel{Tuple}(nworkers()));
const results = RemoteChannel(()->Channel{Tuple}(nworkers()));
@everywhere function do_dot(jobs, results, a, b, c)
    while true
        i, j = take!(jobs)
        res = sum((a[i]' * b[j]) * c)
        put!(results, (i, j, res))
    end
end
function count_jobs(n)
    k = 0
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                k += 1
            end
        end
    end
    return k
end
function make_jobs(n)
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                put!(jobs, (i, j))
            end
        end
    end
end
@time begin
    njobs = count_jobs(n)
    @async make_jobs(n)    # feed the jobs channel with "njobs" jobs
    for p in workers()     # start tasks on the workers to process requests in parallel
        remote_do(do_dot, p, jobs, results, a, b, c)
    end
    res = [take!(results) for _ = 1:njobs]
end
@time begin
    res = Vector{Tuple{Int64, Int64, Float64}}(undef, njobs)
    k = 1
    for i in 1:n
        for j in 1:n
            if abs(i - j) < 20
                res[k] = (i, j, sum((a[i]' * b[j]) * c))
                k += 1
            end
        end
    end
    res
end
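For reference, here is a self-contained sketch of how I estimate the cost of one job in isolation (using Base’s `@elapsed`; the BenchmarkTools package would give more reliable numbers), to compare pure compute time against the per-job channel overhead:

```julia
using LinearAlgebra

# One job in isolation: a dot product (a scalar) times a 3000x3000 matrix,
# summed -- exactly the work done per (i, j) pair above.
a1 = rand(3000)
b1 = rand(3000)
c1 = randn(3000, 3000)

sum((a1' * b1) * c1)                # run once so compilation is not timed
t = @elapsed sum((a1' * b1) * c1)   # rough wall time of a single job
println("single job: ", t, " s")
```

If this number times njobs roughly reproduces the 75 s serial time, then the gap between that and the parallel time should be mostly scheduling and data transfer.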
The serial execution takes about 75 s, and the parallel one with 16 workers takes about 52 s. That’s better, but considering the number of workers used, far from ideal.
Is there something obvious I’m doing wrong here? Could someone point me to an example of how it’s done right? Or is communication still the bottleneck, so the individual jobs would need to take more time for this approach to be useful?
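For comparison, here is an untested sketch of the same workload driven by `pmap` instead of hand-rolled channels. I’m not sure whether it actually moves less data (the globals `a`, `b`, `c` referenced by the closure still have to reach the workers somehow), which is part of what I’m asking:

```julia
using Distributed
addprocs(4)                  # fewer workers here, just for the sketch
@everywhere using LinearAlgebra

# Generate the data once on the master instead of independently on every
# worker, so all processes compute with the *same* a, b, c.
n = 100
a = [rand(3000) for i = 1:n]
b = [rand(3000) for i = 1:n]
c = randn(3000, 3000)

# The banded index set as an explicit list of jobs.
pairs = [(i, j) for i = 1:n, j = 1:n if abs(i - j) < 20]

# pmap schedules one (i, j) pair per remote call and collects the results.
res = pmap(pairs) do (i, j)
    (i, j, sum((a[i]' * b[j]) * c))
end
```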
EDIT: I should add that the parallel version fully utilizes all the cores.