How to make a variable available on all processes (when using Distributed)

I have a workflow which first does some preparations. Next, it does a heavy bit of computing which I would like to carry out in parallel. In practice it looks something like:

# Fetch packages and add worker processes.
using Distributed
addprocs(20)
@everywhere using MultiplePackages

# Preparations.
parallel_opt_problem = prepare_problem()

# Solve the problem (previously this had an `@everywhere`, but that was an error, as the parallelisation is handled automatically by the `solve` function I am importing).
solve(parallel_opt_problem) # This bit can be distributed over multiple processes.

Here I run into the problem that parallel_opt_problem is not available on all processes, so I instead do:

@everywhere parallel_opt_problem = prepare_problem()

This works, but it has a few issues:

  • There is no advantage to parallelising this; every process just repeats the same preparation work.
  • In practice, the preparation contains lots of code statements (checking output as I go, selecting options, etc.). This means adding a lot of @everywhere across the code (or at least at the front of each of a couple of begin ... end blocks).
  • There is some randomisation etc. in prepare_problem(). I am afraid that the parallel_opt_problem created on each process will actually not be entirely identical (I use StableRNGs and such, but I am a bit unsure exactly what will happen across all of this); see the small illustration below.
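
As a minimal illustration of that last point (just a sketch using the Distributed standard library, nothing from my actual code): each process has its own RNG state, so code run with @everywhere need not produce identical values on every process.

using Distributed
addprocs(2)
# Each process draws from its own default RNG, so the values typically differ.
@everywhere x = rand()
@everywhere @show x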

If possible, it seems easiest if I could just do something like

# Solve the problem.
@copy_everywhere parallel_opt_problem # Hypothetical macro that copies the variable to all processes.
solve(parallel_opt_problem) # This bit can be distributed over multiple processes.

to get parallel_opt_problem accessible on all processes.

There have been some issues on this (Copying variable to all remote processes?, Share an already defined variable to all processes); however, these are somewhat old and the answers are not super complete, so I would like to check here what makes sense in this day and age.

I’m not sure what your aim is. Do you want to execute the same problem across multiple processes? If that is the case, you should do something like:

parallel_opt_problem = prepare_problem()

# pmap serializes the closure (including the captured problem) and sends it to each worker.
pmap(_ -> solve(parallel_opt_problem), 1:N_executions)

So the solve function is imported from Optimization.jl, and it already has routines implemented for handling the parallelisation across all processes.

The problem is that when I do

@everywhere solve(parallel_opt_problem)

I get an error that parallel_opt_problem is only available on one of my processes. The question is if there is a way to make a variable you have declared available on all your processes. The alternative is to run all the previous code across all processes, which feels like a sub-optimal approach.

If solve internally parallelizes across processes, then you should not call it with @everywhere. But what I think is more probable is that the parallelization is happening across threads and not across processes.


Sorry, you are right, the @everywhere is a mistake; it should just be

solve(parallel_opt_problem)

However, that still leaves me with the problem of how to make parallel_opt_problem available on all processes.

I don’t think you have to. If solve does internal parallelization, then the dispatch across all workers will be done internally by solve, but I think solve does not work the way you think it does. Can you please report what algorithm you are using for the optimization? It would be even better if you could provide a non-parallelized example and explain how you intend to parallelize it.


I am using Optimization.jl’s ensemble interface: Multistart optimization with EnsembleProblem · Optimization.jl, with the automated EnsembleDistributed() parallelisation option (Parallel Ensemble Simulations · DifferentialEquations.jl).

While it handles the parallelisation, the input problem does need to be available on all workers. I.e., if I do not run all the code generating it with @everywhere, I get an error that the variable is not available on the second worker.

Now, the documentation says that a component of parallel_opt_problem must be made available using @everywhere (Parallel Ensemble Simulations · DifferentialEquations.jl). However, this component is generated within a second package of mine and created through internal code. So I figured that if I could just make parallel_opt_problem available everywhere, things would work (and indeed, this does work). But right now the process looks something like:

parallel_opt_problem_1 = prepare_problem_1()
parallel_opt_problem_2 = prepare_problem_2(parallel_opt_problem_1)
parallel_opt_problem_3 = prepare_problem_3(parallel_opt_problem_2)
parallel_opt_problem = prepare_problem(parallel_opt_problem_3)

and I cannot just do

@everywhere parallel_opt_problem = prepare_problem(parallel_opt_problem_3)

as then parallel_opt_problem_3 is not available on all workers (essentially forcing it across the entire chain).
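
That is, the approach I want to avoid looks roughly like this (the same chain as above, just repeated on every process):

# The approach I would like to avoid: rerunning the whole preparation chain on every worker.
@everywhere parallel_opt_problem_1 = prepare_problem_1()
@everywhere parallel_opt_problem_2 = prepare_problem_2(parallel_opt_problem_1)
@everywhere parallel_opt_problem_3 = prepare_problem_3(parallel_opt_problem_2)
@everywhere parallel_opt_problem = prepare_problem(parallel_opt_problem_3)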

It looks like this is more suitable for what you want to do, since it uses threads and not processes, allowing the memory to be shared.


So, I do not actually want to share memory.

I have a single

parallel_opt_problem

It encodes a single optimization problem with n different starting conditions. I wish to solve each of these starting conditions in parallel, on n different processes. The problem is that I cannot just do

solve(parallel_opt_problem) # Automatically parallelises my problem across available processes.

as (part of) parallel_opt_problem must first be made available on all processes. I can do this, and it works, by adding multiple @everywhere statements across my code up to the point where I generate parallel_opt_problem, and this works fine and as I would want.

However, for reasons described above, it would be nice if I could directly make parallel_opt_problem available across all processes. I.e. if there is a way to

  • Take a single variable (in my case parallel_opt_problem) which is available on one processor, and copy that information over to all other ones.

That would be very useful (and this is essentially the question I am asking).

Have you tried to interpolate the values?
So something like

@everywhere parallel_opt_problem = prepare_problem($parallel_opt_problem_3)

Is that what you intend to do?

EDIT

This should be better:

@everywhere parallel_opt_problem = $(prepare_problem(parallel_opt_problem_3))

In this way, the function prepare_problem is first executed on the main process, and then the returned value is interpolated across all workers.


Beat me to it! You could also just simply do @everywhere x = $x.

With more detail

using Distributed
addprocs(1)

x = 2
@everywhere try @show x; catch e println("there was an error") end
# x = 2
#      From worker 2:    there was an error

# first interpolate the value of x on the local host into the
# expression given to the @everywhere macro
# then @everywhere tells all processes to evaluate the expression "x = 2"
# in their own memory space
@everywhere x = $x
@everywhere @show x
# x = 2
#      From worker 2:    x = 2
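
One thing worth noting: $x interpolates the value x has at that moment, and each worker receives its own independent copy, so changes made on the main process afterwards are not reflected on the workers.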

Thanks both!

Unfortunately when I try either of those I simply get an error:

ERROR: On worker 2:
UndefRefError: access to undefined reference
Stacktrace:
  [1] getproperty
    @ ./Base.jl:43 [inlined]
  [2] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1564
  [3] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:908
  [4] deserialize_fillarray!
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1318
  [5] deserialize_array
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1310
  [6] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:895
  [7] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1585
  [8] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:908
  [9] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1585
 [10] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:908
 [11] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1585
 [12] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:908
 [13] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:844
 [14] deserialize_expr
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1370
 [15] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:924
 [16] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:844
 [17] deserialize_expr
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1370
 [18] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:924
 [19] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:844
 [20] #5
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1003
 [21] ntupleany
    @ ./ntuple.jl:43
 [22] deserialize_tuple
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:1003
 [23] handle_deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:887
 [24] deserialize
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Serialization/src/Serialization.jl:844 [inlined]
 [25] deserialize_msg
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Distributed/src/messages.jl:87
 [26] #invokelatest#2
    @ ./essentials.jl:1055 [inlined]
 [27] invokelatest
    @ ./essentials.jl:1052 [inlined]
 [28] message_handler_loop
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Distributed/src/process_messages.jl:176
 [29] process_tcp_streams
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Distributed/src/process_messages.jl:133
 [30] #103
    @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Distributed/src/process_messages.jl:121

...and 20 more exceptions.

Stacktrace:
 [1] sync_end(c::Channel{Any})
   @ Base ./task.jl:466
 [2] macro expansion
   @ ./task.jl:499 [inlined]
 [3] remotecall_eval(m::Module, procs::Vector{Int64}, ex::Expr)
   @ Distributed ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Distributed/src/macros.jl:219
 [4] macro expansion
   @ ~/.julia/juliaup/julia-1.11.5+0.x64.linux.gnu/share/julia/stdlib/v1.11/Distributed/src/macros.jl:203 [inlined]
 [5] top-level scope

I haven’t actually used Serialization, so I am unsure why it got involved.

Serialization is used under the hood by Distributed to package the information you’re sending to workers into nice little boxes that are then unpacked when they get to the worker.
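
If you want a quick way to see whether the object itself serializes cleanly, you can round-trip it through an IOBuffer on the main process (just a sketch using the Serialization standard library; note that deserialization on a worker can still fail if a type or function used by the object is not defined there):

using Serialization

io = IOBuffer()
serialize(io, parallel_opt_problem)  # the same machinery Distributed uses to ship values
seekstart(io)
roundtripped = deserialize(io)       # an error here points at a serialization problem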

What exactly did you try?


I tried both

@everywhere parallel_opt_problem = $(prepare_problem(parallel_opt_problem_3))
@everywhere parallel_opt_problem = $(parallel_opt_problem)

It seems like there is something fishy with one of the inputs to prepare_problem. The call looks something like

@everywhere parallel_opt_problem = $(prepare_problem(input1, input2, input3))

And if I just try

@everywhere $input1 # fine
@everywhere $input2 # Yields the error.
@everywhere $input3 # fine

The error is thrown from t.types, where t should be a DataType. Is it possible that one of the types is not defined on all the workers?

t.types should return a simple vector with the types of the fields of the DataType, e.g.:
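
For illustration (a made-up struct, nothing from this thread):

struct Foo
    a::Int
    b::Vector{Float64}
end

Foo.types        # svec(Int64, Vector{Float64})
fieldtypes(Foo)  # (Int64, Vector{Float64}), the documented way to get the same information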


Maybe, although I have an @everywhere around my input block with all the using imports, so I would think that should be fine.

I have managed to create a somewhat minimal error, though. This function is not actually stored within the structure, but it is involved in the generation, and trying this with it generates the same error:

using Distributed, Random, Distributions  # (workers already added as before)

error_func(std; rng = Random.default_rng()) = (x -> x + rand(rng, Normal(0.0, std)))
@everywhere $error_func
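
(As an aside, my understanding is that the usual pattern for a helper function like this is to define it on every process with @everywhere rather than interpolating the locally defined function, roughly as sketched below; I have not checked whether that changes anything for the full problem.)

# Define the helper on all processes instead of shipping the local definition
# (assumes the relevant packages are loaded everywhere).
@everywhere using Random, Distributions
@everywhere error_func(std; rng = Random.default_rng()) = (x -> x + rand(rng, Normal(0.0, std)))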

Maybe it is not that, though: deep down, the input contains a sols_true::Vector{ODESolution} from the OrdinaryDiffEq.jl package. Running

@everywhere $sols_true

causes the error (also after explicitly running @everywhere using OrdinaryDiffEq).

I feel this issue is now distinctly different from my initial one, so for future reference I will mark your previous solution as the correct one (which it is).

Thanks a lot for the help both of you!

Just to make sure, I will echo the earlier question: what is the reason you want to specifically use multi-processing and Distributed instead of local multithreading and Threads? Are you actually using multiple separate machines, e.g. in a cluster? Or is each process itself multithreaded (not that Julia can’t handle nested multithreading)? In other words, you say you want the runs distributed over multiple processes, but why?

As @VinceNeede pointed out, the next section of that documentation shows that the problem can also be solved more straightforwardly using multithreading on a single process. The only difference between the two is:

  • Multi-processing: solve(..., EnsembleDistributed(), ...)
  • Multi-threading: solve(..., EnsembleThreads(), ...)

which is not explicitly included in the solve(parallel_opt_problem) call, and neither are the other arguments to solve, such as the integration algorithm, e.g. Tsit5(). So what are you actually using? Are there defaults for these arguments?

I’m not saying you’re doing something wrong, but if you can use multiple threads, then the point is moot because each thread will automatically know what parallel_opt_problem is without needing to fiddle with the details of @everywhere and passing data between processes.
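
For concreteness, a rough sketch of the two call forms (ensemble_prob, opt_alg, and n are placeholders here; check the Optimization.jl docs for the exact signature):

# Threads: all tasks share the memory of the main process, so parallel_opt_problem is visible as-is.
sol = solve(ensemble_prob, opt_alg, EnsembleThreads(); trajectories = n)

# Distributed: each worker process needs the problem (and the packages) available first.
sol = solve(ensemble_prob, opt_alg, EnsembleDistributed(); trajectories = n)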


No, there might be something in this. You are right that it is not that I do not want to share memory, I just do not need to. Now, when I tried EnsembleSerial() and EnsembleThreads(), they seemed to give similar performance for a decent number (10) of parallel runs of decent length (e.g. the full thing takes 2 minutes). I figured that multithreading (and I am by no means an expert on these kinds of things) wasn’t able to achieve as good parallelisation as making it distributed (which seemed very parallel when I got it to work, with a good speedup).

I am running everything interactively in VSCode on a machine I have ssh’ed into (I ran with 20 processes and, for the multithreading, 40 threads, so there are a lot of cores available).
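
If EnsembleThreads() performs like EnsembleSerial(), one thing I should probably double-check (a small sketch, nothing specific to my setup) is whether the interactive session actually has the threads it is supposed to:

# If this returns 1, the session was started single-threaded and
# EnsembleThreads() will effectively run serially.
Threads.nthreads()

Threads have to be requested at startup, e.g. with julia --threads 40, or (if I remember correctly) via the julia.NumThreads setting of the Julia VS Code extension.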