Persistent local data structures in parallel programs


#1

I am trying to implement a parallel problem slightly more involved than a simple for loop. Namely, I am trying to implement a simple pseudo-spectral solver for an integro-differential equation which features a complicated and expensive-to-compute convolution kernel. The computation of that kernel should be parallelised using SharedArrays for now but possibly DistributedArrays at some later point.

It is hard to create a true minimal example, as the structure (and the various layers) of the code seems to be of importance. The simplest implementation I could come up with is attached. It works as follows:

  • one calls run_simulation() which creates all important data structures and loops over a number of time steps
  • in each timestep!() the vector field (right-hand side) of the equation is computed and the solution is updated
  • the vector field collision_operator!() has a cheap part which is computed serially and an expensive part convolution_kernel!() which is supposed to be computed in parallel

The problem is that the parallel part also needs some other (constant) data structures (like a grid object and a user-specified function), and it needs to create some local arrays which should persist between calls for efficiency.

It is not clear to me how to implement the last two points efficiently. I implemented various ways of launching the parallel loop, all of which perform worse than the serial version due to unwanted copying of data:

# serial
  0.007511 seconds (2.11 k allocations: 114.603 KiB)

# @parallel
  0.018689 seconds (14.51 k allocations: 633.516 KiB)

# @spawnat
  0.020277 seconds (13.72 k allocations: 647.420 KiB)

# @async remotecall_wait
  0.018142 seconds (12.46 k allocations: 884.404 KiB)

The problem is, I guess, that e.g. the grid object gets copied to the worker every time a task is spawned there. As this object is constant, it would be better to instantiate it once on every worker and just use the local copy.

For the creation of local temporary arrays, I am using a generated function. I am not sure if this is the best way. Having another local data structure for this might be better.

From the documentation it is not quite clear to me how to implement this efficiently. Most examples in the documentation are rather simple. So any help would be appreciated.


# Start worker processes. The worker count is read from JULIA_NUM_THREADS;
# when that variable is not set no workers are added and a warning is issued.
if !haskey(ENV, "JULIA_NUM_THREADS")
    warn("Running in serial mode.")
else
    nw = parse(Int, ENV["JULIA_NUM_THREADS"])
    addprocs(nw)
end


# Simulation parameters handed to run_simulation at the end of the script.
const nt = 10     # number of time steps
const nx = 16     # number of grid nodes in x
const ny = 8      # number of grid nodes in y
const Δt = 0.1    # time step size


@everywhere module CollisionOperator

    export run_simulation


    """
        Grid2d{M,N,T}

    Two-dimensional grid storing the node coordinates `x` and `y` as vectors with
    element type `T`. The node counts `M` (in `x`) and `N` (in `y`) are carried as
    type parameters, so methods can dispatch on the grid size.
    """
    struct Grid2d{M,N,T}
        x::Vector{T}
        y::Vector{T}
    end

    """
        Grid2d(M, N; x1=0, x2=2π, y1=0, y2=2π)

    Build a uniform grid with `M` nodes in `x` and `N` nodes in `y`.

    Node `i` (for `i = 0:M-1`) sits at `x1 + i*(x2-x1)/M`, and likewise in `y`, so
    the right end points `x2` and `y2` are not part of the grid.

    The element type of the grid is the promotion of the `x` and `y` coordinate
    types, so bounds of different precision never narrow one of the axes.
    """
    function Grid2d(M, N; x1=0, x2=2π, y1=0, y2=2π)
        x = [x1 + i*(x2-x1)/M for i in 0:M-1]
        y = [y1 + j*(y2-y1)/N for j in 0:N-1]

        # Using eltype(x) alone would convert y to the type of x, which silently
        # loses precision (or throws) when the y bounds have a wider type.
        T = promote_type(eltype(x), eltype(y))

        Grid2d{M,N,T}(x, y)
    end


    """
        RungeKutta{M,N,T}

    Work arrays and coefficients of the time stepper for an `M`×`N` problem with
    element type `T`.
    """
    struct RungeKutta{M,N,T}
        u::Vector{SharedArray{T,2}}    # one solution array per stage
        f::Vector{SharedArray{T,2}}    # one vector-field array per stage

        Δt::T                          # time step

        a::Vector{T}                   # stage coefficients used by timestep!
        b::Vector{T}                   # weights of the final update in timestep!
        c::Vector{T}                   # stored, but not read by the visible code
    end

    """
        RungeKutta(M, N, Δt)

    Allocate three `M`×`N` shared arrays each for the stage solutions and the
    stage vector fields, and set up the coefficient vectors. The element type
    `T` of all arrays is the type of `Δt`.
    """
    function RungeKutta(M, N, Δt::T) where {T}
        nstages = 3

        stage_solutions = [SharedArray{T}((M,N)) for s in 1:nstages]
        stage_fields    = [SharedArray{T}((M,N)) for s in 1:nstages]

        # Float64 literals; converted to T by the struct constructor
        coeff_a = [0.0, 0.5, 1.0]
        coeff_b = [1/6, 2/3, 1/6]
        coeff_c = [0.0, 0.5, 1.0]

        RungeKutta{M,N,T}(stage_solutions, stage_fields, Δt, coeff_a, coeff_b, coeff_c)
    end


    """
        run_simulation(nt, nx, ny, Δt, uinit, mfunc)

    Set up the grid, the time stepper and the solution arrays, evaluate the
    initial condition and advance the solution for `nt` time steps.

    # Arguments
    - `nt::Int`: number of time steps.
    - `nx::Int`, `ny::Int`: number of grid nodes in `x` and `y`.
    - `Δt::Number`: time step; converted to the element type of the grid.
    - `uinit::Function`: called as `uinit(x, y)` for every grid node to set the
      initial condition.
    - `mfunc::Function`: user function passed on to `timestep!`.

    Returns `nothing`; the solution is currently neither stored nor returned.
    """
    function run_simulation(nt::Int, nx::Int, ny::Int, Δt::Number, uinit::Function, mfunc::Function)
        grid = Grid2d(nx,ny)

        # timestep! requires grid, time stepper and solution arrays to share one
        # element type T. Derive it from the grid and convert Δt, so that e.g. an
        # integer, rational or Float32 time step does not end in a MethodError.
        T  = eltype(grid.x)
        rk = RungeKutta(nx, ny, convert(T, Δt))

        u₀ = zeros(T, nx, ny)
        u₁ = zeros(T, nx, ny)

        # compute initial conditions (first index innermost: column-major order)
        for j in 1:size(u₀,2), i in 1:size(u₀,1)
            u₀[i,j] = uinit(grid.x[i], grid.y[j])
        end

        # run for nt time steps
        for n in 1:nt
            timestep!(grid, rk, mfunc, u₀, u₁)
            # ...
            # save solution
            # ...
            u₀ .= u₁
        end

        return nothing
    end


    """
        timestep!(grid, rk, mfunc, u₀, u₁)

    Runge-Kutta time stepping algorithm solving u'=f(u): starting from `u₀`,
    compute the stage values in `rk.u` and `rk.f` and write the solution after
    one step of size `rk.Δt` into `u₁`. `u₀` is left unchanged.

    `f` is evaluated through `collision_operator!`, to which `grid` and `mfunc`
    are passed on. Both `u₀` and `u₁` must have size `(M,N)`.
    """
    function timestep!(grid::Grid2d{M,N,T}, rk::RungeKutta{M,N,T}, mfunc::Function, u₀::Matrix{T}, u₁::Matrix{T}) where {M,N,T}
        @assert size(u₀) == size(u₁) == (M,N)

        stages = rk.u
        fields = rk.f
        h      = rk.Δt

        # first stage: vector field at the current solution
        stages[1] .= u₀
        collision_operator!(grid, mfunc, stages[1], fields[1])

        # remaining stages: each one is built from the previous stage's vector field
        for s in 2:length(rk.a)
            ha = h * rk.a[s]
            stages[s] .= u₀ .+ ha .* fields[s-1]
            collision_operator!(grid, mfunc, stages[s], fields[s])
        end

        # final solution: u₀ plus the weighted sum of all stage vector fields
        u₁ .= u₀
        for s in eachindex(rk.b, fields)
            hb = h * rk.b[s]
            u₁ .+= hb .* fields[s]
        end
    end


    """
        collision_operator!(grid, m, u, f)

    Collision operator: computes the vector field f(u), where f is a complicated
    integro-differential operator.

    The expensive part is delegated to `convolution_kernel!`. With more than one
    worker the columns `1:N` are distributed over the workers, one kernel call per
    column; otherwise a single kernel call covers all columns.
    """
    function collision_operator!(grid::Grid2d{M,N,T}, m::Function, u::SharedArray{T,2}, f::SharedArray{T,2}) where {M,N,T}
        # ...
        # do something not too expensive
        # ...

        # compute expensive part
        if nworkers() <= 1
            # serial: one call over the full column range
            convolution_kernel!(grid, 1, N, m, u, f)
        else
            # columns per worker; only referenced by the chunked variants below
            wn = N ÷ nworkers()

            # parallel: one kernel call per column, wait for all of them
            @sync @parallel for j = 1:N
                convolution_kernel!(grid, j, j, m, u, f)
            end

            # Alternative launch strategies, one contiguous chunk of wn columns per
            # worker. NOTE: if N is not divisible by nworkers(), wn*nworkers() < N
            # and these variants would leave the trailing columns uncomputed.

            # @sync for (p,w) in enumerate(workers())
            #     j1 = wn*(p-1) + 1
            #     j2 = wn*p
            #     @spawnat w begin
            #         convolution_kernel!(grid, j1, j2, m, u, f)
            #     end
            # end

            # @sync for (p,w) in enumerate(workers())
            #     j1 = wn*(p-1) + 1
            #     j2 = wn*p
            #     @async remotecall_wait(convolution_kernel!, w, grid, j1, j2, m, u, f)
            # end
        end
    end


    """
        convolution_kernel!(grid, j1, j2, m, u, f)

    Convolution kernel: most expensive part in vector field computation.

    Generated function whose returned code loops over the columns `j1:j2` and, for
    each column, over the rows `1:M`. Code placed before the `quote` runs when the
    method is generated rather than on every call.
    """
    @generated function convolution_kernel!(grid::Grid2d{M,N,T}, j1::Int, j2::Int, m::Function, u::SharedArray{T,2}, f::SharedArray{T,2}) where {M,N,T}
        # ...
        # create some local arrays
        # ...

        kernel = quote
            for j in j1:j2, i in 1:M
                # ...
                # do some expensive computation involving the function m()
                # ...
            end
        end

        return kernel
    end

end


# User function that is used in the convolution kernel; defined on every process.
# Copies `u` element-wise into `m` and returns `m`. Note that it is the second
# argument, not the first, that is mutated.
@everywhere function m_func!(u::Array{T,2}, m::Union{Array{T,2},SharedArray{T,2}}) where {T}
    broadcast!(identity, m, u)
end

# Initial condition u(x,y) = sin(x)^4 * sin(y)^4; defined on every process.
@everywhere u_init(x, y) = sin(x)^4 * sin(y)^4

# load CollisionOperator module
using CollisionOperator

# warmup: the first call also compiles the code, so it is run once untimed
run_simulation(nt, nx, ny, Δt, u_init, m_func!)

# timing: measure a second run with identical arguments
@time run_simulation(nt, nx, ny, Δt, u_init, m_func!)