You need to use `@everywhere` for every function definition and module import that you want to be visible on the worker processes. You can wrap your code in an `@everywhere begin ... end` block to avoid annotating every individual definition, or just use `@everywhere include("stuff_that_needs_to_be_on_workers.jl")`.
You should also move your `addprocs()` call outside, so that it runs at the start of your script. Again, `addprocs` is not really going to help you. I think you need to spend some time reading the documentation.
Note that side effects on dictionaries, like `solutions[keystr] = complete` and `speed[keystr] = complete`, do not work in parallel computation. If you use `pmap` instead, the modifications made on the remote workers will not be reflected back on the calling process:
julia> let d = Dict()
pmap(1:2) do i
d[i] = i
@show d
end
d
end
From worker 3: d = Dict{Any, Any}(2 => 2)
From worker 2: d = Dict{Any, Any}(1 => 1)
Dict{Any, Any}()
If you do this with threads, it is a data race and the outcome is undefined.
You need to communicate the result via the returned value.
I started to rework the code to accommodate pmap, as far as I understand it. I’m coming across strange behavior. The problem comes when I try to add to the dictionary. I see the dataframe when I debug at this point.
ERROR: LoadError: AbstractDataFrame is not iterable. Use eachrow(df) to get a row iterator or eachcol(df) to get a column iterator
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:33
[2] iterate(#unused#::DataFrame)
@ DataFrames ~/.julia/packages/DataFrames/zXEKU/src/abstractdataframe/iteration.jl:23
[3] indexed_iterate(I::DataFrame, i::Int64)
@ Base ./tuple.jl:89
[4] macro expansion
@ ~/repos/computing/juliatest_toparallel.jl:146 [inlined]
[5] top-level scope
@ ./timing.jl:210
in expression starting at /home/wesley/repos/computing/juliatest_toparallel.jl:139
# Driver script: runs `process` once for a single (param, key) pair inside a timed block.
# NOTE(review): the closing `end` of this `@time begin` block is not in the visible excerpt.
@time begin
param = "reaction" # name of the quantity that `key` sets; process handles "reaction", "mesh", "diffusion"
# Endpoints and number of points for a sweep over parameter values.
r_start = 0
r_end = 0.5
r_length = 2
# The single parameter value actually passed to process below.
key = 0.10
# NOTE(review): parameter_range is built but not used in the visible lines — confirm whether a sweep is intended.
parameter_range = range(r_start, r_end, length=r_length)
# Two-value destructuring: this requires process to return something that iterates as a
# pair (solutions, speeds). A single DataFrame return value is not iterable and raises an
# error on this line.
u_steps, speed_steps = process(param, key)
# Vertically concatenate every value stored in speed_steps into one table.
data = reduce(vcat, values(speed_steps))
"""
    calc_speed(df, h, k, M, alpha, D, param, key)

Estimate a speed from two columns of `df` and return it as a one-row `DataFrame`.

Two column indices are picked at 50% and 75% of `ncol(df)` (rounded down). For each of
those columns the number of rows whose value is at least 0.5 is counted. The difference
of the two counts, scaled by `h`, is the distance; the difference of the two column
indices, scaled by `k`, is the time; their ratio is the speed.

# Arguments
- `df`: table to measure; presumably one column per time step and one row per grid
  point — TODO confirm against the caller.
- `h`: factor applied to the row-count difference.
- `k`: factor applied to the column-index difference.
- `M`, `alpha`, `D`: recorded verbatim in the output row.
- `param`, `key`: `param` becomes a column name in the output and `key` its value.

# Returns
A one-row `DataFrame` with columns "StartPoint", "EndPoint", "TimeInterval",
"DistanceInterval", "Speed", `param`, "M", "k", "alpha" and "D".
"""
function calc_speed(df, h, k, M, alpha, D, param, key)
    total_cols = ncol(df)
    first_col = floor(Int, total_cols * 0.50)
    last_col = floor(Int, total_cols * 0.75)

    # Number of rows whose entry in column `col` is at least 0.5.
    rows_at_or_above(col) = nrow(df[df[:, col] .>= 0.5, [col]])

    start_space = rows_at_or_above(first_col)
    end_space = rows_at_or_above(last_col)

    time_interval = (last_col - first_col) * k
    distance_interval = (end_space - start_space) * h
    speed = distance_interval / time_interval

    return DataFrame(
        "StartPoint" => [start_space],
        "EndPoint" => [end_space],
        "TimeInterval" => [time_interval],
        "DistanceInterval" => [distance_interval],
        "Speed" => [speed],
        param => [key],
        "M" => [M],
        "k" => [k],
        "alpha" => [alpha],
        "D" => [D],
    )
end
"""
    process(param, key)

Run one simulation for a single parameter value, save a contour plot of the result, and
return the solution table and the speed table.

`param` selects which quantity `key` (rounded to 5 digits) is assigned to:

- "reaction": `alpha = key`, with `M = 512`, `D = 1`
- "mesh": `M = key`, with `alpha = -0.5`, `D = 1`
- "diffusion": `D = key`, with `alpha = 0.10`, `M = 512`

The spatial grid runs from 0 to 20 in `M` intervals and the time grid from 0 to 10 in
100 steps. The plot is written as a PNG into a subfolder of the current working
directory named after `param`; the subfolder is created if it does not exist.

# Returns
A tuple `(solutions, speed)` of dictionaries, each holding one entry under the rounded
`key`: `solutions[key]` is the `DataFrame` built from the transposed time steps and
`speed[key]` is the one-row `DataFrame` produced by `calc_speed`. Results are handed
back through the return value (rather than by mutating outer dictionaries) so the
function can also be used with `pmap`.

# Throws
- `ArgumentError` if `param` is not "reaction", "mesh" or "diffusion".
"""
function process(param, key)
    a = 0
    b = 20
    T = 10
    t_steps = 10^2

    key = round(key, digits = 5)

    if param == "reaction"
        M, D, alpha = 512, 1, key
    elseif param == "mesh"
        # NOTE(review): M is used as a grid size; presumably `key` should be an integer
        # here — confirm against crank/calc_new_step.
        M, D, alpha = key, 1, -0.5
    elseif param == "diffusion"
        M, D, alpha = 512, key, 0.10
    else
        # Fail early with a clear message instead of an UndefVarError further down.
        throw(ArgumentError(
            "param must be \"reaction\", \"mesh\" or \"diffusion\"; got $(repr(param))",
        ))
    end

    k = T / t_steps
    h = (b - a) / M
    mu = k / h^2
    boundary = "HN"
    x = [h * i for i = 0:M]
    t = [k * i for i = 0:t_steps]

    # initial condition
    u0 = initial_condition(x)
    # crank matrices
    B_inv, P = crank(M, mu, D, boundary)
    # time step
    time_steps = calc_new_step(B_inv, P, u0, D, k, t_steps, M, alpha)

    p_title = string("reduced nagumo:", " ", "M = ", M, " ", "alpha = ", alpha, " ",
        "k = ", k, " ", "diff = ", D)
    plt = Plots.contourf(x, t, time_steps, fill=true, c=:vik, title=p_title,
        xlabel="x", ylabel="t", dpi=300)

    # Save the plot under <cwd>/<param>/; mkpath is a no-op when the folder exists.
    subfolder = joinpath(pwd(), string(param))
    mkpath(subfolder)
    output_loc = joinpath(subfolder, string(p_title, ".png"))
    # Pass the plot explicitly so the file never depends on the implicit "current plot".
    savefig(plt, output_loc)

    complete = DataFrame(Transpose(time_steps), :auto)
    solutions = Dict{typeof(key),DataFrame}(key => complete)
    speed = Dict{typeof(key),DataFrame}(
        key => calc_speed(complete, h, k, M, alpha, D, param, key),
    )

    # Return both tables: the caller destructures two values. Without an explicit return
    # the function yielded only the last DataFrame, which cannot be destructured.
    return solutions, speed
end