Pmap() data inputs

Hi all, I am confused about pmap.

I export all packages to all cores with @everywhere, and I also define all my functions on the workers with @everywhere.

But then I call pmap where one of the inputs is a dataset. Do I need to export the dataset to the workers before calling pmap as well, or not?

    var_profit = pmap(markets -> retailer_var_profit_m(markets,dt,nu_alpha,nu_p,nu_xj,nu_xm,nu_xr,marketsize,xi_no_draws), markets)

Here, everything from dt to xi_no_draws is already defined, and I need those objects available on all the cores that run. Somehow it is breaking, however. Do I need to export these objects manually, e.g. with @everywhere, before running pmap?

When you are using Distributed, the memory on each worker is isolated and all data required on a worker must be communicated to it. If you are on a single machine and your code isn’t allocating excessively, I would recommend using ThreadsX.map instead, or even manually using Threads.@threads on a for loop so you don’t have to worry about communication.
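For example, a threaded version of your call could look roughly like this (just a sketch: it assumes markets is a plain vector, that the result per market is a Float64, and that retailer_var_profit_m and the other variables are defined in the main process, which is all the threads need):

    # Every thread shares the main process's memory, so dt, nu_alpha, etc.
    # do not have to be copied anywhere.
    var_profit = Vector{Float64}(undef, length(markets))  # assumes a Float64 result per market
    Threads.@threads for i in eachindex(markets)
        var_profit[i] = retailer_var_profit_m(markets[i], dt, nu_alpha, nu_p, nu_xj,
                                              nu_xm, nu_xr, marketsize, xi_no_draws)
    end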

If you want to stick with pmap you could do something like:

    using Base.Iterators
    results = pmap((a, b) -> myfunc(a, b), as, Iterators.repeated(b, length(as)))

I need to stick with pmap, as I allocate a lot and these are independent tasks for each market, so I don’t need to communicate data during the computation.

Can you explain the code you wrote? How does the data get communicated to all the workers? What does the Iterators call do?

You may not need to communicate during the computation, but each worker has separate, isolated memory. This is why you need @everywhere to define functions on the workers. In your example, you are creating an anonymous function in the main process that uses global variables (presumably defined in your main process), which cannot be accessed by the worker processes. In short, all variables used in your anonymous function, and any functions it calls, must exist on the workers as well. If they don't, you can use pmap itself to send the information to each of the workers.
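For completeness, the @everywhere route looks roughly like this (a sketch; "model.jl" is a hypothetical file holding your function definitions, and the $ interpolation sends the value held by the main process to the workers instead of recomputing it there):

    using Distributed
    addprocs(4)                       # assumption: 4 local worker processes

    # Define the function(s) on every worker:
    @everywhere include("model.jl")   # hypothetical file defining retailer_var_profit_m

    # Define the data on every worker; $ interpolates the main process's value:
    @everywhere dt = $dt
    @everywhere xi_no_draws = $xi_no_draws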

In your example, it seems like the variable xi_no_draws doesn't exist on the workers. You could use @everywhere to define it there, but you can also send it to the workers by passing it as an argument to pmap, which avoids relying on globals.

An example using your variable names:

    var_profit = pmap((markets, no_draws) -> retailer_var_profit_m(markets,dt,nu_alpha,nu_p,nu_xj,nu_xm,nu_xr,marketsize,no_draws), markets, xi_no_draws)

If you get a dimension mismatch error, you can create an "array" that is the same size as markets but filled with the same value (which I do lazily using Iterators.repeated, to avoid allocating the entire array):

    using Base.Iterators
    var_profit = pmap((markets, no_draws) -> retailer_var_profit_m(markets,dt,nu_alpha,nu_p,nu_xj,nu_xm,nu_xr,marketsize,no_draws), markets, Iterators.repeated(xi_no_draws, length(markets)))

In essence, the collections passed to pmap after the function are sent to the workers element by element (each worker gets one element from each collection for a given job), which is a good way to make sure they have access to the data they need.
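A toy example of that element-by-element behaviour (nothing to do with your model, just the pattern):

    # Job 1 gets (1, 10), job 2 gets (2, 20), job 3 gets (3, 30):
    pmap((x, y) -> x + y, 1:3, [10, 20, 30])   # == [11, 22, 33]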

A way of writing your code to send all of the arguments would be:

    using Base.Iterators
    additional_args = (dt,nu_alpha,nu_p,nu_xj,nu_xm,nu_xr,marketsize,xi_no_draws)
    # Wrapping additional_args in a one-element vector makes the product markets × 1,
    # so every market is paired with the same tuple of extra arguments.
    complete_arg_list = Iterators.product(markets, [additional_args])
    var_profit = pmap(complete_arg_list) do args
        market, extra_args = args # destructure to get the market and extra args
        retailer_var_profit_m(market, extra_args...)
    end
    # If you want to reshape to the same size as `markets`:
    var_profit = reshape(var_profit, size(markets)...)

You can use collect(complete_arg_list) to see what the product line is doing.
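For a small illustration with toy values in place of your markets and arguments:

    # Pairs each element of 1:3 with the single extra tuple:
    collect(Iterators.product(1:3, [(:a, :b)]))
    # gives a 3×1 matrix containing (1, (:a, :b)), (2, (:a, :b)), (3, (:a, :b))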

Again, this is much simpler to do with multithreading, as all "workers" (the threads) have access to the same memory and you don't need to worry about using @everywhere. It becomes a lot more ergonomic with the ThreadsX.jl package, not to mention the reduction in latency (and possibly an increase in speed). The only major issue is that poorly performing code with lots of allocations will probably be worse off with multithreading than with multiprocessing (i.e. using Distributed).
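A ThreadsX version of your original call would look roughly like this (a sketch, assuming ThreadsX.jl is installed and everything is defined in the single main process, which is all a threaded run needs):

    using ThreadsX
    # No @everywhere needed: every thread shares the same memory.
    var_profit = ThreadsX.map(markets) do market
        retailer_var_profit_m(market, dt, nu_alpha, nu_p, nu_xj,
                              nu_xm, nu_xr, marketsize, xi_no_draws)
    end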


Thanks a ton for the explanation, I now follow everything!

I have already sped up the performance-critical code by a ton using Tullio and LoopVectorization; distributed computation lets me use more CPU power than multithreading. But I will also discuss this again internally!

Thanks