Using a closure with mutable captured values inside an @distributed loop

The Julia manual says the following about the @distributed macro:

Any variables used inside the parallel loop will be copied and broadcast to each process.

My question is: Does this also apply to functions and closures? Are closures literally copied? When I naively try to copy a function in the REPL, it doesn’t work:

julia> copy(length)
ERROR: MethodError: no method matching copy(::typeof(length))

Here’s a minimal example that illustrates my problem. First I build a memoized function as a closure that captures a dictionary, and then I use it inside an @distributed loop. However, I’m not sure what happens to the captured dictionary when the memoized function is sent to the other processes. Does each process get its own copy of the dictionary, with the copies independent from then on?

using Distributed
addprocs(4)

function memoize(f)
    cache = Dict{Int, Int}()

    function mem_f(x)
        get!(cache, x) do
            f(x)
        end
    end
end

function run_distributed(f)
    @distributed (+) for _ in 1:100
        x = rand(1:10)
        f(x)
    end
end

f(x) = 2x
mem_f = memoize(f)
run_distributed(mem_f)

EDIT: I haven’t implemented this yet. When I run the above example, I actually get an error:

Error message
julia> run_distributed(mem_f)
ERROR: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:322 [inlined]
 [2] fetch
   @ ./task.jl:337 [inlined]
 [3] preduce(reducer::Function, f::Function, R::UnitRange{Int64})
   @ Distributed /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:278
 [4] run_distributed(f::var"#mem_f#2"{typeof(f), Dict{Int64, Int64}})
   @ Main ./REPL[4]:2
 [5] top-level scope
   @ REPL[7]:1

    nested task error: On worker 2:
    UndefVarError: #mem_f#2 not defined
    Stacktrace:
      [1] deserialize_datatype
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:1288
      [2] handle_deserialize
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:835
      [3] deserialize
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:782
      [4] deserialize_datatype
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:1312
      [5] handle_deserialize
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:837
      [6] deserialize
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:782
      [7] handle_deserialize
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:842
      [8] deserialize
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:782 [inlined]
      [9] deserialize_msg
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:87
     [10] #invokelatest#2
        @ ./essentials.jl:708 [inlined]
     [11] invokelatest
        @ ./essentials.jl:706 [inlined]
     [12] message_handler_loop
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:169
     [13] process_tcp_streams
        @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:126
     [14] #99
        @ ./task.jl:411
    Stacktrace:
     [1] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:394
     [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any, N} where N)
       @ Distributed /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
     [3] remotecall_fetch(::Function, ::Int64, ::Function, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
     [4] remotecall_fetch
       @ /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
     [5] (::Distributed.var"#157#158"{typeof(+), var"#4#5"{var"#mem_f#2"{typeof(f), Dict{Int64, Int64}}}, UnitRange{Int64}, Vector{UnitRange{Int64}}, Int64, Int64})()
       @ Distributed /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:274

EDIT #2: I suspect that to make this example work, I would need to put everything except the call to run_distributed inside an @everywhere begin ... end block. And I believe the result of that would be that each process would have its own version of mem_f with independent caches.

Yes, I believe so too.

My intuition of Distributed (which might need refinement from those more knowledgeable) is that it does roughly what starting multiple Julia sessions in multiple shells does, plus some functionality for running something in all or some of the sessions (e.g. @everywhere) and for sending data from one session to another (e.g. remotecall_fetch).

As such, trying to have shared state between the processes might be a bit cumbersome.
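
To make the "separate sessions" picture concrete, here is a minimal sketch. It assumes it is acceptable to swap the closure for a named function backed by a global cache, both defined on every process with @everywhere. The names CACHE and cache_size are made up for this example. Because the loop body then refers to mem_f only by name, no closure should need to be sent to the workers.

```julia
using Distributed
addprocs(4)

# Define the function and its cache on every process. Each process evaluates
# this block separately, so each one ends up with its own CACHE.
@everywhere begin
    f(x) = 2x

    # Hypothetical global cache standing in for the captured dictionary.
    const CACHE = Dict{Int, Int}()

    # Named (non-closure) memoized function: looks x up in the local cache,
    # computing and storing f(x) on a miss.
    mem_f(x) = get!(() -> f(x), CACHE, x)

    # Hypothetical helper that reports the size of the local cache.
    cache_size() = length(CACHE)
end

total = @distributed (+) for _ in 1:100
    mem_f(rand(1:10))
end

# Ask each worker for the size of its own cache.
sizes = [remotecall_fetch(cache_size, p) for p in workers()]
println("total = ", total, ", per-worker cache sizes = ", sizes)
```

If the picture above is right, each worker reports its own cache size, filled only by the iterations that happened to run on that worker. Nothing is merged back into a shared dictionary.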


Makes sense. Thanks!