Error using sum in DistributedArrays


#1

Hi,

I am seeking some advice on running the following piece of code on a cluster. It is a simulation of a particular type of neural network with N neurons.

For the last result of a paper, I run the simulation with N=100000 on 40 computers, and I get errors mainly like the following. On 10 computers, I don’t get this kind of error.

Worker 22 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
Worker 20 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
ERROR: LoadError: ProcessExitedException()
(::DistributedArrays.##110#112{#f,DistributedArrays.DArray{Float64,1,Array{Float64,1}},DistributedArrays.DArray{Float64,1,Array{Float64,1}}})() at ./task.jl:335
Stacktrace:
 [1] sync_end() at ./task.jl:287
 [2] macro expansion at ./task.jl:303 [inlined]
 [3] map!(::#f, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}) at /home/rveltz/.julia/v0.6/DistributedArrays/src/mapreduce.jl:6
 [4] R_mf!(::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::Float64, ::DistributedArrays.DArray{networkBaS.BallAndStick,1,Array{networkBaS.BallAndStick,1}}, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:60
 [5] macro expansion at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:136 [inlined]
 [6] macro expansion at ./util.jl:237 [inlined]
 [7] #simule#98(::Float64, ::Function, ::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:126
 [8] simule(::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:80
while loading /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl, in expression starting on line 237

Not being very knowledgeable about distributed computing, I am also seeking advice about my code: mistakes, possible improvements… I can only post part of it, though, because it is big.

It relies on the following structure, defined in a module in networkBaS.jl:

"""
    BallAndStick

Mutable state of one neuron of the simulated network.

NOTE(review): only `v` is described by the code that uses this type (the
simulation comments call `neurons[i].v` the membrane potential). The meaning of
the other fields is inferred from their names and should be confirmed against
networkBaS.jl.
"""
mutable struct BallAndStick
	connections::Vector{Float64}  # presumably connection weights towards the other neurons -- TODO confirm
	stim::Vector{INDS.Solution}   # presumably the stimulation fronts travelling in the dendrite -- TODO confirm
	v::Float64                    # membrane potential; initialised from V0 and copied back into the distributed state vector
	index::Int                    # presumably the index of this neuron in the network -- TODO confirm
end

# Short alias for the type; the simulation constructs neurons through `networkBaS.BaS(...)`.
const BaS = BallAndStick

The main loop of the program, which performs rejection sampling, is shown below.

Thank you a lot for your help,

Best regards.

@everywhere include("networkBaS.jl")
@everywhere include("soft.jl")

@everywhere using networkBaS, Distributions, Parameters, ProgressMeter, ElasticArrays
@everywhere importall DistributedArrays, DistributedArrays.SPMD


# Copy the initial values into the neurons, worker by worker.
#
# Both arguments are indexed with `[:L]`, which for a DistributedArrays `DArray`
# yields the part stored on the calling process; entry `k` of the local part of
# `V0` is written into field `v` of local neuron `k`. Returns `nothing`.
@everywhere function putV0_d!(neurons, V0)
    init_values = V0[:L]
    local_neurons = neurons[:L]
    for k in eachindex(local_neurons)
        local_neurons[k].v = init_values[k]
    end
    return nothing
end

# Vector field used for the continuous variable (per the original author, the
# flow is known in closed form between successive somatic fronts).
#
# For every index `k` of the local part of `out`, stores
# `flow_neuron(neuron_k, t, Iext, w)` where `neuron_k` is local neuron `k`.
# `t`, `Iext` and `w` are forwarded to `flow_neuron` untouched; the main loop
# passes a `(start, stop)` time pair as `t`. Returns `nothing`.
@everywhere function Phi(out, neurons, t, Iext, w)
    local_out = out[:L]
    local_neurons = neurons[:L]
    for k in eachindex(local_out)
        local_out[k] = flow_neuron(local_neurons[k], t, Iext, w)
    end
    return nothing
end


"""
    R_mf!(rate, Vd, t, neurons, sum_rate) -> (total, bound)

Overwrite `rate` with `f` applied elementwise to `Vd` (via `map!`; `f` is
defined outside this block) and return the pair `(total, bound)`.

- `total` is `sum(rate)` when `sum_rate` is `true` and `0.` otherwise.
- `bound` is `length(Vd) * 3.5` when `t > 0.3` and `length(Vd) * 0.5` otherwise.

`neurons` is part of the signature but is not used by this function.
"""
function R_mf!(rate, Vd, t::Float64, neurons, sum_rate::Bool)
    N = length(Vd)
    # Author's tuning note for the early-time factor: 1.5 works well
    # [1.48 1.4 1.38] for N=1000.
    bound::Float64 = t > 0.3 ? N * 3.5 : N * 0.5

    # The rate vector is refreshed whether or not the sum is requested.
    map!(f, rate, Vd)

    total = sum_rate ? sum(rate) : 0.
    return total, bound
end

# Apply one jump of the process to the neurons stored on the calling process.
#
# Every local neuron is handed to `networkBaS.update_dendrite` together with the
# jump time `t`, the index `ind_spike` of the neuron that spiked and
# `cleanup_dendrite_time`; its field `v` is then copied into the matching entry
# of the local part of `V_d`. Per the original author's notes, the update adds
# fronts to the dendrites connected to the spiking neuron and increments that
# neuron's spike counter (this happens inside `update_dendrite`, not shown here).
# Always returns `true`.
@everywhere function Delta_xc_mf(V_d, neurons, t::Float64, ind_spike::Int64, cleanup_dendrite_time)
    local_V = V_d[:L]
    local_neurons = neurons[:L]
    for k in eachindex(local_neurons)
        neuron = local_neurons[k]
        networkBaS.update_dendrite(t, neuron, ind_spike, cleanup_dendrite_time)
        local_V[k] = neuron.v
    end
    return true
end

"""
    simule(V0, n_max, verbose=false; Iext=1.)

Simulate a network of `N = length(V0)` ball-and-stick neurons with a rejection
method, running the jump loop over `2:n_max` and only performing jumps while the
current time is below the horizon `tf = 100.`.

# Arguments
- `V0`: initial values, one per neuron; copied into field `v` of each neuron.
- `n_max`: upper end of the jump loop `2:n_max`.
- `verbose`: when `true`, print a status line at every step.

# Keywords
- `Iext`: forwarded unchanged to `Phi`.

# Returns
A tuple `(neurons, jump_time, indices, emp_rate, xc)`:
- `neurons`: local `Array` copy of the first `min(N, 100)` neurons;
- `jump_time`: times of the jumps, starting with `0.`;
- `indices`: index sampled at each jump, starting with `0`;
- `emp_rate`: mean of the rate vector at each jump, starting with `0.`;
- `xc`: `ElasticArray` with `N` rows, one column of `V_d` per saved state.

# Throws
- `AssertionError` if the total rate exceeds the bound returned by `R_mf!`,
  if the inner rejection loop reaches `10^6` steps, or if the rate vector sums
  to a non-positive value.
"""
function simule(V0::Array{Float64},n_max::Int64,verbose = false;Iext=1.)
    N = length(V0)
    write(STDOUT,"**************\n--> Debut de simulation, N = $N\n");flush(STDOUT)

    # distribute V0 so that it can be applied to each neuron
    V_d = distribute(V0)

    # declare the network of neurons as a distributed array
    rnd_connections = Uniform()
    write(STDOUT,"--> network initialisation...");flush(STDOUT)
    @sync neurons = @DArray [networkBaS.BaS(rnd_connections,N,i) for i=1:N];
    write(STDOUT,"done!\n");flush(STDOUT)

    # put initial condition
    write(STDOUT,"--> V0 initialisation...");flush(STDOUT)
    @sync spmd(putV0_d!,neurons,V_d)
    write(STDOUT,"done!\n");flush(STDOUT)

    write(STDOUT,"--> rejection starting!\n");flush(STDOUT)

    # initialisation of the rejection method
    t        = 0.    # current time
    tf       = 100.  # time horizon: no jump is performed once t >= tf
    njumps   = 1
    nsteps   = 0
    nb_rejet = 0     # accumulated number of rejections
    w        = 1.0/N

    rate_vector   = copy(V_d)
    _,lambda_star = R_mf!(rate_vector,V_d,t,neurons,false)
    # One-element vector handed to Delta_xc_mf and advanced in steps of 0.2 below
    # (per the original author, kept in order to save memory). Plain locals:
    # `const` is not meaningful on local variables, and `ev` is reassigned.
    cleanup_dendrite_time = [0.]
    ev = 1

    # saved data
    jump_time = [0.]
    indices   = [0]
    emp_rate  = [0.]
    xc = ElasticArray{Float64}(N,0)
    append!(xc,V_d)

    println("----> for loop starting")
    @time for njumps = 2:n_max
        verbose && (write(STDOUT,"\n--> step : $njumps, / $n_max, #reject = $nsteps" , ", lambda_star = $lambda_star, t=$(round(t,5)), N = $N\n");flush(STDOUT))
        reject = true
        nsteps = 1
        while (reject) && (nsteps < 10^6) && (t < tf)
            # candidate time interval, drawn with rate lambda_star
            # (author's open question: use a lambda_star here?)
            tp = (t, t - log(rand())/lambda_star )
            # Evolve the flow in place: Phi writes its result into V_d. The
            # original author notes that this also updates the membrane
            # potentials neurons[i].v (presumably inside flow_neuron).
            spmd(Phi,V_d, neurons, tp, Iext, w)
            t = tp[2]
            # we don't want the full rate vector here, just the sum of rates
            ppf = R_mf!(rate_vector, V_d, t, neurons, true)
            @assert ppf[1] <= ppf[2] "(Rejection algorithm) Your bound on the total rate is wrong $ppf, N=$N, t=$t"
            reject = rand() <  (1. - ppf[1] / lambda_star)
            nsteps += 1
        end
        # keep track of the number of rejections
        nb_rejet += nsteps-1

        @assert(nsteps <= 10^6,"Error, too many rejections!!")

        _ , lambda_star = R_mf!(rate_vector,V_d,t,neurons,false)
        # gather the rate vector on the master as weights, to ease sampling
        pf = StatsBase.Weights(convert(Array{Float64,1},rate_vector))

        @assert(pf.sum>0,"Error, rate vector is null for some reason")
        if (t < tf)
            # make a jump: draw the index with probability proportional to its rate
            ev = Distributions.sample(pf)
            verbose && write(STDOUT,"----> reaction = $ev" )
            # Perform the jump. Per the original author: update the dendrites and
            # reset to zero the membrane potential of the neuron that just spiked.
            spmd(Delta_xc_mf,V_d,neurons,t,ev,cleanup_dendrite_time)
            push!(jump_time,t)
            push!(indices,ev)
            # save data
            push!(emp_rate,mean(rate_vector))
            append!(xc,convert(Vector{Float64},V_d))
        end

        if t> cleanup_dendrite_time[1] + 0.2
            cleanup_dendrite_time[1]  += 0.2
        end
    end
    write(STDOUT,"njumps = ",njumps," / rejections = ", nb_rejet, ", lambda_star = ",lambda_star, ", t=$(round(t,5)), N = $N\n\n");flush(STDOUT);

    # Copy the neurons to return BEFORE releasing the distributed array: once
    # `close` has run, the worker-side storage is released and `neurons` must no
    # longer be indexed. `min(N,100)` keeps networks smaller than 100 neurons
    # from raising a BoundsError.
    first_neurons = convert(Array{networkBaS.BallAndStick},neurons[1:min(N,100)])
    close(neurons)
    return first_neurons, jump_time, indices, emp_rate,xc
end

# Small timed run first: 100 neurons, n_max = 10, verbose output
# (presumably a warm-up/compilation run before the full-size one).
neurons, time, jmp, rate, xc = @time simule(rand(100), 10, true);

write(STDOUT, "--> saving data...\n")
flush(STDOUT)
# `@save` is called without a variable list, so the global names above are left
# untouched on purpose. The file name embeds the OAR_JOB_ID environment variable.
@save "parallel-$(ENV["OAR_JOB_ID"]).jld"

# Full-size timed run: 100_000 neurons, n_max = 25_000.
N = 100_000
neurons, time, jmp, rate, xc = @time simule(rand(N), 25_000, true);


#2

I get another issue stemming from the call spmd(Delta_xc_mf,V_d,neurons,t,ev,cleanup_dendrite_time).

Is my logic for using spmd flawed?

Worker 55 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
Worker 56 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
Worker 53 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
Worker 46 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
Worker 49 terminated.
ERROR (unhandled task failure): EOFError: read end of file
Stacktrace:
 [1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
ERROR: LoadError: ProcessExitedException()
(::DistributedArrays.SPMD.##26#28{DistributedArrays.SPMD.##25#27{#Delta_xc_mf,Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},DistributedArrays.DArray{networkBaS.BallAndStick,1,Array{networkBaS.BallAndStick,1}},Float64,Int64,Array{Float64,1}}}})() at ./task.jl:335

...and 2 more exception(s).

Stacktrace:
 [1] sync_end() at ./task.jl:287
 [2] macro expansion at ./task.jl:303 [inlined]
 [3] #spmd#24(::Array{Int64,1}, ::Void, ::Function, ::Function, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}, ::Vararg{Any,N} where N) at /home/rveltz/.julia/v0.6/DistributedArrays/src/spmd.jl:243
 [4] macro expansion at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:157 [inlined]
 [5] macro expansion at ./util.jl:237 [inlined]
 [6] #simule#94(::Float64, ::Function, ::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:125
 [7] simule(::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:80
while loading /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl, in expression starting on line 237
WARNING: Forcibly interrupting busy workers
WARNING: rmprocs: process 1 not removed
[rveltz@nef-frontal MF-dendrite]$

#3

Hi,

I narrowed down the error: previously, it was caused by insufficient memory. Now the reported error is always the same: it always points to the use of sum in my code above, where sum is applied to a distributed array. This usually occurs only after some time, though.

However, I don’t understand whether EPERM causes the sum error or the other way around. Distributed debugging is hard :frowning: .

I would appreciate a hint please. So far, I am amazed by this code running on 250 processors thanks to DistributedArrays.

Best regards,

WARNING: Error trying to reuse client port number, falling back to plain socket : cannot obtain socket name: operation not permitted (EPERM)
Worker 30 terminated.
Worker 31 terminated.
...
[1] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
ERROR: LoadError: LoadError: ProcessExitedException()
(::DistributedArrays.##120#122{Base.#+,DistributedArrays.DArray{Float64,1,Array{Float64,1}},Array{Any,1}})() at ./task.jl:335

...and 31 more exception(s).

Stacktrace:
 [1] sync_end() at ./task.jl:287
 [2] macro expansion at ./task.jl:303 [inlined]
 [3] reduce(::Function, ::DistributedArrays.DArray{Float64,1,Array{Float64,1}}) at /home/rveltz/.julia/v0.6/DistributedArrays/src/mapreduce.jl:40
 [4] sum(::DistributedArrays.DArray{Float64,1,Array{Float64,1}}) at /home/rveltz/.julia/v0.6/DistributedArrays/src/mapreduce.jl:150
 [5] macro expansion at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:189 [inlined]
 [6] macro expansion at ./util.jl:237 [inlined]
 [7] #simule#92(::Float64, ::Bool, ::UnitRange{Int64}, ::Function, ::Array{Float64,1}, ::Int64, ::Bool) at /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl:146
 [8] (::#kw##simule)(::Array{Any,1}, ::#simule, ::Array{Float64,1}, ::Int64, ::Bool) at ./<missing>:0
while loading /home/rveltz/prog/MF-dendrite/network-nds-parallel.jl, in expression starting on line 237
while loading /home/rveltz/prog/MF-dendrite/avoid-bug.jl, in expression starting on line 3