Parallel Computing For Loop With Dictionaries

You need to use @everywhere for every function definition & module import that you want to be visible on worker processes. You can wrap your code with an @everywhere begin ... end block to avoid annotating every individual definition, or just use @everywhere include("stuff_that_needs_to_be_on_workers.jl").

1 Like

You also should move your addprocs() outside and run it at the start of your script. Again, addprocs is not really going to help you. I think you need to spend some time reading the documentation.

Note that side effects on dictionaries, such as solutions[keystr] = complete and speed[keystr] = complete, do not work in parallel computation. If you use pmap, each worker process modifies its own copy of the dictionary, and those modifications are not reflected back on the main process:

julia> let d = Dict()
           pmap(1:2) do i
               d[i] = i
               @show d
           end
           d
       end
      From worker 3:    d = Dict{Any, Any}(2 => 2)
      From worker 2:    d = Dict{Any, Any}(1 => 1)
Dict{Any, Any}()

If you do this with threads, it is a data race and the outcome is undefined.

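You need to communicate the result via the returned value: have the function you pass to pmap return its result (for example the pair i => i) and build the dictionary on the main process from what pmap returns, e.g. Dict(pmap(i -> i => i, 1:2)).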

I started to rework the code to accommodate pmap, as far as I understand it. I’m coming across strange behavior. The problem comes when I try to add to the dictionary. I see the dataframe when I debug at this point.

ERROR: LoadError: AbstractDataFrame is not iterable. Use eachrow(df) to get a row iterator or eachcol(df) to get a column iterator
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] iterate(#unused#::DataFrame)
@ DataFrames ~/.julia/packages/DataFrames/zXEKU/src/abstractdataframe/iteration.jl:23
[3] indexed_iterate(I::DataFrame, i::Int64)
@ Base ./tuple.jl:89
[4] macro expansion
@ ~/repos/computing/juliatest_toparallel.jl:146 [inlined]
[5] top-level scope
@ ./timing.jl:210
in expression starting at /home/wesley/repos/computing/juliatest_toparallel.jl:139

@time begin

# Name of the model parameter being varied; `process` handles the values
# "reaction", "mesh" and "diffusion".
param = "reaction" # reaction, mesh, diffusion

# Lower bound of the parameter sweep.
r_start = 0

# Upper bound of the parameter sweep.
r_end = 0.5

# Number of sample points in the sweep.
r_length = 2

# Single parameter value passed to `process` below.
key = 0.10

# Evenly spaced parameter values from r_start to r_end (r_length points).
parameter_range = range(r_start, r_end, length=r_length)

# Destructures the result of `process` into two values, so `process` must
# return something iterable with (at least) two elements.
u_steps, speed_steps = process(param, key)

# Vertically concatenates every value stored in `speed_steps` into one table.
data = reduce(vcat, values(speed_steps))
"""
    calc_speed(df, h, k, M, alpha, D, param, key)

Estimate a propagation speed from the solution table `df` and return it as a
one-row `DataFrame`.

Two columns of `df` are sampled, at 50% and 75% of the column count (rounded
down). In each sampled column the number of entries that are `>= 0.5` is
counted. The difference of the two counts times `h` is the distance covered,
the difference of the two column indices times `k` is the elapsed time, and
their ratio is the speed.

# Arguments
- `df`: solution table; columns are indexed by position.
- `h`: multiplier turning a count difference into a distance.
- `k`: multiplier turning a column-index difference into a time.
- `M`, `alpha`, `D`: recorded verbatim in the output row.
- `param`: name of the extra output column that stores `key`.
- `key`: value stored under the `param` column.

# Returns
A one-row `DataFrame` with columns `StartPoint`, `EndPoint`, `TimeInterval`,
`DistanceInterval`, `Speed`, `param`, `M`, `k`, `alpha` and `D`.
"""
function calc_speed(df, h, k, M, alpha, D, param, key)
  n_cols = ncol(df)
  start_time = floor(Int, n_cols * 0.50)
  end_time = floor(Int, n_cols * 0.75)

  # Number of entries in a given column that have reached the 0.5 threshold.
  cells_at_or_above_half(col) = count(>=(0.5), df[:, col])

  start_space = cells_at_or_above_half(start_time)
  end_space = cells_at_or_above_half(end_time)

  time_interval = (end_time - start_time) * k
  distance_interval = (end_space - start_space) * h
  speed = distance_interval / time_interval

  return DataFrame(
    "StartPoint" => [start_space],
    "EndPoint" => [end_space],
    "TimeInterval" => [time_interval],
    "DistanceInterval" => [distance_interval],
    "Speed" => [speed],
    param => [key],
    "M" => [M],
    "k" => [k],
    "alpha" => [alpha],
    "D" => [D],
  )
end
"""
    process(param, key)

Run one simulation for a single parameter value, save a contour plot of the
result, and return the solution and speed tables.

`key` is rounded to 5 digits and used as the value of the parameter selected
by `param`:

- `"reaction"`: `alpha = key`, with `M = 512`, `D = 1`
- `"mesh"`: `M = key`, with `alpha = -0.5`, `D = 1`
- `"diffusion"`: `D = key`, with `alpha = 0.10`, `M = 512`

The plot is written as a PNG into the directory `joinpath(pwd(), string(param))`,
which is created if it does not exist.

# Returns
A tuple `(solutions, speed)` of two one-entry dictionaries, both keyed by the
rounded `key`: `solutions[key]` is the solution `DataFrame` and `speed[key]`
is the one-row `DataFrame` produced by `calc_speed`. Results are returned
rather than written into shared dictionaries so that the caller can collect
them from the return value (e.g. when mapping over several keys).

# Throws
- `ArgumentError` if `param` is not one of the three names listed above.
"""
function process(param, key)
  # Spatial interval [a, b], final time T and number of time steps.
  a = 0
  b = 20
  T = 10
  t_steps = 10^2

  key = round(key, digits = 5)

  if param == "reaction"
    M = 512
    D = 1
    alpha = key
  elseif param == "mesh"
    alpha = -0.5
    D = 1
    M = key
  elseif param == "diffusion"
    alpha = 0.10
    M = 512
    D = key
  else
    # Fail here with a clear message instead of a later UndefVarError on M.
    throw(ArgumentError(string("unknown param ", repr(param),
      "; expected \"reaction\", \"mesh\" or \"diffusion\"")))
  end

  k = T / t_steps
  h = (b - a) / M
  mu = k / h^2
  boundary = "HN"

  x = [h * i for i = 0:M]
  t = [k * i for i = 0:t_steps]

  # initial condition
  u0 = initial_condition(x)

  # crank matrices
  B_inv, P = crank(M, mu, D, boundary)
  # time step
  # NOTE(review): presumably one row per time level and one column per grid
  # point, as the contour plot over (x, t) suggests; confirm in calc_new_step.
  time_steps = calc_new_step(B_inv, P, u0, D, k, t_steps, M, alpha)

  p_title = string("reduced nagumo:", " ", "M = ", M, " ", "alpha = ", alpha, " ",
  "k = ", k, " ", "diff = ", D)
  Plots.contourf(x, t, time_steps, fill=true, c=:vik, title=p_title, xlabel="x", ylabel="t", dpi=300)

  # saves the current plot; mkpath does nothing if the directory already exists
  subfolder = joinpath(pwd(), string(param))
  mkpath(subfolder)
  output_loc = joinpath(subfolder, string(p_title, ".png"))
  savefig(output_loc)

  complete = DataFrame(Transpose(time_steps), :auto)
  speed_key = calc_speed(complete, h, k, M, alpha, D, param, key)

  # Return both results explicitly. Previously the last expression was a
  # dictionary assignment, so the function returned a single DataFrame and
  # `u_steps, speed_steps = process(...)` failed while destructuring it.
  solutions = Dict(key => complete)
  speed = Dict(key => speed_key)
  return solutions, speed
end