I want to assemble a sparse matrix using Distributed
. I use DistributedArrays
for the nonzero elements. The code calls some functions (assemble_synced!
or assemble_not_synced!
) which assemble submatrices of the whole matrix. It works fine when the individual loops are called as @sync @distributed for xxx
. However, in the real application this leads to unnecessary waiting for some workers to finish until moving to the next @distributed for
loop. I thought I could just @sync
around all the @distributed
calls, but it doesn’t seem to synchronize these. Is this expected behaviour? If yes, is there another way to avoid the unnecessary waiting?
julia> using Distributed
julia> addprocs(4)
4-element Vector{Int64}:
2
3
4
5
julia> @everywhere using DistributedArrays
julia> @everywhere begin
function slowcalculation(i,j)
sleep(0.001)
rand()*i*j
end
function assemble_synced!(n, is, js, aijs)
@sync @distributed for i=1:n
for j=1:n
aij = slowcalculation(i,j)
if aij/(i*j) > 0.7
push!(first(localpart(is)), i)
push!(first(localpart(js)), j)
push!(first(localpart(aijs)),aij)
end
end
end
return nothing
end
function assemble_not_synced!(n, is, js, aijs)
@distributed for i=1:n
for j=1:n
aij = slowcalculation(i,j)
if aij/(i*j) > 0.7
push!(first(localpart(is)), i)
push!(first(localpart(js)), j)
push!(first(localpart(aijs)),aij)
end
end
end
return nothing
end
function assemble_synced(n)
is = distribute([Int[] for _ in procs()])
js = distribute([Int[] for _ in procs()])
aijs = distribute([Float64[] for _ in procs()])
assemble_synced!(n,is,js,aijs)
assemble_synced!(n,is,js,aijs)
assemble_synced!(n,is,js,aijs)
assemble_synced!(n,is,js,aijs)
return sparse(vcat(is...), vcat(js...), vcat(aijs...), n, n)
end
function assemble_not_synced(n)
is = distribute([Int[] for _ in procs()])
js = distribute([Int[] for _ in procs()])
aijs = distribute([Float64[] for _ in procs()])
@sync begin
assemble_not_synced!(n,is,js,aijs)
assemble_not_synced!(n,is,js,aijs)
assemble_not_synced!(n,is,js,aijs)
assemble_not_synced!(n,is,js,aijs)
end
return sparse(vcat(is...), vcat(js...), vcat(aijs...), n, n)
end
end
julia> assemble_synced(50)
50×50 SparseMatrixCSC{Float64, Int64} with 1829 stored entries:
⣺⣫⣎⠴⣞⡷⠟⣤⡻⡷⣻⡻⣝⣋⡿⡿⣮⢛⡯⢷⡷⣼⣷⡎⣽
⣖⣻⣿⣯⣝⠺⢏⣻⢬⠶⢻⡻⢴⣻⣋⠾⢻⢿⣷⢝⠾⣷⡷⡷⡦
⡼⡞⣝⣹⠟⢷⣯⣷⡭⣼⣮⡗⡯⢽⠵⡯⠯⣺⢗⡫⡿⠽⠻⡞⣿
⣯⡯⢺⡶⢟⣿⡽⣯⣛⡻⢯⡭⣫⣷⣍⡛⢟⣮⣽⣿⣓⣳⡻⡝⡿
⠝⣴⣝⣼⠻⣩⠾⣣⣌⣼⢽⣾⡝⣿⡶⡷⡽⣳⢖⣮⢟⠿⣾⠽⠝
⣟⢞⡻⠓⣹⣹⡯⣎⢿⣼⢇⡳⣿⡽⣿⢕⣷⢃⣽⢮⡳⣾⡟⣴⡶
⢻⣷⠧⣺⡞⣾⢿⢿⣦⣽⢽⣎⢮⢻⣝⣿⣿⠯⣕⣣⣶⣟⣣⣯⣗
⣟⡵⡾⡥⣾⣾⢻⣹⢧⢾⢿⢗⣾⣾⣽⣟⡪⣱⢫⡾⡿⡱⣯⣯⢷
⠣⢟⣭⣯⢻⠺⣭⢻⣙⣮⣊⢕⢿⡯⣟⢵⣿⣯⣄⣿⢘⣯⣶⡷⡷
⡇⣾⣿⣥⢶⣷⠷⣧⢺⣗⢟⣵⣯⢿⡷⣲⢋⡭⡯⣮⣟⢾⣽⢿⣬
⣮⣯⡯⣯⣼⣪⡿⣷⡪⣽⠚⡭⣺⡮⢿⣵⣵⣳⡚⡥⡧⣹⣷⢟⡱
⡽⣿⣳⢯⣡⡿⣻⡺⣻⣟⢛⢿⡻⡷⣴⣯⣫⢾⡜⣽⢿⣱⣏⣗⣿
⠛⠛⠓⠚⠚⠛⠛⠒⠓⠛⠙⠛⠑⠛⠛⠋⠋⠙⠓⠓⠙⠂⠛⠋⠛
julia> assemble_not_synced(50)
ERROR: ArgumentError: the first three arguments' lengths must match, length(I) (=1) == length(J) (= 6) == length(V) (= 9)
Stacktrace:
[1] sparse(I::Vector{Int64}, J::Vector{Int64}, V::Vector{Float64}, m::Int64, n::Int64, combine::Function)
@ SparseArrays /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/SparseArrays/src/sparsematrix.jl:825
[2] sparse(I::Vector{Int64}, J::Vector{Int64}, V::Vector{Float64}, m::Int64, n::Int64)
@ SparseArrays /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/SparseArrays/src/sparsematrix.jl:1045
[3] assemble_not_synced(n::Int64)
@ Main ./REPL[4]:68
[4] top-level scope
@ REPL[6]:1
The error here occurs, because the loops are not synchronized properly so that the elements of is
, js
, aijs
are not necessarily filled correctly at the time of calling sparse.