Help with closures and remotecall_fetch


#1

I’m trying to figure out how to use closures with parallel processing. Basically I want to define a closure on each worker which holds a large number of parameters as internal states that are pre-computed on the master node.

In the following simplified example, gen_foo returns a closure and is evaluated on all workers.
Each worker can run the closure when I call it through @everywhere, but remotecall_fetch gives me an error. BTW: I’m on Version 0.6.0-pre.beta.387

julia> addprocs(2)
2-element Array{Int64,1}:
2
3

julia> @everywhere function gen_foo(parameters)
    n = parameters[2]
    const Θ = parameters[1]
    const a = rand(n,n)
    foo(c) = sum(a[:,c]) - Θ
    return foo::Function
end

julia> parameters = [10,200]
2-element Array{Int64,1}:
10
200

julia> @everywhere foo = gen_foo($parameters)

julia> @everywhere println(foo(10)) # works fine
89.31772292486279
From worker 2:	87.66158790049138
From worker 3:	87.61146972350342

julia> remotecall_fetch(foo, 2, 10) # fails: foo not defined on worker 2
ERROR: On worker 2:
UndefVarError: #foo#9 not defined
deserialize_datatype at ./serialize.jl:968
handle_deserialize at ./serialize.jl:674
deserialize at ./serialize.jl:634
handle_deserialize at ./serialize.jl:681
deserialize_msg at ./distributed/messages.jl:98
message_handler_loop at ./distributed/process_messages.jl:161
process_tcp_streams at ./distributed/process_messages.jl:118
#97 at ./event.jl:73
Stacktrace:
[1] #remotecall_fetch#139(::Array{Any,1}, ::Function, ::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:354
[2] remotecall_fetch(::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:346
[3] #remotecall_fetch#142(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367
[4] remotecall_fetch(::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367

julia> remotecall_fetch(foo, 1, 10) # works fine
89.31772292486279

I can partly get around this by defining the closure's internal state as global variables (see the code block below), but then I’m polluting the global namespace, I have to change the const variables at later stages, and it becomes difficult to keep track of which variables are used by which functions.

julia> n = parameters[2]
200

julia> @everywhere const Θ = $(parameters[1])

julia> @everywhere const a = rand($n,$n)

julia> @everywhere foo2(c) = sum(a[:,c]) - Θ

julia> remotecall_fetch(foo2, 2, 10) # works fine
86.65585543066166

julia> remotecall_fetch(foo2, 1, 10) # works fine
86.45878590236921

Any tips on how to get the closure case to work?


#2

A workaround is remotecall_fetch(x->eval(Main, :(foo($x))), 2,10)

The issue here is that the foo definition from the master process is being serialized to the worker, and the worker throws the error you are seeing when it receives it. In the first case, foo is a global binding to a closure, so the closure object itself is serialized. In the second case, foo2 is a function definition and hence is not serialized.

For example, this works as expected

julia> bar = ()->2
(::#24) (generic function with 1 method)

julia> remotecall_fetch(bar, 2)
2

even though bar is not defined on worker 2.
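
To spell out the workaround from the top of this post, here is a minimal sketch. It assumes Julia 1.x rather than the 0.6 pre-release used in the question, so Distributed is loaded explicitly, eval(Main, ...) is written as Core.eval(Main, ...), and the local const declarations are dropped from gen_foo. The name call_foo is hypothetical and only used for illustration. The point is that the anonymous function only carries the quoted symbol foo, so the lookup happens on the worker and the master's closure is never shipped.

```julia
using Distributed
addprocs(2)

# Same generator as in the question, without `const` on local variables
# (an assumption for Julia 1.x).
@everywhere function gen_foo(parameters)
    Θ = parameters[1]
    n = parameters[2]
    a = rand(n, n)              # state is built locally on whichever process runs this
    foo(c) = sum(a[:, c]) - Θ
    return foo
end

parameters = [10, 200]
@everywhere foo = gen_foo($parameters)

# Hypothetical helper: `foo` appears only inside a quoted expression, so it is
# resolved in Main on the receiving process instead of being serialized here.
call_foo = x -> Core.eval(Main, :(foo($x)))

w = first(workers())
result = remotecall_fetch(call_foo, w, 10)

# Each worker answers from its own `a`, so the values generally differ.
per_worker = [remotecall_fetch(call_foo, p, 10) for p in workers()]
println(result)
println(per_worker)
```

This keeps the per-worker state inside the closure, at the cost of going through eval on every call.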

Can you open an issue on GitHub for your use case?


#3

Thanks!

Here is the link to the GitHub issue. Note: I added a somewhat more realistic example with planned FFTs as internal state. I think this example demonstrates the usefulness of the closure approach since, presumably, workers on different machines will generate different plans. Serializing from the master should therefore be avoided.