I’m starting to learn about concurrent / parallel / distributed computing on a simple graph problem and would really appreciate any help. I am trying to squeeze every drop of performance out of Julia, so comments on how to decrease memory consumption and runtime are most welcome. In short:
Given a directed acyclic graph with weighted edges, I must transform it into a new graph with the same vertices, where the weight of an edge i\leftarrow j aggregates all possible paths from j to i in the initial graph (including the path with 0 edges from j to itself). The aggregation sums the weights of all such paths, where the weight of a path is the product of the weights of its edges. Typically, my graphs have lots of vertices (e.g. millions), but each vertex has a small degree (e.g. up to 10).
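In other words, the new weight of the edge i\leftarrow j is \sum_{p:\,j\rightsquigarrow i}\prod_{e\in p} w_e, where the sum runs over all directed paths p from j to i and w_e denotes the weight of edge e in the initial graph.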
More concretely, the vertices of the graph are 1,\ldots,n and for every edge i \overset{w}{\leftarrow} j there holds i<j. My input are the arrays J and W, such that J[j]\subseteq\{1,\ldots,j\!-\!1\} lists the end-vertices of the edges out of j and W[j] lists the weights of those edges. Thus vertex j has degree length(J[j])=length(W[j]). For example,
J = [Int64[], [1], Int64[], [1,2,3], Int64[], [4,5], [4,6], Int64[], [6], [5,7]];
W = [Float64[], [0.2], Float64[], [-0.9,-0.1,-0.7], Float64[], [0.4,0.5], [-0.7,-0.6], Float64[], [0.8], [-0.3,0.9]];
corresponds to the graph:
The desired output is the graph:
J = [[1], [1, 2], [3], [1, 2, 3, 4], [5], [1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 5, 6, 7], [8], [1, 2, 3, 4, 5, 6, 9], [1, 2, 3, 4, 5, 6, 7, 10]]
W = [[1.0], [0.2, 1.0], [1.0], [-0.92, -0.1, -0.7, 1.0], [1.0], [-0.36800000000000005, -0.04000000000000001, -0.27999999999999997, 0.4, 0.5, 1.0], [0.8648, 0.094, 0.6579999999999999, -0.94, -0.3, -0.6, 1.0], [1.0], [-0.29440000000000005, -0.03200000000000001, -0.22399999999999998, 0.32000000000000006, 0.4, 0.8, 1.0], [0.77832, 0.08460000000000001, 0.5922, -0.846, -0.5700000000000001, -0.54, 0.9, 1.0]]
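For instance, the weight -0.92 of the output edge 1\leftarrow 4 sums the two paths from 4 to 1 in the input graph, namely 1\overset{-0.9}{\leftarrow}4 and 1\overset{0.2}{\leftarrow}2\overset{-0.1}{\leftarrow}4, giving -0.9+0.2\cdot(-0.1)=-0.92, while the weight 1.0 of 4\leftarrow 4 comes from the path with 0 edges.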
The non-parallel solution to this problem (using dynamic programming) is the function accumulate0! below:
using StatsBase; dtype = Float64;
function create_toy_data(n, s) # s = max degree of vertices
    J = [sample(1:j-1, r, replace=false, ordered=true)
         for j = 1:n for r in rand(0:min(j-1,s), 1)]
    W = [rand(-1:0.01:1, length(w)) for w ∈ J]
    return J, W, deepcopy(J), deepcopy(W); end;
function acml!(jj ::Vector{Int}, ww ::Vector{dtype})
    """Accumulate: sort jj and ww according to jj, then replace those
    elements in ww that have the same value in jj by their sum."""
    pi = sortperm(jj); jj[:] = jj[pi]; ww[:] = ww[pi]; i,j = 1,1;
    while j < length(ww) j += 1;
        if jj[i]==jj[j] ww[i] += ww[j]; jj[j] = 0; else i = j; end end
    pi = (jj .!= 0) .& (ww .!= 0);
    return jj[pi], ww[pi]; end;
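# Quick sanity check of acml! on made-up vectors: repeated targets are merged
# (their weights are summed) and zero entries are dropped, e.g.
#   acml!([2,1,2], [0.5,1.0,0.25]) == ([1, 2], [1.0, 0.75])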
function accumulate0!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}})
    """Use paths starting in neighbors of vertex j, they have already been
    computed, and prolong those paths by 1 edge (from j to the neighbor)."""
    for j = 1:length(J) if j%100==0 print(j," ") end
        W[j] = vcat((W[j] .* W[J[j]])..., [dtype(1)])
        J[j] = vcat( J[J[j]]..., [j])
        J[j], W[j] = acml!(J[j], W[j]) end end;
A small example like the one above can be obtained with:
J, W, J_, W_ = create_toy_data(10^1,10);
@time accumulate0!(J,W)
display(J), display(W);
Running the same code on my 6-core CPU, but with create_toy_data(10^5,10), returns:
139.971336 seconds (3.45 M allocations: 89.583 GiB, 1.70% gc time, 0.02% compilation time)
I wish to improve this benchmark by whatever means possible (e.g. parallel computing).
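Independently of parallelism, I suspect the splatted vcat((...)..., ...) calls in accumulate0! cause many of the allocations. Below is a sketch I have not benchmarked (the name accumulate0_flat! is just a placeholder), which flattens the prolonged paths with reduce(vcat, ...) instead of splatting; the concatenation order is unchanged, so it should produce the same output.
function accumulate0_flat!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}})
    """Same recursion as accumulate0!, but flattens the prolonged paths with
    reduce(vcat, ...) instead of splatting into vcat."""
    for j = 1:length(J)
        Wj = isempty(J[j]) ? dtype[] : reduce(vcat, W[j] .* W[J[j]]) # prolonged path weights
        Jj = isempty(J[j]) ? Int[]   : reduce(vcat, J[J[j]])         # prolonged path end-vertices
        W[j] = vcat(Wj, [dtype(1)])  # append the path with 0 edges, j <- j
        J[j] = vcat(Jj, [j])
        J[j], W[j] = acml!(J[j], W[j]) end end;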
My clumsy attempt at synchronous parallelization is:
function accumulate1!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}})
    """Each thread waits until paths out of neighbors of vertex j are all
    computed, then it prolongs those paths by 1 edge."""
    F = falses(length(J)); F[1] = 1 # keep track of finished vertices
    function delay(W ::Vector{Vector{dtype}}, j ::Int)
        while !F[j] end; return W[j]; end;
    Threads.@threads for j = 1:length(J) if j%100==0 print(j," ") end
        W[j] = vcat((W[j] .* [delay(W,i) for i ∈ J[j]])..., [dtype(1)])
        J[j] = vcat( J[J[j]]..., [j])
        (J[j], W[j]), F[j] = acml!(J[j], W[j]), 1; end end; # mark as finished
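(For the threaded versions I start Julia with several threads, e.g. julia -t 6 on my 6-core machine; Threads.nthreads() reports how many are available.)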
If you run the snippet below, you’ll see that the computation never finishes. I do not understand why.
J, W, J_, W_ = create_toy_data(10^1,10);
@time accumulate1!(J_,W_)
My attempt at asynchronous parallelization is:
function accumulate2!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}})
    """Each thread looks at neighbors of vertex j, unfinished ones are kept
    for later use, finished ones are used immediately and stored separately
    (paths out of them are prolonged by the edge from j to the neighbor)."""
    J_, W_ = [Int[] for j ∈ J], [dtype[] for w ∈ W] # final result is filled in
    um = min(Threads.nthreads()*100, length(J)) # unicore/multicore split
    F = falses(length(J)) # flag for finished vertices
    for j = 1:um if j%100==0 print(j," ") end
        W[j], W_[j] = [], vcat((W[j] .* W_[J[j]])..., [dtype(1)])
        J[j], J_[j] = [], vcat( J_[J[j]]..., [j])
        J_[j], W_[j] = acml!(J_[j], W_[j]) end
    F[1:um] .= 1
    while !all(F) # keep going until all computations are finished
        Threads.@threads for j = um+1:length(J) j%100==0 && print(j," ");
            F[j]==1 && continue; ii = F[J[j]]; # ii = flag for (un)finished
            W[j], W_[j] = W[j][ii .== 0], vcat(W_[j], (W[j][ii] .* W_[J[j][ii]])...)
            J[j], J_[j] = J[j][ii .== 0], vcat(J_[j], J_[J[j][ii]]...)
            J_[j], W_[j] = acml!(J_[j], W_[j]);
            if length(J[j])==0
                J_[j], W_[j] = vcat(J_[j],[j]), vcat(W_[j],[dtype(1)])
                F[j] = 1; end end end
    for j = 1:length(J) J[j], W[j], J_[j], W_[j] = J_[j], W_[j], [], []; end end;
Notice that I split the calculation: non-parallel for the first 600 vertices, then parallel from j=601 onward. This is to ensure that when each of the 6 threads picks a vertex j\in\{601,\ldots,606\}, every edge out of j=606 has only a small chance (5/605, to be precise) of ending in an unfinished vertex 601\leq i\leq 605. This should ensure that there are not too many passes over the whole vertex set, since each pass produces many new finished vertices.
Running the snippet below works (the two outputs agree up to floating-point roundoff of about 3\cdot 10^{-14}), but it is even slower than the non-parallel function:
J, W, J_, W_ = create_toy_data(10^5,10);
@time accumulate0!(J ,W )
@time accumulate2!(J_,W_)
println(J==J_, " ", W==W_," ",maximum([maximum(abs.(w)) for w in W .- W_]))
139.740774 seconds (3.44 M allocations: 88.720 GiB, 1.65% gc time)
179.144954 seconds (11.54 M allocations: 123.498 GiB, 46.76% gc time)
true false 2.842170943040401e-14
How can I fix these failures?