Performance issues with parallel Julia code

I am working on a scientific code that has problems with parallelization: the parallel version is slower than the serial one, and I am not sure whether I am using the right approach for this application.

How can I improve the performance of the parallel calculation?

Am I using the right approach, or should I consider other packages / functions for the parallelization?

I have already tried a larger workload, but this makes no difference.
I suspect the problem is caused by data movement between the workers, but I don't know how to check or improve this.
Parallel programming with Julia is new to me, so I am very grateful for any help!
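
One way I could imagine gauging the communication cost is to time shipping a comparably sized payload to a worker, but I am not sure this is the right measurement:

using Distributed
addprocs(1)
# Rough, hypothetical check: time a round trip of an array about as large as A.
# If this takes as long as the computation itself, data movement likely dominates.
payload = rand(3, 10_000_000)           # ~240 MB, similar in size to A below
@time remotecall_fetch(sum, 2, payload) # worker 2 is the one just added (fresh session)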

The simulation code serves as a kind of benchmark for the Julia programming language: our team is considering Julia for all future projects if clear performance advantages over the current workflow can be demonstrated.
Because of this, I would like to maximize performance, especially since calculations with very large models, as well as possible use on a cluster, are planned.


Minimal Working Example

The critical parts of the code can be boiled down to the following example.

I start the process as follows:

using Distributed
addprocs(); # adds one worker per CPU thread by default
@everywhere using SharedArrays, LinearAlgebra, Test

First I define the simulation model, which contains all data used for the calculations.
Is it okay to store SharedArrays together with other data in a struct, or should a different approach be used?

@everywhere struct Model
    idx::Vector{Tuple{Int,Int}} # indices
    A::SharedMatrix{Float64} # results, will be constantly updated
    B::Vector{Float64} # part of pre-processing, will only be read
end

The non-parallel version of the function used to update the model is shown below.

function update(m::Model, factor::Float64)
    L::Float64 = 0.
    k::Float64 = 0.
    cnt::Int = 0
    for (i,j) in m.idx
        cnt+=1
        L = norm(m.A[:,i]-m.A[:,j])
        k = factor * m.B[cnt]
        m.A[:,i] .+= k*L
        m.A[:,j] .-= k*L
    end
end

For parallelization, I simply tried the following. Would an approach based on pmap perhaps be better in this case?

@everywhere function parallel_update(m::Model, factor::Float64)
    L::Float64 = 0.
    k::Float64 = 0.
    cnt::Int = 0
    @sync @distributed for (i,j) in m.idx
        cnt+=1
        L = norm(m.A[:,i]-m.A[:,j])
        k = factor * m.B[cnt]
        m.A[:,i] .+= k*L
        m.A[:,j] .-= k*L
    end
end
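
For reference, a pmap-based variant might look roughly like the following (a hypothetical sketch I have not benchmarked; pmap is meant for coarse-grained tasks, so each worker gets a whole chunk of index pairs):

function pmap_update(m::Model, factor::Float64)
    # split the index range into one contiguous chunk per worker
    chunks = collect(Iterators.partition(1:length(m.idx), cld(length(m.idx), nworkers())))
    pmap(chunks) do rng
        # idx and B are captured in the closure and shipped to every worker;
        # only A is shared. As in the @distributed version, two chunks may
        # still update the same column concurrently.
        for cnt in rng
            i, j = m.idx[cnt]
            L = norm(m.A[:,i] - m.A[:,j])
            k = factor * m.B[cnt]
            m.A[:,i] .+= k*L
            m.A[:,j] .-= k*L
        end
        nothing
    end
end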

To test the results I use the following function:

@everywhere function test_my_code()
    # provide some data
    n = 10000000
    idx = [(rand(1:n),rand(1:n)) for k=1:n]
    A = SharedArray(hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...))
    B = [rand(0.:1000.) for k=1:n]

    # define models
    model1 = Model(idx,A,B)
    model2 = Model(idx,A,B)

    # test and compare results
    @time update(model1,2.)
    @time parallel_update(model2,2.)
    @test model1 == model2
end

julia> test_my_code() # first run
  6.350694 seconds (50.00 M allocations: 5.215 GiB, 13.66% gc time)
 11.422999 seconds (6.69 k allocations: 446.156 KiB)
Test Passed

julia> test_my_code() # second run
  6.286828 seconds (50.00 M allocations: 5.215 GiB, 18.35% gc time)
  6.297144 seconds (2.92 k allocations: 143.516 KiB)
Test Passed

Note: significant performance improvements for the serial code

I was able to significantly improve the performance of the serial function and reduce the number of allocations to zero.
Since this makes no difference to the parallelization problem, I used the shorter, easier-to-read version in the example above.
See the serial code below.

using LinearAlgebra, Test

struct Model
    idx::Vector{Tuple{Int,Int}}
    A::Matrix{Float64}
    B::Vector{Float64}
end

function update(m::Model, factor::Float64)
    L::Float64 = 0.
    k::Float64 = 0.
    cnt::Int = 0
    for (i,j) in m.idx
        cnt+=1
        L = norm(m.A[:,i]-m.A[:,j])
        k = factor * m.B[cnt]
        m.A[:,i] .+= k*L
        m.A[:,j] .-= k*L
    end
end

function update_fast(m::Model, factor::Float64)
    L::Float64 = 0.
    k::Float64 = 0.
    cnt::Int = 0
    for (i,j) in m.idx
        cnt+=1
        L = sqrt((m.A[1,i]-m.A[1,j])^2 +
                 (m.A[2,i]-m.A[2,j])^2 +
                 (m.A[3,i]-m.A[3,j])^2)
        k = factor * m.B[cnt]
        m.A[1,i] += k*L
        m.A[2,i] += k*L
        m.A[3,i] += k*L
        m.A[1,j] -= k*L
        m.A[2,j] -= k*L
        m.A[3,j] -= k*L
    end
end

function test_serial_speedup()
    n = 10000000
    idx = [(rand(1:n),rand(1:n)) for k=1:n]
    A = hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...)
    B = [rand(0.:1000.) for k=1:n]
    model1 = Model(idx,A,B)
    model2 = Model(idx,A,B)
    @time update(model1,2.)
    @time update_fast(model2,2.)
    @test model1 == model2
end

julia> test_serial_speedup()
  5.008049 seconds (50.00 M allocations: 5.215 GiB, 18.14% gc time)
  0.464986 seconds
Test Passed

First of all, I have to say: Distributed looks impressive!

Collecting and reworking your MWE

using Distributed
addprocs(5);
@everywhere using SharedArrays, LinearAlgebra, Test, BenchmarkTools

@everywhere abstract type AbstractModel end

struct Model <: AbstractModel
    idx::Vector{Tuple{Int,Int}}
    A::Matrix{Float64}
    B::Vector{Float64}
end

@everywhere struct DistributedModel <: AbstractModel
    idx::Vector{Tuple{Int,Int}} # indices
    A::SharedMatrix{Float64} # results, will be constantly updated
    B::Vector{Float64} # part of pre-processing, will only be read
end

function update1(m::AbstractModel, factor::Float64)
    for (cnt,(i,j)) in enumerate(m.idx)
        L = norm((@view m.A[:,i]).-(@view m.A[:,j]))
        k = factor * m.B[cnt]
        (@view m.A[:,i]) .+= k*L
        (@view m.A[:,j]) .-= k*L
    end
end

@everywhere function parallel_update1(m::AbstractModel, factor::Float64)
    @sync @distributed for cnt in 1:length(m.idx)
        i,j = m.idx[cnt]
        L = norm((@view m.A[:,i]).-(@view m.A[:,j]))
        k = factor * m.B[cnt]
        (@view m.A[:,i]) .+= k*L
        (@view m.A[:,j]) .-= k*L
    end
end

function update2(m::AbstractModel, factor::Float64)
    for (cnt,(i,j)) in enumerate(m.idx)
        L = sqrt((m.A[1,i]-m.A[1,j])^2 +
                 (m.A[2,i]-m.A[2,j])^2 +
                 (m.A[3,i]-m.A[3,j])^2)
        k = factor * m.B[cnt]
        m.A[1,i] += k*L
        m.A[2,i] += k*L
        m.A[3,i] += k*L
        m.A[1,j] -= k*L
        m.A[2,j] -= k*L
        m.A[3,j] -= k*L
    end
end

@everywhere function parallel_update2(m::AbstractModel, factor::Float64)
    @sync @distributed for cnt in 1:length(m.idx)
        i,j = m.idx[cnt]
        L = sqrt((m.A[1,i]-m.A[1,j])^2 +
                 (m.A[2,i]-m.A[2,j])^2 +
                 (m.A[3,i]-m.A[3,j])^2)
        k = factor * m.B[cnt]
        m.A[1,i] += k*L
        m.A[2,i] += k*L
        m.A[3,i] += k*L
        m.A[1,j] -= k*L
        m.A[2,j] -= k*L
        m.A[3,j] -= k*L
    end
end

function test_serial()
    n = 10000000
    idx = [(rand(1:n),rand(1:n)) for k=1:n]
    A = hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...)
    B = [rand(0.:1000.) for k=1:n]
    model1 = Model(idx,A,B)
    model2 = Model(idx,A,B)
    @btime update1($model1,2.)
    @btime update2($model2,2.)
    @test model1 == model2
end

function test_parallel()
    # provide some data
    n = 10000000
    idx = [(rand(1:n),rand(1:n)) for k=1:n]
    A = SharedArray(hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...))
    B = [rand(0.:1000.) for k=1:n]

    # define models
    model1 = DistributedModel(idx,A,B)
    model2 = DistributedModel(idx,A,B)

    # test and compare results
    @btime parallel_update1($model1,2.)
    @btime parallel_update2($model2,2.)
    @test model1 == model2
end

test_serial()
test_parallel()

On Julia 1.6.3 I see

  2.051 s (10000000 allocations: 1.04 GiB)
  743.916 ms (0 allocations: 0 bytes)
  1.137 s (810 allocations: 37.92 KiB)
  829.433 ms (810 allocations: 37.92 KiB)

so we see some speedup, but I assume the problem size is a bit small for distribution to pay off.

One more remark: your original code

    L::Float64 = 0.
    k::Float64 = 0.
    cnt::Int = 0
    @sync @distributed for (i,j) in m.idx
        cnt+=1
        L = norm(m.A[:,i]-m.A[:,j])
        k = factor * m.B[cnt]
        m.A[:,i] .+= k*L
        m.A[:,j] .-= k*L
    end

updates the variables L, k, and cnt, which are defined outside the distributed loop. This is not supported and would amount to a data race.
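
A minimal fix, essentially what parallel_update1 above already does, is to keep all loop state local to each iteration:

    @sync @distributed for cnt in 1:length(m.idx)
        i, j = m.idx[cnt]  # loop-local; no shared counter needed
        L = norm(m.A[:,i] - m.A[:,j])
        k = factor * m.B[cnt]
        m.A[:,i] .+= k*L
        m.A[:,j] .-= k*L
    end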


Thank you very much @goerch for your detailed comments and helpful information, which helped me improve the performance significantly!
However, after further research I decided to also give multithreading a try, as it seems better suited to this use case. An MWE slightly better adapted to the simulation looks like the following.

MWE Multithreading
using Base.Threads, LinearAlgebra, Test, BenchmarkTools

abstract type AbstractModel end

struct Model <: AbstractModel
    idx::Vector{Tuple{Int,Int}}
    A::Matrix{Float64}
    B::Vector{Float64}
end

struct DistributedModel <: AbstractModel
    idx::Vector{Tuple{Int,Int}} # indices
    idxdist::Vector{UnitRange{Int}} # range of indices for each thread
    A::Matrix{Float64} # updated results
    Aw::Array{Float64,3} # results, each thread gets its own Aw[:,:,threadid()]
    B::Vector{Float64} # part of pre-processing, will only be read
end

function update2(m::AbstractModel, factor::Float64)
    for (i,j) in m.idx
        L = sqrt((m.A[1,i]-m.A[1,j])^2 +
                 (m.A[2,i]-m.A[2,j])^2 +
                 (m.A[3,i]-m.A[3,j])^2)
        k = factor * m.B[i]
        m.A[1,i] += k*L
        m.A[2,i] += k*L
        m.A[3,i] += k*L
        m.A[1,j] -= k*L
        m.A[2,j] -= k*L
        m.A[3,j] -= k*L
    end
end

function threads_update2(m::AbstractModel, factor::Float64)
    # one chunk of indices per thread; @threads uses a static schedule here,
    # so threadid() is stable for the duration of each loop body
    @threads for _ in 1:nthreads()
        tid = threadid()
        for cnt in m.idxdist[tid]
            i,j = m.idx[cnt]
            L = sqrt((m.A[1,i]-m.A[1,j])^2 +
                     (m.A[2,i]-m.A[2,j])^2 +
                     (m.A[3,i]-m.A[3,j])^2)
            k = factor * m.B[i]
            m.Aw[1,i,tid] += k*L
            m.Aw[2,i,tid] += k*L
            m.Aw[3,i,tid] += k*L
            m.Aw[1,j,tid] -= k*L
            m.Aw[2,j,tid] -= k*L
            m.Aw[3,j,tid] -= k*L
        end
    end
    # reduce the per-thread working copies back into A
    @threads for i in 1:size(m.A,2)
        m.A[1,i] = sum(@view m.Aw[1,i,:])
        m.A[2,i] = sum(@view m.Aw[2,i,:])
        m.A[3,i] = sum(@view m.Aw[3,i,:])
    end
end

function defaultdist(sz::Int, nc::Int)
    if sz >= nc
        chunk_size = div(sz,nc)
        remainder = rem(sz,nc)
        sidx = zeros(Int64, nc+1)
        for i = 1:(nc+1)
            sidx[i] += (i-1)*chunk_size + 1
            if i<= remainder
                sidx[i] += i-1
            else
                sidx[i] += remainder
            end
        end
        grid = fill(0:0,nc)
        for i = 1:nc
            grid[i] = sidx[i]:sidx[i+1]-1
        end
        return grid
    else
        # fewer indices than chunks: one index per chunk, surplus chunks stay empty
        sidx = [1:(sz+1);]
        grid = fill(1:0,nc) # 1:0 is an empty range, so unused threads do no work
        for i = 1:sz
            grid[i] = sidx[i]:sidx[i+1]-1
        end
        return grid
    end
end
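
# For illustration (worked example): defaultdist(10, 3) returns
# [1:4, 5:7, 8:10], i.e. the first chunk receives the extra element
# when the split is uneven.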

function test_parallel()
    n = 100_000_000
    N = 1000
    idx = [(rand(1:N),rand(1:N)) for k=1:n]
    idxdist = defaultdist(n,nthreads())
    A = hcat(([rand(-1000.:1000.);rand(-1000.:1000.);rand(-1000.:1000.)] for k=1:N)...)
    Aw = zeros((3,N,nthreads()))
    @threads for i = 1:nthreads()
        Aw[:,:,i] .= A # give each thread its own working copy of A
    end
    B = [rand(-1000.:1000.) for k=1:N]

    model1 = Model(idx,A,B)
    model2 = DistributedModel(idx,idxdist,A,Aw,B)

    @btime update2($model1,1e-9)
    @btime threads_update2($model2,1e-9)
    @test model1.A ≈ model2.A
end
test_parallel()

results (Julia 1.6.3, 8 threads on Intel i9 MacBook Pro)

  480.496 ms (0 allocations: 0 bytes)
  108.942 ms (82 allocations: 6.36 KiB)
Test Passed

A few comments:

  • The defaultdist function distributes the indices across all threads. It is an extended version of the function used in the DistributedArrays package.
  • To prevent a race condition, the results are written to an array Aw (one slice along the third dimension per thread) and later summed up into A.
  • With 8 threads, I now get a speedup of about 4.4x for the MWE. With my full simulation code, only ~3.5x is achieved, since there is also a lot of non-parallelized file export.
  • Even when varying the workload on my local machine, multithreading gave a better performance increase than the version with SharedArrays and Distributed.
  • In the coming days I will start simulations on a cluster and am curious whether and how the scaling changes. I will also try the Distributed version to see how it scales with larger workloads.

For any further comments and tips I am very grateful!