Hi all,
I want to write some parallel library code that works on generic types; for this purpose I want to create RemoteChannel
s between processes with a particular type for passing data back and forth. They need to be persistent, so using @generated
is out since there’s no guarantee that the function body is only evaluated once. I have the following code in DistTest.jl
that simulates something like I’d like to do:
module DistTest
export queue, finalize, initialize
using Distributed
module WorkerModule
function do_work()
while true
item = receiver |> take!
item === nothing && break
println(item)
end
end
function initialize(T::DataType)
eval(
quote
const receiver = Channel{Union{$T, Nothing}}(8)
end
)
T
end
end
function initialize(T::DataType)
@sync for worker in workers()
@spawnat worker DistTest.WorkerModule.initialize(T)
end
eval(:(
const communicators = Dict(
worker => RemoteChannel(() -> DistTest.WorkerModule.receiver, worker)
for worker in workers())
)
)
eval(:(
queue(x::$T, worker) = put!(communicators[worker], x)
))
eval(quote
let futs = map(worker -> remotecall(DistTest.WorkerModule.do_work, worker), workers())
global finalize
function finalize()
map(worker -> put!(communicators[worker], nothing), workers())
map(fetch, futs)
end
end
end)
T
end
end
Then the end user just calls DistTest.initialize(T)
once so that the appropriately typed channels are created. However, the following errors (after including DistTest.jl):
@everywhere using Main.DistTest
julia> DistTest.initialize(String)
ERROR: On worker 2:
UndefVarError: ##4#6 not defined
deserialize_datatype at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:1051
handle_deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:743
deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:703
handle_deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:750
deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:703
#1 at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:810
ntuple at .\tuple.jl:133
deserialize_tuple at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:810
handle_deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:733
deserialize_msg at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:703
#invokelatest#1 at .\essentials.jl:686 [inlined]
invokelatest at .\essentials.jl:685 [inlined]
message_handler_loop at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\process_messages.jl:160
process_tcp_streams at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\process_messages.jl:117
#105 at .\task.jl:259
Stacktrace:
[1] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Function, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:379
[2] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:371
[3] #remotecall_fetch#152 at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:392 [inlined]
[4] remotecall_fetch at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:392 [inlined]
[5] Type at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:108 [inlined]
[6] (::getfield(Main.DistTest, Symbol("##3#5")))(::Int64) at .\none:0
[7] iterate at .\generator.jl:47 [inlined]
[8] _all(::getfield(Base, Symbol("##220#222")), ::Base.Generator{Array{Int64,1},getfield(Main.DistTest, Symbol("##3#5"))}, ::Colon) at .\reduce.jl:660
[9] all at .\reduce.jl:656 [inlined]
[10] Dict(::Base.Generator{Array{Int64,1},getfield(Main.DistTest, Symbol("##3#5"))}) at .\dict.jl:131
[11] top-level scope at none:0
[12] eval at .\boot.jl:319 [inlined]
[13] eval at C:\Users\seanm\Desktop\DistTest.jl:1 [inlined]
[14] macro expansion at .\task.jl:243 [inlined]
[15] initialize(::DataType) at C:\Users\seanm\Desktop\DistTest.jl:30
[16] top-level scope at none:0
and I can’t parse what’s going on. It’s erroring at the line creating a Dict
of worker => RemoteChannel
for the master process to communicate.
The following, however, does work:
julia> @everywhere using Main.DistTest
julia> @sync for worker in workers()
@spawnat worker DistTest.WorkerModule.initialize(String)
end
julia> eval(:(
const communicators = Dict(
worker => RemoteChannel(() -> DistTest.WorkerModule.receiver, worker)
for worker in workers())
)
)
Dict{Int64,RemoteChannel{Channel{Union{Nothing, String}}}} with 2 entries:
2 => RemoteChannel{Channel{Union{Nothing, String}}}(2, 1, 16)
3 => RemoteChannel{Channel{Union{Nothing, String}}}(3, 1, 18)
so it seems to me like there’s not anything fundamentally unsound about my approach, since it works in Main
. How should I be approaching this problem, and is this a bug?