# Parallel simulation with Transducers.jl

I’m developing a package for general-purpose simulation based on lazy evaluation, built on Transducers.jl.
(For background, take a look at this.)

Now I’m trying to use Transducers.jl for parallel simulation:
I’d like to run simulations with many different initial conditions in parallel.

The example below can also be found in `test/parallel.jl` in the LazyFym package.

## Illustration

• `data_single`: simulation of a single scenario
• `data_parallel`: simulation of multiple scenarios
• `data_parallel_distributed`: simulation of multiple scenarios, intended to run in parallel (distributed)

I don’t understand why `data_parallel_distributed` is not computed correctly.
Furthermore, is there a clever way to naturally extend the computation of `data_single` to that of `data_parallel`?

```julia
using LazyFym
using Transducers

using Test
using LinearAlgebra

struct Env <: Fym
end

# dynamics: ẋ = -x
function ẋ(env::Env, x, t)
    return -x
end

# random initial state with entries in [0, 1)
function initial_condition(env::Env)
    return rand(3)
end

# keep only the time and the state of each raw datum
function postprocess(datum_raw)
    return (t = datum_raw.t, x = datum_raw.x)
end

# stop once the state is close enough to the origin
function terminal_condition(datum)
    return norm(datum.x) < 1e-3
end

function parallel()
    env = Env()
    t0 = 0.0
    Δt = 0.01
    tf = 1.0
    ts = t0:Δt:tf
    num = 1000
    # initial conditions
    x0s = 1:num |> Map(i -> initial_condition(env)) |> collect
    # simulator
    trajs(x0, ts) = Sim(env, x0, ts, ẋ) |> TakeWhile(!terminal_condition)
    # parallel simulation
    @time data_single = trajs(x0s[1], ts) |> Map(postprocess) |> evaluate
    @time data_parallel = foldxl(|>,
                                 [x0s,
                                  Map(x0 -> trajs(x0, ts)),
                                  collect,
                                  Map(_data -> _data |> Map(postprocess) |> evaluate),
                                  collect])
    @time data_parallel_distributed = foldxd(|>,
                                             [x0s,
                                              Map(x0 -> trajs(x0, ts)),
                                              collect,
                                              Map(_data -> _data |> Map(postprocess) |> evaluate),
                                              collect])
    @test data_single == data_parallel[1]
    @test data_parallel == data_parallel_distributed
    return data_single, data_parallel, data_parallel_distributed
end

data_single, data_parallel, data_parallel_distributed = parallel()
nothing
```
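One likely explanation for the `foldxd` result: parallel folds such as `foldxt` and `foldxd` require the reducing function to be associative, because they split the input into chunks, fold each chunk, and then combine the partial results, possibly in a different grouping than a strict left fold. `|>` is not associative. The plain-Julia sketch below uses two hypothetical stand-in functions, `addone` and `double`, to show that regrouping a pipe chain changes its meaning:

```julia
# Hypothetical stand-ins for two pipeline stages.
addone(x) = x + 1
double(x) = 2x

# A left fold, like foldl/foldxl, groups as ((3 |> addone) |> double) = double(addone(3)).
left = foldl(|>, Any[3, addone, double])

# A parallel reduce may instead group as 3 |> (addone |> double).
# `addone |> double` means double(addone) = 2 * addone, which throws a MethodError.
regrouped_fails = try
    3 |> (addone |> double)
    false
catch err
    err isa MethodError
end

println(left)             # 8
println(regrouped_fails)  # true
```

So `foldxl(|>, ...)` works only because it is a strictly left-to-right fold; the parallel variants are free to regroup the list `[x0s, Map(...), collect, ...]`, which pipes transducers and functions into each other instead of into the data.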

Well, I’ve been thinking about it, and I guess I can use `tcollect` if I make a transducer that maps a container of initial conditions to a container of simulation results, i.e. one that maps each initial condition to its trajectory.
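A minimal, self-contained sketch of that idea, assuming a hypothetical `simulate` function that stands in for LazyFym’s `Sim(...) |> TakeWhile(...) |> Map(postprocess) |> evaluate` (here just a plain forward-Euler loop for ẋ = -x). Once a single scenario is a function of its initial condition, the multi-scenario version is `Map` over the initial conditions, and only the final collecting step changes:

```julia
using Transducers
using LinearAlgebra: norm

# Hypothetical stand-in for one LazyFym simulation: forward-Euler
# integration of ẋ = -x, stopped once ‖x‖ < 1e-3 (the terminal condition).
function simulate(x0, ts)
    data = NamedTuple{(:t, :x), Tuple{Float64, Vector{Float64}}}[]
    x = copy(x0)
    for t in ts
        norm(x) < 1e-3 && break
        push!(data, (t = t, x = copy(x)))
        x = x + step(ts) * (-x)  # one Euler step
    end
    return data
end

ts = 0.0:0.01:1.0
x0s = [rand(3) for _ in 1:100]

# The single-scenario computation lifted to "initial condition -> trajectory".
xf = Map(x0 -> simulate(x0, ts))

data_single = simulate(x0s[1], ts)        # one scenario
data_multiple = x0s |> xf |> collect      # all scenarios, sequentially
data_parallel_t = x0s |> xf |> tcollect   # all scenarios, thread-based
```

`tcollect` keeps the input order, so `data_parallel_t[i]` corresponds to `x0s[i]`. It only uses as many threads as Julia was started with, e.g. `julia -t 4`.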

EDIT: I corrected my code using `tcollect` and `dcollect` as follows, and it seems to work.

(Part of the corrected code:)

```julia
# simulator: initial condition -> post-processed trajectory
trajs_evaluate(x0, ts) = Sim(env, x0, ts, ẋ) |> TakeWhile(!terminal_condition) |> Map(postprocess) |> evaluate
# parallel simulation
n = rand(1:num)  # index of a randomly chosen scenario
@time data_single = trajs_evaluate(x0s[n], ts)  # single scenario
@time data_multiple = x0s |> Map(x0 -> trajs_evaluate(x0, ts)) |> collect  # multiple scenarios
@time data_parallel_t = x0s |> Map(x0 -> trajs_evaluate(x0, ts)) |> tcollect  # multiple scenarios with thread-based parallel computing
@time data_parallel_d = x0s |> Map(x0 -> trajs_evaluate(x0, ts)) |> dcollect  # multiple scenarios with process-based parallel computing
@test data_single == data_multiple[n]
@test data_multiple == data_parallel_t == data_parallel_d
```
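For the `dcollect` version, one detail that is easy to miss is that the worker processes must know every function the transducer calls. A minimal sketch, assuming two local workers and the same kind of hypothetical `simulate` stand-in (forward Euler for ẋ = -x) instead of the LazyFym pipeline; in the real code, `@everywhere using LazyFym` plus `@everywhere` definitions of `Env`, `ẋ`, `postprocess`, and `terminal_condition` would presumably play the same role:

```julia
using Distributed
nprocs() == 1 && addprocs(2)  # assumption: two local worker processes

# Everything the workers call must be defined on every process.
@everywhere begin
    using Transducers
    using LinearAlgebra: norm

    # Hypothetical stand-in simulator: forward Euler for ẋ = -x,
    # stopped once ‖x‖ < 1e-3.
    function simulate(x0, ts)
        data = NamedTuple{(:t, :x), Tuple{Float64, Vector{Float64}}}[]
        x = copy(x0)
        for t in ts
            norm(x) < 1e-3 && break
            push!(data, (t = t, x = copy(x)))
            x = x + step(ts) * (-x)  # one Euler step
        end
        return data
    end
end

x0s = [rand(3) for _ in 1:20]
xf = Map(x0 -> simulate(x0, 0.0:0.01:1.0))

data_multiple = x0s |> xf |> collect     # sequential, on this process
data_parallel_d = x0s |> xf |> dcollect  # process-based, on the workers
```

Like `tcollect`, `dcollect` returns the results in input order, so the two vectors can be compared element by element.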