Parallel implementation of Transducers.jl

I’m developing a general-purpose simulator package based on lazy evaluation, built on Transducers.jl.
(For background, take a look at this.)

Now I’m trying to use Transducers.jl for parallel simulation.
I’d like to run simulations from many different initial conditions in parallel.

The example can also be found in test/parallel.jl in the LazyFym package.

Illustration

  • data_single: simulation of a single scenario
  • data_parallel: simulation of multiple scenarios
  • data_parallel_distributed: simulation of multiple scenarios (supposed to run in parallel)

I don’t understand why data_parallel_distributed is not computed properly.
Also, is there a clever way to naturally extend how data_single is obtained to data_parallel?

using LazyFym
using Transducers

using Test
using LinearAlgebra


struct Env <: Fym
end

function ẋ(env::Env, x, t)
    ẋ = -x
    return ẋ
end

function initial_condition(env::Env)
    return rand(3)
end

function postprocess(datum_raw)
    # keep only the time and state fields as a NamedTuple
    return (t = datum_raw.t, x = datum_raw.x)
end

function terminal_condition(datum)
    return norm(datum.x) < 1e-3
end

function parallel()
    env = Env()
    t0 = 0.0
    Δt = 0.01
    tf = 1.0
    ts = t0:Δt:tf
    num = 1000
    # initial conditions
    x0s = 1:num |> Map(i -> initial_condition(env)) |> collect
    # simulator
    trajs(x0, ts) = Sim(env, x0, ts, ẋ) |> TakeWhile(!terminal_condition)
    # parallel simulation
    @time data_single = trajs(x0s[1], ts) |> Map(postprocess) |> evaluate
    @time data_parallel = foldxl(|>,
                                 [x0s,
                                  Map(x0 -> trajs(x0, ts)),
                                  collect,
                                  Map(_data -> _data |> Map(postprocess) |> evaluate),
                                  collect])
    @time data_parallel_distributed = foldxd(|>,
                                             [x0s,
                                              Map(x0 -> trajs(x0, ts)),
                                              collect,
                                              Map(_data -> _data |> Map(postprocess) |> evaluate),
                                              collect])
    @test data_single == data_parallel[1]
    @test data_parallel == data_parallel_distributed
    return data_single, data_parallel, data_parallel_distributed
end

data_single, data_parallel, data_parallel_distributed = parallel()
nothing
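
A possible explanation for the failure, offered as a sketch rather than a confirmed diagnosis: foldxd (like foldxt) expects an associative reducing function, because a parallel fold is free to regroup the operands across chunks, and |> is not associative. The toy pipeline below uses only Base; `stages` and its contents are made-up stand-ins, not LazyFym code.

```julia
# Hypothetical three-stage pipeline: data, then a transformation, then a reduction.
stages = Any[[1, 2, 3], xs -> map(x -> x + 1, xs), sum]

# A left fold groups as ((data |> f) |> g), which is the grouping foldxl uses.
left = foldl(|>, stages)

# A parallel fold may regroup as data |> (f |> g). Here `f |> g` calls `sum`
# on a function, which throws instead of building a pipeline.
regrouped = try
    stages[1] |> (stages[2] |> stages[3])
catch err
    err
end
```

If this is what happens, the two groupings disagree, so a fold that splits the vector of stages across workers cannot be expected to reproduce the foldxl result.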

Well, I’ve been thinking about it and I guess I can use tcollect if I make a transducer that maps a container of initial conditions into a container of simulation results.
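
That idea could look roughly like the following sketch. `simulate` is a hypothetical stand-in for a function that maps one initial condition to its evaluated result; only `Map`, `collect`, and `tcollect` come from Transducers.jl.

```julia
using Transducers

# Hypothetical stand-in for "run one scenario and evaluate it".
simulate(x0) = sum(abs2, x0)

# A container of initial conditions (deterministic here so results are comparable).
x0s = [fill(float(i), 3) for i in 1:8]

# Serial: map each initial condition to its result.
results_serial = x0s |> Map(simulate) |> collect

# Threaded: same pipeline, collected with tcollect. Start Julia with several
# threads (e.g. `julia -t 4`) for the work to actually run in parallel.
results_threaded = x0s |> Map(simulate) |> tcollect
```

Here the parallelism is over the container of initial conditions, and each scenario is evaluated independently inside the mapped function.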

EDIT: I corrected my code using tcollect and dcollect as follows, and it seems to work.

(part of the corrected code)

    # simulator
    trajs_evaluate(x0, ts) = Sim(env, x0, ts, ẋ) |> TakeWhile(!terminal_condition) |> Map(postprocess) |> evaluate
    # parallel simulation
    n = rand(1:num)  # index of a randomly chosen scenario
    @time data_single = trajs_evaluate(x0s[n], ts)  # single scenario
    @time data_multiple = x0s |> Map(x0 -> trajs_evaluate(x0, ts)) |> collect  # multiple scenarios
    @time data_parallel_t = x0s |> Map(x0 -> trajs_evaluate(x0, ts)) |> tcollect  # multiple scenarios with thread-based parallel computing
    @time data_parallel_d = x0s |> Map(x0 -> trajs_evaluate(x0, ts)) |> dcollect  # multiple scenarios with process-based parallel computing
    @test data_single == data_multiple[n]
    @test data_multiple == data_parallel_t == data_parallel_d