Understanding data movement with Distributed.jl

As outlined in the manual (Multi-processing and Distributed Computing · The Julia Language), Distributed.jl performs “implicit” data movement between the different processes. Unfortunately, that page doesn’t make it clear under what conditions this happens or how to control the communication between processes.

Generally, I think three different kinds of variables are relevant here: global variables, arguments passed to remotely-called functions, and closure variables.

  • Global variables: This seems like the hardest case to follow (it doesn’t help that, for instance, the manual incorrectly calls a function that refers to a global variable a “closure”). The most pertinent part of the manual appears to be:

    Remote calls with embedded global references (under Main module only) manage globals as follows:

    • New global bindings are created on destination workers if they are referenced as part of a remote call.
    • Global constants are declared as constants on remote nodes too.
    • Globals are re-sent to a destination worker only in the context of a remote call, and then only if its value has changed. Also, the cluster does not synchronize global bindings across nodes.

    So, if I understand correctly, if I do something like

    julia> x = 777
    777
    
    julia> remotecall_fetch(varinfo, 2)
      name             size summary
      ––––––––––– ––––––––– –––––––
      Base                  Module 
      Core                  Module 
      Distributed 1.091 MiB Module 
      Main                  Module 
    
    julia> remotecall_fetch(() -> x, 2)
    777
    
    julia> remotecall_fetch(varinfo, 2)
      name             size summary
      ––––––––––– ––––––––– –––––––
      Base                  Module 
      Core                  Module 
      Distributed 1.098 MiB Module 
      Main                  Module 
      x             8 bytes Int64  
    
    

    then remotecall will somehow automatically go through the passed function, figure out whether it references any global variables, send those to the remote process, and define them there before running the function? Except that this doesn’t seem to work with named functions or types, which presumably can’t just be serialized and sent over? But then why does it work with anonymous functions?

    julia> f() = 123
    f (generic function with 1 method)
    
    julia> g = () -> 123
    #7 (generic function with 1 method)
    
    julia> remotecall_fetch(() -> f(), 2)
    ERROR: On worker 2:
    UndefVarError: `#f` not defined
    Stacktrace:
      [1] deserialize_datatype
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:1385
      [2] handle_deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:869
      [3] deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:816
      [4] handle_deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:876
      [5] deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:816 [inlined]
      [6] deserialize_global_from_main
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/clusterserialize.jl:160
      [7] #5
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/clusterserialize.jl:72 [inlined]
      [8] foreach
        @ ./abstractarray.jl:3073
      [9] deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/clusterserialize.jl:72
     [10] handle_deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:962
     [11] deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:816
     [12] handle_deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:873
     [13] deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:816
     [14] handle_deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:876
     [15] deserialize
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:816 [inlined]
     [16] deserialize_msg
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/messages.jl:87
     [17] #invokelatest#2
        @ ./essentials.jl:816 [inlined]
     [18] invokelatest
        @ ./essentials.jl:813 [inlined]
     [19] message_handler_loop
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:176
     [20] process_tcp_streams
        @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:133
     [21] #103
        @ ./task.jl:514
    Stacktrace:
     [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
     [2] remotecall_fetch(::Function, ::Distributed.Worker)
       @ Distributed /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
     [3] #remotecall_fetch#162
       @ /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
     [4] remotecall_fetch(::Function, ::Int64)
       @ Distributed /opt/julia-1.9.1/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
     [5] top-level scope
       @ REPL[6]:1
    
    julia> remotecall_fetch(() -> g(), 2)
    123
    
    julia> remotecall_fetch(varinfo, 2)
      name             size summary                            
      ––––––––––– ––––––––– –––––––––––––––––––––––––––––––––––
      Base                  Module                             
      Core                  Module                             
      Distributed 1.101 MiB Module                             
      Main                  Module                             
      g             0 bytes #7 (generic function with 1 method)
      x             8 bytes Int64                              
    
    

    Further questions concerning global variables:

    • Is there any difference in behavior with respect to this in the various interfaces in Distributed.jl (remotecall, @spawnat, @distributed, pmap, etc.)?
    • The manual claims that Main is treated specially, but I haven’t actually been able to verify this. In what ways is the behavior different in modules other than Main?

  • Arguments passed to remotely-called functions: I assume these are always just serialized, sent to the remote process, and passed to the function there (without defining any globals)?

  • Closures: Any closed-over variables are presumably included in the serialization process and thus sent just like function arguments?
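
A minimal sketch of how these three cases could be checked side by side. It assumes a single local worker started with addprocs(1) and that the code runs at top level in Main; the names data, w, s_arg and so on are only illustrative. Instead of reading varinfo output, it asks the worker directly whether a binding exists:

```julia
using Distributed

# Assumption: one local worker is enough for the experiment.
nprocs() == 1 && addprocs(1)
w = first(workers())

data = collect(1:10)   # a global in Main on the master process

# 1. Passed as an argument: serialized together with the call.
s_arg = remotecall_fetch(sum, w, data)

# 2. Captured as a local via `let`: serialized as a field of the closure.
s_closure = let d = data
    remotecall_fetch(() -> sum(d), w)
end

# Ask the worker whether a global named `data` exists in its Main so far.
defined_before = remotecall_fetch(isdefined, w, Main, :data)

# 3. Referenced as a global inside the anonymous function.
s_global = remotecall_fetch(() -> sum(data), w)
defined_after = remotecall_fetch(isdefined, w, Main, :data)

println((s_arg, s_closure, s_global, defined_before, defined_after))
```

If my understanding is right, only the third call should leave a `data` binding behind on the worker, which would match the varinfo experiment with x.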

So, do I generally have the right idea here? Any answers to the sub-questions above would be appreciated as well!
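
On the named-function error: the workaround I'm aware of is to define the function on every process with @everywhere, so that (as far as I can tell from the stack trace) the worker already knows the function's type when the call is deserialized. A minimal sketch, assuming one local worker and top-level code in Main:

```julia
using Distributed

# Assumption: one local worker is enough for the experiment.
nprocs() == 1 && addprocs(1)
w = first(workers())

# Define the named function on all processes, not only on the master.
@everywhere f() = 123

# The anonymous function still refers to `f`, but the worker now has its own definition.
result = remotecall_fetch(() -> f(), w)
println(result)
```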


Regarding the question of why “named” functions behave differently from anonymous functions for serialization, I just came across this post in a different thread: