Learning (a)synchronous parallelization on a simple graph problem

I’m starting to learn about concurrent / parallel / distributed computing on a simple graph problem and would really appreciate any help. I am trying to squeeze every drop of performance out of Julia, so comments on how to decrease memory consumption and runtime are most welcome. In short:

Given a directed acyclic graph with weighted edges, I must transform it into a new graph with the same vertices, but where an edge i\leftarrow j is obtained by aggregating all possible paths from j to i in the initial graph (including paths with 0 edges). The aggregation sums the weights of all such paths, where the weight of a path is the product of the weights of its edges. Typically, my graphs have lots of vertices (e.g. millions), but each vertex has small degree (e.g. up to 10).
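
In symbols: writing w_e for the weight of edge e and p\colon j\rightsquigarrow i for a directed path from j to i, the new weight of the edge i\leftarrow j is

\tilde{w}_{i\leftarrow j} \;=\; \sum_{p\,:\,j\rightsquigarrow i}\ \prod_{e\in p} w_e,

where for i=j the empty path contributes an empty product equal to 1 (hence the 1.0 diagonal entries in the output below).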

More concretely, the vertices of the graph are 1,\ldots,n and for every edge i \overset{w}{\leftarrow} j we have i<j. My inputs are arrays J and W, such that J[j]\subseteq\{1,\ldots,j\!-\!1\} contains the end-vertices of the edges out of j and W[j] contains the weights of those edges. Thus vertex j has out-degree length(J[j])=length(W[j]). For example,

J = [Int64[], [1], Int64[], [1,2,3], Int64[], [4,5], [4,6], Int64[], [6], [5,7]];
W = [Float64[], [0.2], Float64[], [-0.9,-0.1,-0.7], Float64[], [0.4,0.5], [-0.7,-0.6], Float64[], [0.8], [-0.3,0.9]];

corresponds to the graph: [figure of the input graph omitted]

The desired output is the graph:

J = [[1], [1, 2], [3], [1, 2, 3, 4], [5], [1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 5, 6, 7], [8], [1, 2, 3, 4, 5, 6, 9], [1, 2, 3, 4, 5, 6, 7, 10]]
W = [[1.0], [0.2, 1.0], [1.0], [-0.92, -0.1, -0.7, 1.0], [1.0], [-0.36800000000000005, -0.04000000000000001, -0.27999999999999997, 0.4, 0.5, 1.0], [0.8648, 0.094, 0.6579999999999999, -0.94, -0.3, -0.6, 1.0], [1.0], [-0.29440000000000005, -0.03200000000000001, -0.22399999999999998, 0.32000000000000006, 0.4, 0.8, 1.0], [0.77832, 0.08460000000000001, 0.5922, -0.846, -0.5700000000000001, -0.54, 0.9, 1.0]]
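
As a sanity check on one entry: the new weight of the edge 1\leftarrow 4 aggregates the direct edge 1\overset{-0.9}{\leftarrow}4 and the path 4\to 2\to 1 with weight (-0.1)\cdot 0.2 = -0.02, giving -0.9 + (-0.02) = -0.92, which is indeed W[4][1] above.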

The non-parallel solution of this problem (using dynamic programming) is the function accumulate0!:

using StatsBase;   dtype=Float64; 
function create_toy_data(n,s)  # s = max degree of vertices
    J = [sample(1:j-1,r,replace=false,ordered=true) 
         for j=1:n for r in rand(0:min(j-1,s),1)]
    W = [rand(-1:0.01:1,length(w)) for w∈J]
    return J, W, deepcopy(J), deepcopy(W); end;
function acml!(jj ::Vector{Int}, ww ::Vector{dtype}) 
    """Accumulate: sort jj and ww according to jj, then replace those 
    elements in ww that have the same value in jj by their sum."""
    pi = sortperm(jj);   jj[:] = jj[pi];   ww[:] = ww[pi];    i,j=1,1; 
    while j<length(ww)   j+=1;   
        if jj[i]==jj[j]  ww[i]+=ww[j]; jj[j]=0; else i=j; end end 
    pi = (jj .!= 0) .& (ww .!= 0);   
    return jj[pi], ww[pi];   end; 
function accumulate0!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}}) 
    """Use paths starting in neighbors of vertex j, they have already been 
    computed, and prolong those paths by 1 edge (from j to the neighbor)."""
    for j=1:length(J)     if j%100==0  print(j," ") end 
        W[j] = vcat((W[j] .* W[J[j]])..., [dtype(1)])
        J[j] = vcat(         J[J[j]]...,         [j])     
        J[j], W[j] = acml!(J[j],W[j]) end end;

A small example like the one above can be obtained with:

J, W, J_, W_ = create_toy_data(10^1,10);
@time accumulate0!(J,W) 
display(J), display(W);

Running the same code on my 6-core CPU, but with create_toy_data(10^5,10), returns:

139.971336 seconds (3.45 M allocations: 89.583 GiB, 1.70% gc time, 0.02% compilation time)

I wish to improve this benchmark with whatever is possible (e.g. parallel computing).


My clumsy attempt at synchronous parallelization is:

function accumulate1!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}}) 
    """Each thread waits until paths out of neighbors of vertex j are all 
    computed, then it prolongs those paths by 1 edge."""
    F = falses(length(J));   F[1]=1  # keep track of finished vertices
    function delay(W ::Vector{Vector{dtype}}, j ::Int)  
        while !F[j] end;   return W[j];   end;
    Threads.@threads for j=1:length(J)     if j%100==0  print(j," ") end 
        W[j] = vcat((W[j] .* [delay(W,i) for i∈J[j]])..., [dtype(1)])
        J[j] = vcat(                         J[J[j]]...,         [j]) 
        (J[j], W[j]), F[j]  = acml!(J[j],W[j]), 1;   end end;  # mark as finished

If you run the snippet below, you’ll see that the computation never finishes. I do not understand why.

J, W, J_, W_ = create_toy_data(10^1,10);
@time accumulate1!(J_,W_)

My attempt at asynchronous parallelization is:

function accumulate2!(J ::Vector{Vector{Int}}, W ::Vector{Vector{dtype}}) 
    """Each thread looks at neighbors of vertex j, unfinished ones are kept 
    for later use, finished ones are used immediately and stored separately 
    (paths out of them are prolonged by the edge from j to the neighbor)."""
    J_, W_ = [Int[] for j∈J], [dtype[] for w∈W]  # final result is filled-in
    um = min(Threads.nthreads()*100,length(J))  # unicore multicore split
    F = falses(length(J))  # flag for finished vertices
    for j=1:um     if j%100==0  print(j," ") end 
        W[j], W_[j] = [], vcat((W[j].*W_[J[j]])...,[dtype(1)])
        J[j], J_[j] = [], vcat(       J_[J[j]]...,        [j])     
        J_[j], W_[j] = acml!(J_[j],W_[j]) end
    F[1:um] .= 1
    while !all(F)  # keep going until all computations are finished
        Threads.@threads for j=um+1:length(J)     j%100==0 && print(j," ");
            F[j]==1 && continue;   ii=F[J[j]];  # ii=flag for (un)finished
            W[j], W_[j] = W[j][ii .==0], vcat(W_[j],(W[j][ii].*W_[J[j][ii]])...) 
            J[j], J_[j] = J[j][ii .==0], vcat(J_[j],           J_[J[j][ii]]...)  
            J_[j], W_[j] = acml!(J_[j],W_[j]);
            if length(J[j])==0   
                J_[j], W_[j] = vcat(J_[j],[j]), vcat(W_[j],[dtype(1)])
                F[j] = 1; end end end
    for j=1:length(J)  J[j], W[j], J_[j], W_[j] = J_[j], W_[j], [], []; end end;

Notice that I split the calculation into a non-parallel part for the first 600 vertices and a parallel part from j=601 onward. This ensures that when each of the 6 threads picks a vertex j\in\{601,...,606\}, every edge out of j=606 has only a small chance (5/605, to be precise) of ending in an unfinished vertex 601\leq i\leq 605. This should keep the number of passes over the whole vertex set small, since each pass produces many newly finished vertices.

Running the snippet below works, but it is even slower than the non-parallel function:

J, W, J_, W_ = create_toy_data(10^5,10);  
@time accumulate0!(J ,W ) 
@time accumulate2!(J_,W_) 
println(J==J_, " ", W==W_," ",maximum([maximum(abs.(w)) for w in W .- W_])) 
139.740774 seconds (3.44 M allocations: 88.720 GiB, 1.65% gc time)
179.144954 seconds (11.54 M allocations: 123.498 GiB, 46.76% gc time)
true false 2.842170943040401e-14

How can I fix these failures?

Is there a way I can make the question more appealing?

I don’t see anything wrong with your question. It is just long and would require someone to read pretty closely to understand your code, come up with ideas for improvement, (ideally) test out their idea to make sure it works, and then write up their response.

It’s time-consuming work for a volunteer community! There is nothing wrong with asking, but I don’t want you to come away feeling snubbed if no one can get back to you.

I, for one, am stretched pretty thin, so I’m not going to be able to take a shot at this for a while.

You might have luck bumping this on the Julia Slack #helpdesk channel too! Just something like “Hey, I posted this question on Discourse… curious if anyone on here has thoughts!”

It sounds like homework!

See Transitive Closure of a graph.

Try to express your problem as a succession of matrix multiplications.

Then it will be easier to reason about parallelism.
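
For instance, here is a minimal sketch of that formulation (closure_by_matrix is a hypothetical helper written for this illustration, not existing code). With A[i,j]=w for every edge i \overset{w}{\leftarrow} j, the matrix A is strictly upper triangular (because i<j) and hence nilpotent, so the desired output is exactly the finite Neumann series I + A + A^2 + \cdots = (I-A)^{-1}:

using SparseArrays, LinearAlgebra

function closure_by_matrix(J::Vector{Vector{Int}}, W::Vector{Vector{Float64}})
    n = length(J)
    rows = reduce(vcat, J; init=Int[])       # end-vertices i of all edges
    cols = reduce(vcat, [fill(j, length(J[j])) for j in 1:n]; init=Int[])
    vals = reduce(vcat, W; init=Float64[])
    A = sparse(rows, cols, vals, n, n)       # A[i,j] = weight of edge i <- j
    M, P = sparse(1.0I, n, n), A             # M accumulates I + A + A^2 + ...
    while nnz(P) > 0
        M += P                               # add all paths of the current length
        P = A * P                            # extend every path by one edge
    end
    return M                                 # M[i,j] = summed weight of paths from j to i
end

On the toy example, the nonzero rows and values in column j of M reproduce the output J[j] and W[j] above. Note, though, that the closure can be far denser than the input, so at millions of vertices the memory cost of M may dominate.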

Hi @Leo_I!

Main message

Not sure I can help with the parallelism aspect, but a golden rule of performance engineering in Julia is that you should start by improving the sequential code before writing a parallel version. To do that, the best source is the performance tips section of the manual. I tried to distill the essential advice, along with some guidance for profiling, in this notebook, so you can start there if you’d like.
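
For example, a minimal profiling recipe for the code in question (the options passed to Profile.print are just one reasonable choice) might look like:

using Profile

J, W, _, _ = create_toy_data(10^4, 10)
accumulate0!(deepcopy(J), deepcopy(W))        # warm-up run to exclude compilation
@profile accumulate0!(J, W)                   # sample where time is actually spent
Profile.print(format=:flat, sortedby=:count)  # hottest lines first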

Here are some possible performance pitfalls I see. Note however that I have not played with or even tried to understand your code:

  • dtype is a global variable, which usually hurts performance. You would obtain the same effect by removing it and redefining your method as follows, and it actually makes the function more generic:
function accumulate0!(J::Vector{Vector{Int}}, W::Vector{Vector{dtype}}) where {dtype}
  • There are lots of memory allocations in your code, which you can hunt down with profiling. Here are some examples (even though some of them may be necessary, I don’t know); see the sketch after this list:
    • the deepcopy function
    • the sortperm function
    • every time you index an array with another array or with [:]
    • sometimes when you perform dotted operations (this one is a bit tricky because in other situations broadcasting with dots helps reduce allocations => read the docs about it)
    • the vcat function, especially with splatting ...
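
To make the hunt concrete, here is a minimal sketch of a lower-allocation acml! (acml2! and its scratch-buffer arguments perm, jbuf, wbuf are assumptions introduced for this illustration; the caller would allocate them once and reuse them for every vertex):

# Hypothetical low-allocation variant of acml!: sortperm! fills a reusable
# permutation vector, the sorted gather goes through reusable buffers, and
# duplicates are merged by compacting in place instead of fancy indexing.
function acml2!(jj::Vector{Int}, ww::Vector{Float64},
                perm::Vector{Int}, jbuf::Vector{Int}, wbuf::Vector{Float64})
    m = length(jj)
    resize!(perm, m); resize!(jbuf, m); resize!(wbuf, m)
    sortperm!(perm, jj)                  # in-place sortperm, no fresh vector
    @inbounds for k = 1:m                # gather sorted order into buffers
        jbuf[k] = jj[perm[k]]
        wbuf[k] = ww[perm[k]]
    end
    out = 0
    @inbounds for k = 1:m                # merge runs of equal end-vertices
        if out > 0 && jbuf[k] == jj[out]
            ww[out] += wbuf[k]           # same end-vertex: sum the weights
        else
            out += 1
            jj[out] = jbuf[k];  ww[out] = wbuf[k]
        end
    end
    keep = 0
    @inbounds for k = 1:out              # drop zero weights, as acml! does
        if ww[k] != 0
            keep += 1
            jj[keep] = jj[k];  ww[keep] = ww[k]
        end
    end
    resize!(jj, keep);  resize!(ww, keep)  # shrink in place, no copy
    return jj, ww
end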

Side remark

Your question is very detailed, and the reproducible example is a good idea! But to make it easier for other people, it might help to format your code in a more Julian way. Using line breaks instead of semicolons would already improve readability in my opinion.
Docstrings are also a nice touch :slightly_smiling_face: Note that if you want the Julia REPL to recognize them, they have to go above the function and not inside it as in Python. See the rules for writing documentation.


I appreciate all the replies, though I am still hoping for some advice regarding parallelism.

@mrufsvold Thank you for the suggestion on slack!

@pitsianis I graduated more than 10 years ago, so homework is long gone. I’m doing this as a personal project to learn Julia, in an effort to bypass all the shortcomings of Mathematica and Python.

The problem is indeed finding the transitive closure of a DAG, but with the added complexity that edges are weighted and the weights of paths with the same endpoints must be summed. I see on Wikipedia that there are several existing algorithms for unweighted DAGs, so I’ll read more on that topic to see whether they can be adapted. Thank you for the suggestion on matrix multiplication!

@gdalle Appreciate your suggestions a lot! I did read the performance tips, but I’ll go through it again. Let me try to further improve my non-parallel solution, and I’ll post the results later.

Regarding docstrings, I really prefer the Python way, where the documentation of a function is hidden inside the function (it is naturally a part of it), so when I wrap the code, the function takes up only 1 line, as opposed to Julia, where it takes up at least 5 (3 for the docstring and 2 for the function + end). Having many functions, even wrapped, would force me to scroll way too much and be impractical. But that is just my preference, so if the Julia community prefers it the other way, I’ll write them like that : ).

It’s not only a good idea in general, it might also prove essential to reap the benefits of parallelism. I have often seen the case of sequential code so inefficient, for instance because of memory access, that it prevented the threads from working together in harmony.


I understand your point, but it is not just a matter of aesthetics. Putting docstrings before the object is how Julia understands them. For instance, it makes them accessible through the help mode of the interactive REPL, or through the documentation if you build it with Documenter.jl. See the example below:

julia> function undocumented(x)
           """Docstring à la Python"""
           return x
       end;

help?> undocumented
search: undocumented

  No documentation found.

  undocumented is a Function.

  # 1 method for generic function "undocumented":
  [1] undocumented(x) in Main at /Users/guillaume/Downloads/scratchpad.jl:1


julia> """
           documented(x)
       
       Docstring à la Julia
       """
       function documented(x)
           return x
       end;

help?> documented
search: documented undocumented

  documented(x)


  Docstring à la Julia

This recent tutorial for HPC could help you out, but all tutorials are going to tell you to first optimize your serial code.