What's under the hood of Julia's remote call procedure?

As I’ve been using Julia’s Distributed.jl for my master’s thesis (which has been really easy to use; thanks!), I was interested to learn more about distributed computing in general. I’ve just gone down the rabbit hole of perusing different methods of inter-process communication (IPC), mainly in the vein of Message Passing (MP) and Remote Procedure Calls (RPC) in the context of high-performance computing (HPC). So, naturally, I went to the Julia docs to read more about the implementation details of Distributed.jl, but then I realized that the docs only cover the API usage and not the internals. I tried looking at the source code (yay open source!), but I quickly reached cognitive overload. So, especially since Distributed.jl is not based on MPI (hence the existence of MPI.jl), what’s going on under the hood when I call @fetchfrom 2 myid()?

I also came across Mercury RPC, which is specifically designed for HPC (for example, it explicitly does not target TCP). Is this something that could be useful in Julia?

I also didn’t know precisely, so let me take you on my journey of figuring it out :slight_smile:
A good starting point is probably to look at what this macro expands to:

julia> @macroexpand @fetchfrom 2 myid()
:(Distributed.remotecall_fetch((()->begin
              #= /cache/build/builder-amdci5-5/julialang/julia-release-1-dot-11/usr/share/julia/stdlib/v1.11/Distributed/src/macros.jl:145 =#
              myid()
          end), 2))
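
So the macro itself does very little: it just wraps the quoted expression in a zero-argument closure and passes it to remotecall_fetch together with the target pid. You can verify the equivalence yourself; the snippet below assumes a fresh session with no workers yet, so that addprocs(1) creates the worker with pid 2:

using Distributed
addprocs(1)                          # new worker gets pid 2 in a fresh session

@fetchfrom 2 myid()                  # returns 2
remotecall_fetch(() -> myid(), 2)    # the expanded form; also returns 2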

Ok, so looking at the code of Distributed.remotecall_fetch (e.g. via @edit Distributed.remotecall_fetch(() -> myid(), 2)), we quickly find that it calls

send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs))

and a couple of calls deeper we end up at send_msg_:

function send_msg_(w::Worker, header, msg, now::Bool)
    # some setup
    io = w.w_stream
    lock(io)
    try
        reset_state(w.w_serializer)
        serialize_hdr_raw(io, header)
        invokelatest(serialize_msg, w.w_serializer, msg)  # io is wrapped in w_serializer
        write(io, MSG_BOUNDARY)
    finally
        # some cleanup (unlocking the stream)
    end
end

Ok, so apparently we just serialize the message and write it to some stream.
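
To get a feel for what that means, here is a toy sketch using the Serialization stdlib; the named tuples standing in for MsgHeader and CallMsg, and the boundary value, are made up for illustration and are not Distributed’s actual types:

using Serialization

header   = (response_oid = (1, 1), notify_oid = (0, 0))   # stand-in for MsgHeader
msg      = (f = () -> 40 + 2, args = (), kwargs = ())     # stand-in for CallMsg; closures serialize fine
boundary = rand(UInt8, 10)                                 # stand-in for the fixed 10-byte MSG_BOUNDARY

io = IOBuffer()            # stand-in for the worker's w_stream
serialize(io, header)      # 1) the header
serialize(io, msg)         # 2) the message body
write(io, boundary)        # 3) the boundary bytes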
We can have a brief look at the message types (which we find in messages.jl, the same file that contains the send_msg function). However, I don’t find them particularly interesting, apart from a comment that explains the wire format:

## Wire format description
#
# Each message has three parts, which are written in order to the worker's stream.
#  1) A header of type MsgHeader is serialized to the stream (via `serialize`).
#  2) A message of type AbstractMsg is then serialized.
#  3) Finally, a fixed boundary of 10 bytes is written.

# Message header stored separately from body to be able to send back errors if
# a deserialization error occurs when reading the message body.
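
Continuing the toy example from above, the receiving side just reads the three parts back in the same order and then runs the shipped closure (again, only a sketch, not Distributed’s actual reading loop):

seekstart(io)                     # pretend we are the receiving worker
rcv_header = deserialize(io)      # 1) header first, so errors in the body can still be reported
rcv_msg    = deserialize(io)      # 2) message body, including the serialized closure
read(io, 10) == boundary          # 3) the boundary, which lets the reader resynchronize -> true
rcv_msg.f()                       # calling the shipped closure returns 42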

We can keep looking (for example, at where Workers are constructed), but I think we have the basics already: Distributed.jl uses a custom protocol based on serializing/deserializing messages and writing them to some form of tunnel. What kind of tunnel probably differs depending on where the worker is located (a remote worker presumably gets some kind of network socket, which I wouldn’t expect for workers on the same physical machine).

Searching for places where workers are created, we can find create_worker in cluster.jl. Here we learn that the connections are handled by a function called connect that operates on a manager. Its docstring partially confirms our hypothesis:

"""
    connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implemented by cluster managers using custom transports. It should establish a logical
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
ensure that messages are delivered and received completely and in order.
`connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
workers.
"""

I’ll leave it at this, but if you want to look further, I’d recommend looking at ClusterManager and its subtypes. You’ll find that there is indeed a LocalManager that seems to use pipes instead of network sockets for communication.
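
If you want to check which transport your own workers actually ended up with, you can peek at the stream fields of the Worker objects; note that worker_from_id and the r_stream/w_stream fields are internal implementation details, so this may differ between Julia versions:

using Distributed
addprocs(1)                               # local workers via the default LocalManager

w = Distributed.worker_from_id(2)         # internal helper, not public API
typeof(w.r_stream), typeof(w.w_stream)    # the concrete IO types used as the "tunnel"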

If you want to understand this in more detail and still feel lost, feel free to ask more questions :slight_smile:


Amazing! I will definitely check out the details of ClusterManager! So it seems like it really is the simplest form of “collect all these things in a box, send the box over there, open the box and do what’s inside”?