Hi,
I am seeking advice about running the following piece of code on a cluster. It simulates a particular type of neural network with N neurons.
For the last result of a paper, I run the simulation with N = 100000 on 40 machines, and I mainly get errors like the ones below. On 10 machines, I do not get these errors.
Worker 22 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
[1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
Worker 20 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
[1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
ERROR: LoadError: ProcessExitedException()
(::DistributedArrays.##110#112{#f,DistributedArrays.DArray{Float64,1,Array{Float64,1}},DistributedArrays.DArray{Float64,1,Array{Float64,1}}})() at ./task.jl:335
Stacktrace:
[1] sync_end() at ./task.jl:287
[2] macro expansion at ./task.jl:303 [inlined]
[3] map!(::#f, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}) at /home/rveltz/.julia/v0.6/DistributedArrays/src/mapreduce.jl:6
[4] R_mf!(::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::Float64, ::DistributedArrays.DArray{networkBaS.BallAndStick,1,Array{networkBaS.BallAndStick,1}}, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:60
[5] macro expansion at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:136 [inlined]
[6] macro expansion at ./util.jl:237 [inlined]
[7] #simule#98(::Float64, ::Function, ::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:126
[8] simule(::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:80
while loading /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl, in expression starting on line 237
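In case it is connection-related (with 40 machines there are many more sockets between workers than with 10), one thing I have not tried yet is to raise the worker timeout and drop the all-to-all topology. This is only a sketch: I read the node list from OAR_NODEFILE (which I believe OAR provides), and JULIA_WORKER_TIMEOUT is, as far as I know, read by Base.worker_timeout(). I am also not sure DistributedArrays is happy with :master_slave, but it would at least tell me whether the number of connections is the problem.
# Sketch (untested at scale): longer worker timeout, and a master-worker
# topology so that workers do not all connect to each other.
ENV["JULIA_WORKER_TIMEOUT"] = "120.0" # default is 60s
machines = readlines(ENV["OAR_NODEFILE"]) # one hostname per line under OAR
addprocs(machines; topology = :master_slave)
info("running with ", nprocs() - 1, " workers")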
I am not very knowledgeable about distributed computation, so I am also seeking advice about the code itself: mistakes, possible improvements… I can only post part of it, though, because it is big.
It relies on the following structure, defined in a module in networkBaS.jl:
mutable struct BallAndStick
    connections::Vector{Float64}  # connection weights to the other neurons
    stim::Vector{INDS.Solution}   # fronts currently travelling along the dendrite
    v::Float64                    # somatic membrane potential
    index::Int                    # index of the neuron in the network
end
const BaS = BallAndStick
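For readers not familiar with DistributedArrays: every distributed function below follows the same pattern, namely take the local part of each DArray with d[:L] (equivalent to localpart(d)) and loop over it, the function being run on all workers with spmd. A self-contained toy version of that pattern (not my actual code) is:
addprocs(2)
@everywhere using DistributedArrays
@everywhere using DistributedArrays.SPMD

@everywhere function double_local!(d)
    d_l = d[:L] # the worker-local chunk of the DArray
    for ii in eachindex(d_l)
        d_l[ii] *= 2.0
    end
end

d = distribute(ones(8))
spmd(double_local!, d) # run on every worker holding a piece of d
@assert sum(d) == 16.0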
The main loop of the program, which performs rejection sampling, is below.
Thanks a lot for your help,
Best regards.
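Since I am also asking for a check of the logic: the loop implements the classical thinning (rejection) scheme for a point process with intensity bounded by lambda_star. A serial toy version, with a made-up rate that has nothing to do with my model, looks like this:
# Thinning: propose jump times from a Poisson clock with rate lambda_star,
# accept each candidate with probability rate(t)/lambda_star.
toy_rate(t) = 1.0 + sin(t)^2 # toy intensity, bounded above by 2
lambda_star = 2.0            # global bound on the intensity
t, tf, jumps = 0.0, 10.0, Float64[]
while t < tf
    t -= log(rand()) / lambda_star                        # next candidate time
    rand() < toy_rate(t) / lambda_star && push!(jumps, t) # accept, else reject
end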
@everywhere include("networkBaS.jl")
@everywhere include("soft.jl")
@everywhere using networkBaS, Distributions, Parameters, ProgressMeter, ElasticArrays, StatsBase
@everywhere importall DistributedArrays, DistributedArrays.SPMD
using JLD # for the @save calls at the end (only needed on the master)
@everywhere function putV0_d!(neurons, V0)
    # copy the initial condition into each neuron's membrane potential (local parts only)
    V0_l = V0[:L]
    neurons_l = neurons[:L]
    for ii in eachindex(neurons_l)
        neurons_l[ii].v = V0_l[ii]
    end
end
@everywhere function Phi(out, neurons, t, Iext, w)
    # vector field used for the continuous variable
    # the flow is known in closed form between consecutive somatic fronts
    out_l = out[:L]
    neurons_l = neurons[:L]
    for ii in eachindex(out_l)
        out_l[ii] = flow_neuron(neurons_l[ii], t, Iext, w)
    end
    nothing
end
function R_mf!(rate, Vd, t::Float64, neurons, sum_rate::Bool)
    N = length(Vd)
    bound::Float64 = N * 0.5 # 1.5 works well [1.48 1.4 1.38] for N=1000
    if t > 0.3
        bound = N * 3.5
    end
    map!(f, rate, Vd) # rate function f applied elementwise (f is loaded @everywhere)
    if sum_rate == false
        return 0., bound
    else
        return sum(rate), bound
    end
end
@everywhere function Delta_xc_mf(V_d, neurons, t::Float64, ind_spike::Int64, cleanup_dendrite_time)
    # this function performs the jump of the process:
    # - add fronts to each dendrite connected to the neuron that just spiked
    # - increase the spike counter of the neuron that just spiked
    V_dl = V_d[:L]
    neurons_l = neurons[:L]
    for ii in eachindex(neurons_l)
        networkBaS.update_dendrite(t, neurons_l[ii], ind_spike, cleanup_dendrite_time)
        V_dl[ii] = neurons_l[ii].v
    end
    return true
end
function simule(V0::Array{Float64}, n_max::Int64, verbose = false; Iext = 1.)
    # simulate n_max spikes for the network of N BaS
    N = length(V0)
    write(STDOUT, "**************\n--> Starting simulation, N = $N\n"); flush(STDOUT)
    # distribute V0 in order to apply it to each neuron
    V_d = distribute(V0)
    Xd_d = distribute(zeros(Int64, N))
    # declare the network of neurons
    # rnd_connections = Distributions.Truncated(Distributions.Cauchy(1.,1.),0.,1.) # Uniform()
    rnd_connections = Uniform()
    write(STDOUT, "--> network initialisation..."); flush(STDOUT)
    @sync neurons = @DArray [networkBaS.BaS(rnd_connections, N, i) for i = 1:N];
    write(STDOUT, "done!\n"); flush(STDOUT)
    # @sync for p = procs(neurons) @spawnat p println(size(localpart(neurons)), " - ", localindexes(neurons)) end; flush(STDOUT)
    # set the initial condition
    write(STDOUT, "--> V0 initialisation..."); flush(STDOUT)
    @sync spmd(putV0_d!, neurons, V_d)
    write(STDOUT, "done!\n"); flush(STDOUT)
    write(STDOUT, "--> rejection starting!\n"); flush(STDOUT)
    # initialisation of the rejection method
    t = 0. # current time
    tf = 100.
    njumps = 1
    nsteps = 0
    nb_rejet = 0
    w = 1.0 / N
    rate_vector = copy(V_d)
    _, lambda_star = R_mf!(rate_vector, V_d, t, neurons, false)
    # this time is kept in order to save memory
    # (plain bindings: `const` has no effect on local variables)
    cleanup_dendrite_time = [0.]
    ev = 1
    # save data
    jump_time = [0.]
    indices = [0]
    emp_rate = [0.]
    xc = ElasticArray{Float64}(N, 0)
    append!(xc, convert(Vector{Float64}, V_d)) # gather once rather than element by element
    println("----> for loop starting")
    p = Progress(n_max)
    @time for njumps = 2:n_max
        verbose && (write(STDOUT, "\n--> step : $njumps / $n_max, #reject = $nsteps, lambda_star = $lambda_star, t=$(round(t,5)), N = $N\n"); flush(STDOUT))
        reject = true
        nsteps = 1
        while (reject) && (nsteps < 10^6) && (t < tf)
            tp = (t, t - log(rand()) / lambda_star) # use a lambda_star here?
            # evolve the flow in place:
            # this updates the membrane potentials neurons[i].v
            spmd(Phi, V_d, neurons, tp, Iext, w)
            t = tp[2]
            ppf = R_mf!(rate_vector, V_d, t, neurons, true) # we only need the sum of the rates, not the full rate vector
            @assert ppf[1] <= ppf[2] "(Rejection algorithm) Your bound on the total rate is wrong $ppf, N=$N, t=$t"
            reject = rand() < (1. - ppf[1] / lambda_star)
            nsteps += 1
        end
        # keep track of the number of rejections
        nb_rejet += nsteps - 1
        @assert(nsteps <= 10^6, "Error, too many rejections!!")
        _, lambda_star = R_mf!(rate_vector, V_d, t, neurons, false)
        # verbose && println("----> rate = $rate_vector, Xc = $V_d")
        # note: this gathers the whole rate vector to the master at every jump
        pf = StatsBase.Weights(convert(Array{Float64,1}, rate_vector)) # to ease sampling
        @assert(pf.sum > 0, "Error, rate vector is null for some reason")
        if (t < tf)
            # make a jump
            ev = Distributions.sample(pf)
            # verbose && @show pf, ev
            verbose && write(STDOUT, "----> reaction = $ev")
            # perform the jump: update the dendrites and reset the membrane
            # potential of the neuron that just spiked to zero
            spmd(Delta_xc_mf, V_d, neurons, t, ev, cleanup_dendrite_time)
            push!(jump_time, t)
            push!(indices, ev)
            # save data
            push!(emp_rate, mean(rate_vector))
            append!(xc, convert(Vector{Float64}, V_d))
        end
        if t > cleanup_dendrite_time[1] + 0.2
            cleanup_dendrite_time[1] += 0.2 # save floor(t) for update purposes
        end
        # ProgressMeter.next!(p; showvalues = [(:njumps, njumps), (:t, round(t,4))])
    end
write(STDOUT,"njumps = ",njumps," / rejections = ", nb_rejet, ", lambda_star = ",lambda_star, ", t=$(round(t,5)), N = $N\n\n");flush(STDOUT);
close(neurons) #call gc for the distributed array
return convert(Array{networkBaS.BallAndStick},neurons[1:100]), jump_time, indices, emp_rate,xc
end
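One improvement I am considering (a sketch, untested): the line pf = StatsBase.Weights(convert(Array{Float64,1}, rate_vector)) gathers the whole rate vector to the master at every jump, which is costly for N = 100000. Sampling could be done in two stages instead, worker first, then local index; sample_spike below is a hypothetical helper, not something I run:
function sample_spike(rate)
    ps = procs(rate)
    # one partial sum of the rates per worker
    partial = Float64[@fetchfrom p sum(localpart(rate)) for p in ps]
    p = ps[StatsBase.sample(StatsBase.Weights(partial))] # pick a worker proportionally to its total rate
    # pick a neuron in that worker's chunk and shift its index to a global one
    return @fetchfrom p begin
        i = StatsBase.sample(StatsBase.Weights(localpart(rate)))
        first(localindexes(rate)[1]) + i - 1
    end
end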
neurons, times, jmp, rate, xc = @time simule(rand(100), 10, true); # small run first, to warm up compilation
write(STDOUT, "--> saving data...\n"); flush(STDOUT)
@save "parallel-$(ENV["OAR_JOB_ID"]).jld"
# @assert 1==0
#
N = 100_000
neurons, times, jmp, rate, xc = @time simule(rand(N), 25_000, true);
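Finally, since a single lost worker currently kills the whole run, I may guard the large call above like this (a sketch; no retry or checkpoint logic yet):
try
    neurons, times, jmp, rate, xc = @time simule(rand(N), 25_000, true)
catch err
    # a dead worker surfaces as ProcessExitedException; report it instead of dying
    isa(err, ProcessExitedException) || rethrow()
    warn("lost a worker during simule; $(nprocs() - 1) workers remain")
end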