 # Performance issues with parallel Julia code

I am working on a scientific code that is experiencing issues with parallelization.
The parallel version is slower than the serial one and I am not sure if the right approaches are used for this application.

How can I improve the performance of the parallel calculation?

Is the right approach being used or should other packages / functions be considered for parallelization?

I have already tried a larger workload, however this makes no difference.
I suspect the problem is somehow due to data movement between workers, but I don’t know how to check or improve this one.
Parallel programming with Julia is new for me, so I am very grateful for any help!

The simulation code is something of a benchmark for the Julia programming language, as our team is considering using Julia for all future projects if strong performance advantages to the current workflow can be demonstrated.
Because of this, I would like to maximize performance, also since calculations with very large models as well as possible use on a cluster are planned.

## Minimum Working Example

The critical parts of the code can be broken down to the following example.

I start the process as follows:

``````using Distributed
@everywhere using SharedArrays, LinearAlgebra, Test
``````

First I define the simulation model, containing all data used for the calculations.
Is it okay to store SharedArrays with other data in a struct or should a different approach be used?

``````@everywhere struct Model
idx::Vector{Tuple{Int,Int}} # indices
A::SharedMatrix{Float64} # results, will be constantly updated
B::Vector{Float64} # part of pre-processing, will only be read
end
``````

See the non-parallel version of the function used for the update of the model below.

``````function update(m::Model, factor::Float64)
L::Float64 = 0.
k::Float64 = 0.
cnt::Int = 0
for (i,j) in m.idx
cnt+=1
L = norm(m.A[:,i]-m.A[:,j])
k = factor * m.B[cnt]
m.A[:,i] .+= k*L
m.A[:,j] .-= k*L
end
end
``````

For parallelization, I simply tried the following. Is perhaps an approach with `pmap` better in this case?

``````@everywhere function parallel_update(m::Model, factor::Float64)
L::Float64 = 0.
k::Float64 = 0.
cnt::Int = 0
@sync @distributed for (i,j) in m.idx
cnt+=1
L = norm(m.A[:,i]-m.A[:,j])
k = factor * m.B[cnt]
m.A[:,i] .+= k*L
m.A[:,j] .-= k*L
end
end
``````

To test the results I use the following function:

``````@everywhere function test_my_code()
# provide some data
n = 10000000
idx = [(rand(1:n),rand(1:n)) for k=1:n]
A = SharedArray(hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...))
B = [rand(0.:1000.) for k=1:n]

# define models
model1 = Model(idx,A,B)
model2 = Model(idx,A,B)

# test and compare results
@time update(model1,2.)
@time parallel_update(model2,2.)
@test model1 == model2
end
``````
``````julia> test_my_code() # first run
6.350694 seconds (50.00 M allocations: 5.215 GiB, 13.66% gc time)
11.422999 seconds (6.69 k allocations: 446.156 KiB)
Test Passed

julia> test_my_code() # second run
6.286828 seconds (50.00 M allocations: 5.215 GiB, 18.35% gc time)
6.297144 seconds (2.92 k allocations: 143.516 KiB)
Test Passed
``````

## Note: significant performance improvements for the serial code

I was able to significantly improve the performance of the serial function and reduce the number of allocations to zero.
Since this seems to make no difference to the parallelization problem, I used the shorter, easier-to-read version for the previous example.
See the serial code below.

``````using LinearAlgebra, Test

struct Model
idx::Vector{Tuple{Int,Int}}
A::Matrix{Float64}
B::Vector{Float64}
end

function update(m::Model, factor::Float64)
L::Float64 = 0.
k::Float64 = 0.
cnt::Int = 0
for (i,j) in m.idx
cnt+=1
L = norm(m.A[:,i]-m.A[:,j])
k = factor * m.B[cnt]
m.A[:,i] .+= k*L
m.A[:,j] .-= k*L
end
end

function update_fast(m::Model, factor::Float64)
L::Float64 = 0.
k::Float64 = 0.
cnt::Int = 0
for (i,j) in m.idx
cnt+=1
L = sqrt((m.A[1,i]-m.A[1,j])^2 +
(m.A[2,i]-m.A[2,j])^2 +
(m.A[3,i]-m.A[3,j])^2)
k = factor * m.B[cnt]
m.A[1,i] += k*L
m.A[2,i] += k*L
m.A[3,i] += k*L
m.A[1,j] -= k*L
m.A[2,j] -= k*L
m.A[3,j] -= k*L
end
end

function test_serial_speedup()
n = 10000000
idx = [(rand(1:n),rand(1:n)) for k=1:n]
A = hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...)
B = [rand(0.:1000.) for k=1:n]
model1 = Model(idx,A,B)
model2 = Model(idx,A,B)
@time update(model1,2.)
@time update_fast(model2,2.)
@test model1 == model2
end
``````
``````julia> test_serial_speedup()
5.008049 seconds (50.00 M allocations: 5.215 GiB, 18.14% gc time)
0.464986 seconds
Test Passed
``````

First of all, I have to say: Distributed looks impressive!

``````using Distributed
@everywhere using SharedArrays, LinearAlgebra, Test, BenchmarkTools

@everywhere abstract type AbstractModel end

struct Model <: AbstractModel
idx::Vector{Tuple{Int,Int}}
A::Matrix{Float64}
B::Vector{Float64}
end

@everywhere struct DistributedModel <: AbstractModel
idx::Vector{Tuple{Int,Int}} # indices
A::SharedMatrix{Float64} # results, will be constantly updated
B::Vector{Float64} # part of pre-processing, will only be read
end

function update1(m::AbstractModel, factor::Float64)
for (cnt,(i,j)) in enumerate(m.idx)
L = norm((@view m.A[:,i]).-(@view m.A[:,j]))
k = factor * m.B[cnt]
(@view m.A[:,i]) .+= k*L
(@view m.A[:,j]) .-= k*L
end
end

@everywhere function parallel_update1(m::AbstractModel, factor::Float64)
@sync @distributed for cnt in 1:length(m.idx)
i,j = m.idx[cnt]
L = norm((@view m.A[:,i]).-(@view m.A[:,j]))
k = factor * m.B[cnt]
(@view m.A[:,i]) .+= k*L
(@view m.A[:,j]) .-= k*L
end
end

function update2(m::AbstractModel, factor::Float64)
for (cnt,(i,j)) in enumerate(m.idx)
L = sqrt((m.A[1,i]-m.A[1,j])^2 +
(m.A[2,i]-m.A[2,j])^2 +
(m.A[3,i]-m.A[3,j])^2)
k = factor * m.B[cnt]
m.A[1,i] += k*L
m.A[2,i] += k*L
m.A[3,i] += k*L
m.A[1,j] -= k*L
m.A[2,j] -= k*L
m.A[3,j] -= k*L
end
end

@everywhere function parallel_update2(m::AbstractModel, factor::Float64)
@sync @distributed for cnt in 1:length(m.idx)
i,j = m.idx[cnt]
L = sqrt((m.A[1,i]-m.A[1,j])^2 +
(m.A[2,i]-m.A[2,j])^2 +
(m.A[3,i]-m.A[3,j])^2)
k = factor * m.B[cnt]
m.A[1,i] += k*L
m.A[2,i] += k*L
m.A[3,i] += k*L
m.A[1,j] -= k*L
m.A[2,j] -= k*L
m.A[3,j] -= k*L
end
end

function test_serial()
n = 10000000
idx = [(rand(1:n),rand(1:n)) for k=1:n]
A = hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...)
B = [rand(0.:1000.) for k=1:n]
model1 = Model(idx,A,B)
model2 = Model(idx,A,B)
@btime update1(\$model1,2.)
@btime update2(\$model2,2.)
@test model1 == model2
end

function test_parallel()
# provide some data
n = 10000000
idx = [(rand(1:n),rand(1:n)) for k=1:n]
A = SharedArray(hcat(([rand(0.:1000.);rand(0.:1000.);rand(0.:1000.)] for k=1:n)...))
B = [rand(0.:1000.) for k=1:n]

# define models
model1 = DistributedModel(idx,A,B)
model2 = DistributedModel(idx,A,B)

# test and compare results
@btime parallel_update1(\$model1,2.)
@btime parallel_update2(\$model2,2.)
@test model1 == model2
end

test_serial()
test_parallel()
``````

I see on Julia 1.6.3

``````  2.051 s (10000000 allocations: 1.04 GiB)
743.916 ms (0 allocations: 0 bytes)
1.137 s (810 allocations: 37.92 KiB)
829.433 ms (810 allocations: 37.92 KiB)
``````

so we see some speedup, but the problem size is a bit small for distribution I assume.

One more remark: your original code

``````    L::Float64 = 0.
k::Float64 = 0.
cnt::Int = 0
@sync @distributed for (i,j) in m.idx
cnt+=1
L = norm(m.A[:,i]-m.A[:,j])
k = factor * m.B[cnt]
m.A[:,i] .+= k*L
m.A[:,j] .-= k*L
end
``````

writes the variables L, k and cnt outside the distributed loop. This is not supported and would mean a data race.

1 Like

Thank you very much @goerch for your detailed comments and helpful information, which helped me to improve the performance significantly!
However, after further research, I decided to also give multithreading a try, as it seems to be better suited for this use case. A slightly better simulation-adapted MWE looks like the following.

``````using Base.Threads, LinearAlgebra, Test, BenchmarkTools

abstract type AbstractModel end

struct Model <: AbstractModel
idx::Vector{Tuple{Int,Int}}
A::Matrix{Float64}
B::Vector{Float64}
end

struct DistributedModel <: AbstractModel
idx::Vector{Tuple{Int,Int}} # indices
idxdist::Vector{UnitRange{Int}} # range of indices for each thread
A::Matrix{Float64} # updated results
B::Vector{Float64} # part of pre-processing, will only be read
end

function update2(m::AbstractModel, factor::Float64)
for (i,j) in m.idx
L = sqrt((m.A[1,i]-m.A[1,j])^2 +
(m.A[2,i]-m.A[2,j])^2 +
(m.A[3,i]-m.A[3,j])^2)
k = factor * m.B[i]
m.A[1,i] += k*L
m.A[2,i] += k*L
m.A[3,i] += k*L
m.A[1,j] -= k*L
m.A[2,j] -= k*L
m.A[3,j] -= k*L
end
end

for cnt in m.idxdist[tid]
i,j = m.idx[cnt]
L = sqrt((m.A[1,i]-m.A[1,j])^2 +
(m.A[2,i]-m.A[2,j])^2 +
(m.A[3,i]-m.A[3,j])^2)
k = factor * m.B[i]
m.Aw[1,i,tid] += k*L
m.Aw[2,i,tid] += k*L
m.Aw[3,i,tid] += k*L
m.Aw[1,j,tid] -= k*L
m.Aw[2,j,tid] -= k*L
m.Aw[3,j,tid] -= k*L
end
end
m.A[1,i] = sum(@view m.Aw[1,i,:])
m.A[2,i] = sum(@view m.Aw[2,i,:])
m.A[3,i] = sum(@view m.Aw[3,i,:])
end
end

function defaultdist(sz::Int, nc::Int)
if sz >= nc
chunk_size = div(sz,nc)
remainder = rem(sz,nc)
sidx = zeros(Int64, nc+1)
for i = 1:(nc+1)
sidx[i] += (i-1)*chunk_size + 1
if i<= remainder
sidx[i] += i-1
else
sidx[i] += remainder
end
end
grid = fill(0:0,nc)
for i = 1:nc
grid[i] = sidx[i]:sidx[i+1]-1
end
return grid
else
sidx = [1:(sz+1);]
grid = fill(0:0,nc)
for i = 1:sz
grid[i] = sidx[i]:sidx[i+1]-1
end
return grid
end
end

function test_parallel()
n = 100_000_000
N = 1000
idx = [(rand(1:N),rand(1:N)) for k=1:n]
A = hcat(([rand(-1000.:1000.);rand(-1000.:1000.);rand(-1000.:1000.)] for k=1:N)...)
Aw[:,:,i] .= A
end
B = [rand(-1000.:1000.) for k=1:N]

model1 = Model(idx,A,B)
model2 = DistributedModel(idx,idxdist,A,Aw,B)

@btime update2(\$model1,1e-9)
@test model1.A ≈ model2.A
end
test_parallel()

``````

#### results (Julia 1.6.3, 8 threads on Intel i9 MacBook Pro)

``````  480.496 ms (0 allocations: 0 bytes)
108.942 ms (82 allocations: 6.36 KiB)
Test Passed
``````

• The `defaultdist` function distributes the indices across all threads. It is an extended function of the version used in the `DistributedArrays` package.
• To prevent a race condition, the data is written to an array `Aw` (third dimension for each thread) and then later summed up in `A`.
• Even when varying the workload on my local machine, with multithreading I was able to produce a better performance increase compared to the version with `SharedArrays` and `Distributed`.
• In the coming days I will start simulations on a cluster and am already curious if and how the scaling changes. I will also try the `Distributed` version to see how it scales with higher workloads.