Pmap over large dataset

Hi all,

I am trying to parallelize an operation over an index of a dataframe.

      # one entry per market: that market's rows of dt plus the matching rows of demo_dt
      list_of_dts = [begin
          rows = dt.market_ids .== id                 # rows belonging to market `id`
          date = first(dt[rows, :quarter])            # quarter of this market
          dma  = first(dt[rows, :dma_code])           # DMA code of this market
          (dt = dt[rows, :],
           demo_dt = demo_dt[(demo_dt.quarter .== date) .& (demo_dt.dma_code .== dma), :])
      end for id in unique(dt.market_ids)]

    delta_nfxp = pmap(index -> nfxp_market(index, list_of_dts, nu_demo, nu_cols, random_coeff, demo_coeff, demo_var),
                      eachindex(list_of_dts))
    delta_nfxp = reduce(vcat, delta_nfxp)

Inside nfxp_market, list_of_dts is subset by the index:

        dt_mk = list_of_dts[index][:dt]
        demo_dt_mk = list_of_dts[index][:demo_dt]

I think the idea is clear. When I actually run this, however, my workers keep getting killed, which I believe is an out-of-memory error. The workers are not all on the same node. For simplicity, let's say I have 2 nodes with 16 cores each (i.e. 32 - 1 = 31 workers).

To reduce memory use, it would make sense not to send the entire dataset to every worker and subset it there. In R, for example, I can easily export to each worker only the subset meant for it. Is something like this possible in Julia?
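To make the difference concrete, here is a rough sketch that compares how many bytes the whole list serializes to versus a single market's entry. It uses toy data (vectors instead of DataFrame subsets), and the names serialized_bytes and toy_list are hypothetical. It also assumes the Serialization stdlib's byte count is a reasonable proxy for what Distributed actually sends to a worker.

```julia
using Serialization

# Rough proxy for how much data is shipped to a worker: serialize into an
# in-memory buffer and count the bytes written. Distributed uses its own
# serializer, so real numbers will differ somewhat.
function serialized_bytes(x)
    io = IOBuffer()
    serialize(io, x)
    return position(io)   # bytes written so far
end

# Hypothetical stand-in for list_of_dts: 8 "markets", each a NamedTuple of
# vectors (the real entries are DataFrame subsets).
toy_list = [(dt = rand(10^5), demo_dt = rand(100)) for _ in 1:8]

whole  = serialized_bytes(toy_list)      # what a closure capturing the full list carries
single = serialized_bytes(toy_list[1])   # what one task actually needs
println("whole list: $whole bytes, one market: $single bytes")
```

With 8 markets the whole list comes out at roughly 8 times the size of one entry, which is the overhead every worker pays when it receives the full list instead of its own subset.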

I'm not sure whether you can do this via pmap, but the slightly lower-level control of @spawnat will probably work.

As a simple example, with Julia running 4 worker processes on a single machine (julia -p 4),

    using Distributed
    x = rand(10^8, 4)
    # the anonymous function refers to the global x, so all of x travels with it
    sum(pmap(i -> sum(@view x[:, i]), 1:4))

uses some 15 GB of memory, while

    using Distributed
    x = rand(10^8, 4)
    fs = Vector{Future}(undef, 4)
    for i = 1:4
        xi = @view x[:, i]              # view of column i only
        fs[i] = @spawnat :any sum(xi)   # the closure captures xi, not the global x
    end
    sum(fetch.(fs))

only uses 9 GB.
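In the same spirit, pmap can be handed the per-market subsets directly instead of an index into a captured list, so each task is sent only its own element. This is a minimal sketch, assuming nfxp_market can be rewritten to take one entry of list_of_dts. The names market_delta and toy_list are hypothetical, and the data are toy vectors rather than DataFrames.

```julia
using Distributed
# addprocs(4)  # start local workers; with no workers, pmap simply runs on the master process

# Hypothetical stand-in for nfxp_market, rewritten to take ONE market's data
# (an element of list_of_dts) instead of the whole list plus an index.
@everywhere function market_delta(sub)
    m = sum(sub.demo_dt) / length(sub.demo_dt)
    return sub.dt .- m
end

# Hypothetical stand-in for list_of_dts: 6 markets of toy data.
toy_list = [(dt = rand(1_000), demo_dt = rand(10)) for _ in 1:6]

# pmap iterates over the subsets themselves, so each task receives only its
# own element. market_delta is a named function defined on every process,
# so it is referenced by name rather than shipped together with captured data.
deltas = pmap(market_delta, toy_list)
delta_all = reduce(vcat, deltas)
```

The remaining arguments (nu_demo, random_coeff and so on) could still be passed through an anonymous function given to pmap; whatever that function captures is serialized along with it, so this only pays off if those objects are small compared with the data.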

I don't have experience with DataFrames, but for matrices @eldeee's answer does what you are asking, and other approaches with even lower-level control would also work.

For instance, you could use DistributedArrays.jl to scatter your data and then perform your operations locally on each worker, but I don't know how well it works with DataFrames. If I had to guess, you would need to change your code to work with matrices.
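A lower-level variant of the scatter idea that needs no extra package is to choose the worker for each market yourself with @spawnat, so each worker receives only the subsets assigned to it. This is a sketch under the same assumptions as before: market_delta and toy_list are hypothetical names, and the round-robin assignment is just one simple choice.

```julia
using Distributed
# addprocs(4)  # start local workers first; with none, everything runs on process 1

# Hypothetical per-market function, defined on every process.
@everywhere market_delta(sub) = sub.dt .- sum(sub.demo_dt) / length(sub.demo_dt)

toy_list = [(dt = rand(1_000), demo_dt = rand(10)) for _ in 1:6]   # hypothetical data

# Assign market k to a worker round-robin and ship only that market's subset.
ws = workers()
futures = map(eachindex(toy_list)) do k
    w   = ws[mod1(k, length(ws))]
    sub = toy_list[k]                 # local variable: only this subset is captured
    @spawnat w market_delta(sub)
end

delta_all = reduce(vcat, fetch.(futures))
```

Unlike pmap, this does no load balancing, but it makes it explicit which worker holds which subset, which can matter when the workers are spread over several nodes.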

I have not used it, but you could give DTables.jl a try (JuliaParallel/DTables.jl on GitHub: distributed table structures and data manipulation operations built on top of Dagger.jl).

It is not clear to me what the large dataset is. Is it list_of_dts?