Distributed code loading inside modules

I want to move a function that’s defined in a script into my module.

The function is something like

function test(x)
	map([x]) do info
		eval(macroexpand(Distributed, quote @everywhere using Distributions end))

		@everywhere begin
			s = $(info)
			d = Categorical([0.25, 0.25, 0.25, 0.25])
		end

		@everywhere accum(lhs, rhs) = sum(lhs) + sum(rhs)

		m = @distributed accum for b = 1:100
			rand(d, s)
		end
	end
end

and works completely fine when defined in a script / in the REPL.

When I move the function into my module MyModule it does not work any more:

$ julia --project
julia> using MyModule

julia> using Distributed

julia> test(3)
ERROR: UndefVarError: accum not defined
Stacktrace:
 [1] (::RungePolarDPC.var"#124#126")(info::Int64)
   @ RungePolarDPC /data/projects/uni/RungePolarDPC.jl/src/SimulationData.jl:89
 [2] iterate
   @ ./generator.jl:47 [inlined]
 [3] _collect(c::Vector{Int64}, itr::Base.Generator{Vector{Int64}, RungePolarDPC.var"#124#126"}, #unused#::Base.EltypeUnknown, isz::Base.HasShape{1})
   @ Base ./array.jl:691
 [4] collect_similar
   @ ./array.jl:606 [inlined]
 [5] map
   @ ./abstractarray.jl:2294 [inlined]
 [6] test(x::Int64)
   @ RungePolarDPC /data/projects/uni/RungePolarDPC.jl/src/SimulationData.jl:79
 [7] top-level scope
   @ REPL[3]:1

MyModule has using Distributed before defining the function.

What do I need to do, to execute distributed code from a function inside a module?

When I try to define accum in the function scope as well, then the @distributed for seems unable to find s and d defined in the @everywhere block:

function test(x)
	map([x]) do info
		eval(macroexpand(Distributed, quote @everywhere using Distributions end))

		@everywhere begin
			s = $(info)
			d = Categorical([0.25, 0.25, 0.25, 0.25])
		end

		@everywhere accum(lhs, rhs) = sum(lhs) + sum(rhs)
		accum(lhs, rhs) = sum(lhs) + sum(rhs)

		m = @distributed accum for b = 1:100
			rand(d, s)
		end
	end
end
$ julia --project
julia> using MyModule

julia> using Distributed

julia> test(3)
ERROR: TaskFailedException
Stacktrace:
  [1] wait
    @ ./task.jl:317 [inlined]
  [2] fetch
    @ ./task.jl:332 [inlined]
  [3] preduce(reducer::Function, f::Function, R::UnitRange{Int64})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:278
  [4] (::RungePolarDPC.var"#124#126")(info::Int64)
    @ RungePolarDPC /data/projects/uni/RungePolarDPC.jl/src/SimulationData.jl:90
  [5] iterate
    @ ./generator.jl:47 [inlined]
  [6] _collect(c::Vector{Int64}, itr::Base.Generator{Vector{Int64}, RungePolarDPC.var"#124#126"}, #unused#::Base.EltypeUnknown, isz::Base.HasShape{1})
    @ Base ./array.jl:691
  [7] collect_similar
    @ ./array.jl:606 [inlined]
  [8] map
    @ ./abstractarray.jl:2294 [inlined]
  [9] test(x::Int64)
    @ RungePolarDPC /data/projects/uni/RungePolarDPC.jl/src/SimulationData.jl:79
 [10] top-level scope
    @ REPL[3]:1

    nested task error: UndefVarError: d not defined
    Stacktrace:
     [1] macro expansion
       @ /data/projects/uni/RungePolarDPC.jl/src/SimulationData.jl:91 [inlined]
     [2] (::RungePolarDPC.var"#125#128")(reducer::RungePolarDPC.var"#accum#127", R::UnitRange{Int64}, lo::Int64, hi::Int64)
       @ RungePolarDPC /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:291
     [3] #137
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:354 [inlined]
     [4] run_work_thunk(thunk::Distributed.var"#137#138"{RungePolarDPC.var"#125#128", Tuple{RungePolarDPC.var"#accum#127", UnitRange{Int64}, Int64, Int64}, Ba
se.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}}, print_error::Bool)
       @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:63
     [5] #remotecall_fetch#142
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:379 [inlined]
     [6] remotecall_fetch
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:379 [inlined]
     [7] #remotecall_fetch#146
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [8] remotecall_fetch
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [9] (::Distributed.var"#157#158"{RungePolarDPC.var"#accum#127", RungePolarDPC.var"#125#128", UnitRange{Int64}, Vector{UnitRange{Int64}}, Int64, Int64})()
       @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:274
    Stacktrace:
     [1] #remotecall_fetch#142
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:380 [inlined]
     [2] remotecall_fetch
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:379 [inlined]
     [3] #remotecall_fetch#146
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [4] remotecall_fetch
       @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [5] (::Distributed.var"#157#158"{RungePolarDPC.var"#accum#127", RungePolarDPC.var"#125#128", UnitRange{Int64}, Vector{UnitRange{Int64}}, Int64, Int64})()
       @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:274

I wouldn’t expect this to be doing what you think it’s doing. @everywhere works at global scope — in the Main module — in all connected processes. So doing things like @everywhere s = $(info) doesn’t really make sense inside a map as it’ll just repeatedly clobber the prior (global) definition. Further, you don’t need to use @everywhere to move info to all the processes; @distributed will automatically grab it and do that for you (if it’s used inside the for loop).

You just want to define your global packages, constants, and functions with @everywhere at top-level.

In other words, you want:

@everywhere using Distributions
@everywhere accum(lhs, rhs) = sum(lhs) + sum(rhs)
function test(x)
	map([x]) do info
		s = info
		d = Categorical([0.25, 0.25, 0.25, 0.25])

		m = @distributed accum for b = 1:100
			rand(d, s)
		end
	end
end
1 Like

Thanks. This does not work anymore, when test() is inside a module, does it?

Wouldn’t the user need to execute the two @everywhere statements before calling test(3)? This in turn would require the user to know, what the exact statements are.

I’ll need to take a second look at the automatically grabbing part. I thought, that I originally implemented it your way, but @distributed failed to grab those variables after I moved everything into a function. But this is a while ago, I’ll need to check again.

Correct — @everywhere just evaluates into the Main module and so if you just wholesale drop the code I wrote into a module that module won’t know about the accum function. Instead, if you’re writing a module that includes @distributed, you’d want to structure your code such that the user just does @everywhere using TheModule (or something similar) to get the code they need on all the processes.

In ad-hoc form:
julia> using Distributed

julia> @everywhere module TheModule
       using Distributed, Distributions
       accum(lhs, rhs) = sum(lhs) + sum(rhs)
       function test(x)
           map([x]) do info
               s = info
               d = Categorical([0.25, 0.25, 0.25, 0.25])

               m = @distributed accum for b = 1:100
                   rand(d, s)
               end
           end
       end
       end

julia> TheModule.test(1)
1-element Vector{Int64}:
 251

The mental model here is (roughly) that Distributed fires up a bunch of Julia sessions in a computer lab and @everywhere runs from keyboard to keyboard typing what you wrote into the REPL. The tricky part is that @distributed will just assume globals have already been defined for you and that locals aren’t — so it sends out all local variables to each process but assumes globals are already there.

2 Likes

I think I understand, thanks. Why does accum need to be at the module scope, then? Shouldn’t @distributed pick it up as well, as it is a local function?

On a slightly different note. Is it possible to encapsulate the effect/scope of @distributed? While prototyping code it sometimes happens to me, that I reference variables, that are not defined any more. But as those variables are still on the workers, nothing fails.

If it’s truly a local function it’s just fine. I think you were just having trouble with all the @everywheres.

julia> module TheModule
       using Distributed
       function test(x)
           map([x]) do info
               s = info
               d = 1:4
               accum(xs, ys) = sum(xs) + sum(ys)
               @distributed accum for b in 1:100
                   rand(d, s)
               end
           end
       end
       end
Main.TheModule

julia> TheModule.test(1)
1-element Vector{Int64}:
 257

This isn’t very sustainable, though, since you’ll probably want to reference some globals or send along types from another module (like Distribution’s Categorical).

The scope of @distributed is exactly the same as the scope of a normal for loop, so modules will indeed give you a “clean” global state but will be tricky in other ways. It’s really no different here than normal for loops on a single node — if you have globals defined it’ll pick them up serially, too.

2 Likes

Thank you for your extensive answer! I’m not sure yet, if I understand why it’s not sustainable.

When MyModule includes e.g. Distribution’s Categorical, shouldn’t @distributed pick up on it?

Using your explanations, I was able to fix my MWE. My actual project still seems to have errors I can’t pin down, though. Due to some reason, a worker can’t find the closure for the distributed for loop:

ERROR: TaskFailedException
    nested task error: On worker 5:
    UndefVarError: #207#216 not defined
    Stacktrace:
      [1] deserialize_datatype
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:1288
      [2] handle_deserialize
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:835
      [3] deserialize
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:782
      [4] handle_deserialize
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:842
      [5] deserialize
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Seria
lization/src/Serialization.jl:782 [inlined]
      [6] deserialize_msg
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distr
ibuted/src/messages.jl:87
      [7] #invokelatest#2
        @ ./essentials.jl:708 [inlined]
      [8] invokelatest
        @ ./essentials.jl:706 [inlined]
      [9] message_handler_loop
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distr
ibuted/src/process_messages.jl:169
     [10] process_tcp_streams
        @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distr
ibuted/src/process_messages.jl:126
     [11] #99
        @ ./task.jl:411
    Stacktrace:
     [1] remotecall_fetch(::Function, ::Distributed.Worker, ::Function,
::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{},
 Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/remotecall.jl:394
     [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Function,
::Vararg{Any, N} where N)
       @ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/remotecall.jl:386
     [3] remotecall_fetch(::Function, ::Int64, ::Function, ::Vararg{Any,
 N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, Nam
edTuple{(), Tuple{}}})
       @ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/remotecall.jl:421
     [4] remotecall_fetch
       @ /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distri
buted/src/remotecall.jl:421 [inlined]
     [5] (::Distributed.var"#157#158"{RungePolarDPC.var"#accum#214"{Mult
ilevelPolarCodeF2Info{ChannelSignaling{Float64, Float64, Float64, Float6
4, Float64}}}, RungePolarDPC.var"#207#216"{Distributions.Normal{Float64}
, Vector{Distributions.Categorical{Float64, Vector{Float64}}}, Distribut
ions.Categorical{Float64, Vector{Float64}}, Vector{NamedTuple{(:u,), Tup
le{Matrix{BoolF}}}}, NamedTuple{(:Ly, :cc, :pacode), Tuple{Vector{SubArr
ay}, Vector{PolarCodeF2}, Vector{NamedTuple}}}, Matrix{BoolF}, Matrix{Bo
olF}, Vector{Float64}, Matrix{Float64}, Matrix{Float64}, Vector{Float64}
, Vector{Float64}, Vector{Float64}, Vector{Int64}, Vector{Int64}, Int64,
 Vector{PolarCodeF2}, MultilevelPolarCodeF2Info{ChannelSignaling{Float64
, Float64, Float64, Float64, Float64}}, Int64, Int64, ChannelSignaling{F
loat64, Float64, Float64, Float64, Float64}}, UnitRange{Int64}, Vector{U
nitRange{Int64}}, Int64, Int64})()
       @ Distributed /build/julia/src/julia-1.6.1/usr/share/julia/stdlib
/v1.6/Distributed/src/macros.jl:274

If I’m not mistaken, var"#accum#235" is the closure for accum, which accesses one variable defined inside the map-do before accum and var"#228#237" is the closure for the @distributed for. I’m not sure, what var"#157#158" is, though. Maybe it’s a closure for the map-do block? I don’t define any variables before map.

I’m not sure how to narrow down this problem. Maybe there’s a problem with one of the captured variables?

Edit 1:
When I execute the function twice, this error still occurs. When I edit the file (I have Revise loaded in the REPL before loading my module) and execute the function after this, the function works as expected every time I call it. After cleaning up the workers and calling the function again, the closure variable is again undefined on the worker. After changing my module and letting Revise do its magic, the function works.

julia> rmprocs(workers())
julia> addprocs(3; exeflags="--project")
julia> @everwhere using MyModule
julia> test(3)
[ERROR: TaskFailedException]
julia> test(4)
[ERROR: Task FailedException]
julia> revise(MyModule) # this is where I change the file
julia> test(3)
[works]
julia> test(2)
[works]
Original code
function get_data(info::MLPDPCMCReliabilitiesInfo)
	dict = produce_or_load(datadir("reliabilities", "monte-carlo"), info) do info

		## Preallocation and parameter extraction

		s = info.s
		Nblocks = info.Nblocks
		Nbatches = info.Nbatches

		sig = get_data(info.sig)

		T = promote_type(eltype(sig.X_sym), eltype(sig.S_sym))
		bits = convert(Int, log2(size(sig.X_sym, 1)))
		lS = size(sig.X_sym, 2)
		lΞ = size(sig.X_sym, 1)

		mlcinfo = MultilevelPolarCodeF2Info(bits, s, sig)
		pcode = PolarCodeF2.(s, Ref([]), Ref([]))

		Nbitchannels = sum(2 .^ s)
		blockmin = 2^(sum(s) - minimum(s)) # = lcm(2^s1, 2^s2)
		blen = blockmin * Nblocks

		Si = Array{Int}(undef, blen)
		Ξ  = Array{Int}(undef, blen)
		S  = similar(sig.S_sym, blen)
		X  = similar(sig.X_sym, blen)
		Z  = Array{T}(undef, blen)
		Ls = Array{Float64}(undef, bits, blen)
		Ly = Array{Float64}(undef, bits, blen)
		Y  = Array{T}(undef, blen)
		Xᴮ = Array{BoolF}(undef, bits, blen)
		Uᴮ = Array{BoolF}(undef, bits, blen)

		padata = preallocate(decode_reliabilities!, Ly, Y, Uᴮ, Xᴮ, LLR, mlcinfo, LLR_SC)
		enc_padata = preallocate.(encode!, Ref(@view Uᴮ[:]), Ref(@view Xᴮ[:]), pcode, MatrixEncoding)

		d_S = Categorical(sig.P_S[:])
		d_Ξ_S = collect(Categorical(sig.P_Ξ_S[:, s]./sum(sig.P_Ξ_S[:,s])) for s in 1:lS)
		d_ZC = MvNormal(2, √(sig.σ²))
		d_ZR = Normal(0, √(sig.σ²))

		accum(lhs, rhs) = map(lhs, rhs) do l, r
			bitchannel_cumulative_mean(l, r, mlcinfo)
		end

		## Entropy Computation

		H_Y, H_S = @distributed accum for b = 1:info.Nbatches
			Si .= rand(d_S, blen)
			S .= sig.S_sym[Si]
			Z .= rand(d_ZR, blen)

			for s in 1:lS
				Ξ[Si .== s] .= rand(d_Ξ_S[s], sum(Si .== s))
			end

			Xᴮ .= convert(Array, VectorOfArray(bitlevels.(sig.b, Ξ)))

			for bit in 1:bits
				foreach_block(blen, 0, pcode[bit]) do cblIdx, iblIdx
					encode!(enc_padata[bit], reshape((@view Uᴮ[bit,iblIdx]), 1, :), (@view Xᴮ[bit,cblIdx]), pcode[bit], MatrixEncoding)
				end
			end

			@. X = sig.X_sym[CartesianIndex(Ξ, Si)]
			@. Y = X + S + Z

			decode_reliabilities!(padata, Ly, Y, Uᴮ, Xᴮ, LLR, mlcinfo, LLR_SC)
			decode_reliabilities!(padata, Ls, Si, Uᴮ, Xᴮ, txLLR, mlcinfo, LLR_SC)

			map((Ly, Ls)) do l
				@. log1pexp(-abs(l))
			end
		end

		H_Y ./= info.Nbatches * log(2)
		H_S ./= info.Nbatches * log(2)

		df = DataFrame()
		for bit in 1:bits
			bdf = DataFrame(BitChannelIdx = 1:length(H_Y[bit]), BitLevel = bit, H_Y = H_Y[bit], H_S = H_S[bit])
			df = vcat(df, bdf)
		end

		@strdict H=df
	end

	dict[1]["H"]
end

Upon further digging, I think this is a problem with Revise. When I call the function without any other modules (like Revise) loaded, everything seems to work as intended.

I do not understand, which part of my code triggers the error, though.