Thank you for your extensive answer! I’m not sure yet, if I understand why it’s not sustainable.
When MyModule
includes e.g. Distribution’s Categorical
, shouldn’t @distributed
pick up on it?
Using your explanations, I was able to fix my MWE. My actual project still seems to have errors I can’t pin down, though. Due to some reason, a worker can’t find the closure for the distributed for loop:
ERROR: TaskFailedException
nested task error: On worker 5:
UndefVarError: #207#216 not defined
Stacktrace:
[1] deserialize_datatype
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:1288
[2] handle_deserialize
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:835
[3] deserialize
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:782
[4] handle_deserialize
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:842
[5] deserialize
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:782 [inlined]
[6] deserialize_msg
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distr
ibuted/src/messages.jl:87
[7] #invokelatest#2
@ ./essentials.jl:708 [inlined]
[8] invokelatest
@ ./essentials.jl:706 [inlined]
[9] message_handler_loop
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distr
ibuted/src/process_messages.jl:169
[10] process_tcp_streams
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distr
ibuted/src/process_messages.jl:126
[11] #99
@ ./task.jl:411
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker, ::Function,
::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{},
Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/remotecall.jl:394
[2] remotecall_fetch(::Function, ::Distributed.Worker, ::Function,
::Vararg{Any, N} where N)
@ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/remotecall.jl:386
[3] remotecall_fetch(::Function, ::Int64, ::Function, ::Vararg{Any,
N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, Nam
edTuple{(), Tuple{}}})
@ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/remotecall.jl:421
[4] remotecall_fetch
@ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distri
buted/src/remotecall.jl:421 [inlined]
[5] (::Distributed.var"#157#158"{RungePolarDPC.var"#accum#214"{Mult
ilevelPolarCodeF2Info{ChannelSignaling{Float64, Float64, Float64, Float6
4, Float64}}}, RungePolarDPC.var"#207#216"{Distributions.Normal{Float64}
, Vector{Distributions.Categorical{Float64, Vector{Float64}}}, Distribut
ions.Categorical{Float64, Vector{Float64}}, Vector{NamedTuple{(:u,), Tup
le{Matrix{BoolF}}}}, NamedTuple{(:Ly, :cc, :pacode), Tuple{Vector{SubArr
ay}, Vector{PolarCodeF2}, Vector{NamedTuple}}}, Matrix{BoolF}, Matrix{Bo
olF}, Vector{Float64}, Matrix{Float64}, Matrix{Float64}, Vector{Float64}
, Vector{Float64}, Vector{Float64}, Vector{Int64}, Vector{Int64}, Int64,
Vector{PolarCodeF2}, MultilevelPolarCodeF2Info{ChannelSignaling{Float64
, Float64, Float64, Float64, Float64}}, Int64, Int64, ChannelSignaling{F
loat64, Float64, Float64, Float64, Float64}}, UnitRange{Int64}, Vector{U
nitRange{Int64}}, Int64, Int64})()
@ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/macros.jl:274
If I’m not mistaken, var"#accum#235"
is the closure for accum, which accesses one variable defined inside the map-do
before accum and var"#228#237"
is the closure for the @distributed for
. I’m not sure, what var"#157#158"
is, though. Maybe it’s a closure for the map-do
block? I don’t define any variables before map
.
I’m not sure how to narrow down this problem. Maybe there’s a problem with one of the captured variables?
Edit 1:
When I execute the function twice, this error still occurs. When I edit the file (I have Revise loaded in the REPL before loading my module) and execute the function after this, the function works as expected every time I call it. After cleaning up the workers and calling the function again, the closure variable is again undefined on the worker. After changing my module and letting Revise do its magic, the function works.
julia> rmprocs(workers())
julia> addprocs(3; exeflags="--project")
julia> @everwhere using MyModule
julia> test(3)
[ERROR: TaskFailedException]
julia> test(4)
[ERROR: Task FailedException]
julia> revise(MyModule) # this is where I change the file
julia> test(3)
[works]
julia> test(2)
[works]
Original code
function get_data(info::MLPDPCMCReliabilitiesInfo)
dict = produce_or_load(datadir("reliabilities", "monte-carlo"), info) do info
## Preallocation and parameter extraction
s = info.s
Nblocks = info.Nblocks
Nbatches = info.Nbatches
sig = get_data(info.sig)
T = promote_type(eltype(sig.X_sym), eltype(sig.S_sym))
bits = convert(Int, log2(size(sig.X_sym, 1)))
lS = size(sig.X_sym, 2)
lΞ = size(sig.X_sym, 1)
mlcinfo = MultilevelPolarCodeF2Info(bits, s, sig)
pcode = PolarCodeF2.(s, Ref([]), Ref([]))
Nbitchannels = sum(2 .^ s)
blockmin = 2^(sum(s) - minimum(s)) # = lcm(2^s1, 2^s2)
blen = blockmin * Nblocks
Si = Array{Int}(undef, blen)
Ξ = Array{Int}(undef, blen)
S = similar(sig.S_sym, blen)
X = similar(sig.X_sym, blen)
Z = Array{T}(undef, blen)
Ls = Array{Float64}(undef, bits, blen)
Ly = Array{Float64}(undef, bits, blen)
Y = Array{T}(undef, blen)
Xᴮ = Array{BoolF}(undef, bits, blen)
Uᴮ = Array{BoolF}(undef, bits, blen)
padata = preallocate(decode_reliabilities!, Ly, Y, Uᴮ, Xᴮ, LLR, mlcinfo, LLR_SC)
enc_padata = preallocate.(encode!, Ref(@view Uᴮ[:]), Ref(@view Xᴮ[:]), pcode, MatrixEncoding)
d_S = Categorical(sig.P_S[:])
d_Ξ_S = collect(Categorical(sig.P_Ξ_S[:, s]./sum(sig.P_Ξ_S[:,s])) for s in 1:lS)
d_ZC = MvNormal(2, √(sig.σ²))
d_ZR = Normal(0, √(sig.σ²))
accum(lhs, rhs) = map(lhs, rhs) do l, r
bitchannel_cumulative_mean(l, r, mlcinfo)
end
## Entropy Computation
H_Y, H_S = @distributed accum for b = 1:info.Nbatches
Si .= rand(d_S, blen)
S .= sig.S_sym[Si]
Z .= rand(d_ZR, blen)
for s in 1:lS
Ξ[Si .== s] .= rand(d_Ξ_S[s], sum(Si .== s))
end
Xᴮ .= convert(Array, VectorOfArray(bitlevels.(sig.b, Ξ)))
for bit in 1:bits
foreach_block(blen, 0, pcode[bit]) do cblIdx, iblIdx
encode!(enc_padata[bit], reshape((@view Uᴮ[bit,iblIdx]), 1, :), (@view Xᴮ[bit,cblIdx]), pcode[bit], MatrixEncoding)
end
end
@. X = sig.X_sym[CartesianIndex(Ξ, Si)]
@. Y = X + S + Z
decode_reliabilities!(padata, Ly, Y, Uᴮ, Xᴮ, LLR, mlcinfo, LLR_SC)
decode_reliabilities!(padata, Ls, Si, Uᴮ, Xᴮ, txLLR, mlcinfo, LLR_SC)
map((Ly, Ls)) do l
@. log1pexp(-abs(l))
end
end
H_Y ./= info.Nbatches * log(2)
H_S ./= info.Nbatches * log(2)
df = DataFrame()
for bit in 1:bits
bdf = DataFrame(BitChannelIdx = 1:length(H_Y[bit]), BitLevel = bit, H_Y = H_Y[bit], H_S = H_S[bit])
df = vcat(df, bdf)
end
@strdict H=df
end
dict[1]["H"]
end