Type-dependent generation of Channels on workers

Hi all,

I want to write some parallel library code that works on generic types; for this purpose I want to create RemoteChannels between processes with a particular type for passing data back and forth. They need to be persistent, so using @generated is out since there’s no guarantee that the function body is only evaluated once. I have the following code in DistTest.jl that simulates something like I’d like to do:

module DistTest

export queue, finalize, initialize

using Distributed

module WorkerModule

function do_work()
    while true
        item = receiver |> take!
        
        item === nothing && break
        println(item)
    end
end

function initialize(T::DataType)
    eval(
    quote
        const receiver = Channel{Union{$T, Nothing}}(8)
    end
    )
    T
end

end

function initialize(T::DataType)
    @sync for worker in workers()
        @spawnat worker DistTest.WorkerModule.initialize(T)
    end
    
    eval(:(
        const communicators = Dict(
            worker => RemoteChannel(() -> DistTest.WorkerModule.receiver, worker) 
            for worker in workers())
        )
    )
    eval(:(
        queue(x::$T, worker) = put!(communicators[worker], x)
    ))
        
    eval(quote
        let futs = map(worker -> remotecall(DistTest.WorkerModule.do_work, worker), workers())
            global finalize
            function finalize()
                map(worker -> put!(communicators[worker], nothing), workers())
                map(fetch, futs)
            end
        end
    end)
    T
end

end

Then the end user just calls DistTest.initialize(T) once so that the appropriately typed channels are created. However, the following errors (after including DistTest.jl):

@everywhere using Main.DistTest

julia> DistTest.initialize(String)
ERROR: On worker 2:
UndefVarError: ##4#6 not defined
deserialize_datatype at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:1051
handle_deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:743
deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:703
handle_deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:750
deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:703
#1 at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:810
ntuple at .\tuple.jl:133
deserialize_tuple at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:810
handle_deserialize at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:733
deserialize_msg at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Serialization\src\Serialization.jl:703
#invokelatest#1 at .\essentials.jl:686 [inlined]
invokelatest at .\essentials.jl:685 [inlined]
message_handler_loop at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\process_messages.jl:160
process_tcp_streams at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\process_messages.jl:117
#105 at .\task.jl:259
Stacktrace:
 [1] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Function, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:379
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:371
 [3] #remotecall_fetch#152 at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:392 [inlined]
 [4] remotecall_fetch at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:392 [inlined]
 [5] Type at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.0\Distributed\src\remotecall.jl:108 [inlined]
 [6] (::getfield(Main.DistTest, Symbol("##3#5")))(::Int64) at .\none:0
 [7] iterate at .\generator.jl:47 [inlined]
 [8] _all(::getfield(Base, Symbol("##220#222")), ::Base.Generator{Array{Int64,1},getfield(Main.DistTest, Symbol("##3#5"))}, ::Colon) at .\reduce.jl:660
 [9] all at .\reduce.jl:656 [inlined]
 [10] Dict(::Base.Generator{Array{Int64,1},getfield(Main.DistTest, Symbol("##3#5"))}) at .\dict.jl:131
 [11] top-level scope at none:0
 [12] eval at .\boot.jl:319 [inlined]
 [13] eval at C:\Users\seanm\Desktop\DistTest.jl:1 [inlined]
 [14] macro expansion at .\task.jl:243 [inlined]
 [15] initialize(::DataType) at C:\Users\seanm\Desktop\DistTest.jl:30
 [16] top-level scope at none:0

and I can’t parse what’s going on. It’s erroring at the line creating a Dict of worker => RemoteChannel for the master process to communicate.

The following, however, does work:

julia> @everywhere using Main.DistTest

julia> @sync for worker in workers()
       @spawnat worker DistTest.WorkerModule.initialize(String)
       end

julia> eval(:(
               const communicators = Dict(
                       worker => RemoteChannel(() -> DistTest.WorkerModule.receiver, worker)
                       for worker in workers())
             )
       )
Dict{Int64,RemoteChannel{Channel{Union{Nothing, String}}}} with 2 entries:
  2 => RemoteChannel{Channel{Union{Nothing, String}}}(2, 1, 16)
  3 => RemoteChannel{Channel{Union{Nothing, String}}}(3, 1, 18)

so it seems to me like there’s not anything fundamentally unsound about my approach, since it works in Main. How should I be approaching this problem, and is this a bug?