Calling pmap inside of a loop with varying parameters

parallel

#1

I am trying to do a parameter study on an ODE type problem, varying both a parameter in the equation, and the choice of time step. The problem lays out like:

function v(x,k)
   return  cos(k * x)
end

Dt_vals = logspace(-4,1,11);
k_vals = [1,2,4];
x0 = 0.0;

for k in k_vals
    output = pmap(Dt->solve_ode(x0, v, Dt), Dt_vals);
   # process output
end

Here, solve_ode integrates an ode of the sort x' = v(x; k) using a particular integrator. I load workers with addprocs, but I get errors about, for instance, k not being defined on the workers.


#2

Your code as written above doesn’t pass k to the integrator anywhere, which is why you’re getting that error message. You also need to explicitly define functions (and any other global constants) on all processes using the @everywhere macro.

To test a bunch of different parameter combinations, I’d define a wrapper function run that takes the parameters as a tuple, and then use IterTools.product to iterate over all the combinations rather than using pmap inside a loop. Less prone to bugs, generalizes to any number of parameters, and it will let pmap balance the load better.

using IterTools
addprocs(2)

@everywhere begin

function v(x,k)
   return  cos(k * x)
end

function solve_ode(x0, v, k, Dt)
    return v(x0, k) * Dt # replace with actual calculation
end

function run_sim(params)
    Dt, k = params
    return solve_ode(x0, v, k, Dt)
end

end # @everywhere

Dt_vals = logspace(-4,1,11);
k_vals = [1,2,4];
x0 = 0.0;

results = pmap(run_sim, product(Dt_vals, k_vals))

#3

Thanks. I’ll try that. One question: is it clear what order the computed values are returned in the results = pmap line (i.e., are they in the same ordering as the input arguments are stored in product(Dt_vals, k_vals)?


#4

Yes, pmap will return the results in the same order as the inputs.


#5

Ok, Julia does not like this:

nw = 2;
addprocs(nw);

@everywhere using IterTools
@everywhere function Euler(x0, f, Δt, t_final)
    x = deepcopy(x0);
    t = 0.0;
    n = round(Int,t_final/Δt);
    for j = 1:n
        x+= Δt * f(t, x);
    end
    return x
end

@everywhere function f(t, x, k)
    return cos(k * x);
end


Δt_vals = logspace(-2,-1,31);
@everywhere x0 = 1.0;
@everywhere t_final = 1.0;

k_vals = [1,2,4,8,16];

results = pmap((Δt, k)-> Euler(x0, (t,x)->f(t,x,k), Δt, t_final), product(Δt_vals, k_vals) )

and gives the error

ERROR: LoadError: On worker 2:
MethodError: no method matching (::Base.Serializer.__deserialized_types__.##21#23)(::Tuple{Float64,Int64})
Closest candidates are:
  #21(::Any, ::Any) at /Users/gideonsimpson/code/julia_testing/odeparams1.jl:26
#106 at ./distributed/process_messages.jl:268 [inlined]
run_work_thunk at ./distributed/process_messages.jl:56
macro expansion at ./distributed/process_messages.jl:268 [inlined]
#105 at ./event.jl:73
Stacktrace:
 [1] #573 at ./asyncmap.jl:178 [inlined]
 [2] foreach(::Base.##573#575, ::Array{Any,1}) at ./abstractarray.jl:1733
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}, ::Vararg{IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}},N} where N) at ./asyncmap.jl:178
 [4] wrap_n_exec_twice(::Channel{Any}, ::Array{Any,1}, ::Base.Distributed.##204#207{WorkerPool}, ::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}, ::Vararg{IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}},N} where N) at ./asyncmap.jl:154
 [5] #async_usemap#558(::Function, ::Void, ::Function, ::Base.Distributed.##188#190, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}, ::Vararg{IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}},N} where N) at ./asyncmap.jl:103
 [6] (::Base.#kw##async_usemap)(::Array{Any,1}, ::Base.#async_usemap, ::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}, ::Vararg{IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}},N} where N) at ./<missing>:0
 [7] (::Base.#kw##asyncmap)(::Array{Any,1}, ::Base.#asyncmap, ::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}) at ./<missing>:0
 [8] #pmap#203(::Bool, ::Int64, ::Void, ::Array{Any,1}, ::Void, ::Function, ::WorkerPool, ::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}) at ./distributed/pmap.jl:126
 [9] pmap(::WorkerPool, ::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}) at ./distributed/pmap.jl:101
 [10] #pmap#213(::Array{Any,1}, ::Function, ::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}) at ./distributed/pmap.jl:156
 [11] pmap(::Function, ::IterTools.Product{Tuple{Array{Float64,1},Array{Int64,1}}}) at ./distributed/pmap.jl:156
 [12] include_from_node1(::String) at ./loading.jl:576
 [13] include(::String) at ./sysimg.jl:14

#6

This error is because the anonymous function (Δt, k)-> Euler(x0, (t,x)->f(t,x,k), Δt, t_final) takes two arguments, but pmap is passing it a tuple, which it doesn’t know how to unpack. That’s what the run_sim wrapper function is doing in my example above: taking a tuple and passing the individual parameters to the function that does the actual work. You could also define the anonymous function like

tup -> Euler(x0, (t,x)->f(t,x,tup[2]), tup[1], t_final)

#7

Got it, thanks.