Hi all,
I am having parallelization issues. I am trying to use multiple computers in an MPI job and have tried a lot of things. Because it is MPI, memory is not shared across processes. I can divide my dataset by an identifier, so that I essentially have a list of dts (one DataFrame, or pair of DataFrames, per market):
if ismissing(demo_var[1])
    list_of_dts = split(dt, :market_ids)
else
    # For each market, pair its rows of dt with the matching demographic rows.
    list_of_dts = [begin
        date = first(unique(dt[dt.market_ids .== id, :quarter]))
        dma = first(unique(dt[dt.market_ids .== id, :dma_code]))
        (dt = dt[dt.market_ids .== id, :],
         demo_dt = demo_dt[(demo_dt.quarter .== date) .& (demo_dt.dma_code .== dma), :])
    end for id in unique(dt.market_ids)]
end
So then, list_of_dts[1][:dt] and list_of_dts[1][:demo_dt] give me the relevant data for market 1.
After creating workers and loading libraries & functions on all of them:
using Distributed
addprocs(279)
@everywhere using Pkg
@everywhere Pkg.activate("/restricted/projectnb/econdept/jvonditf/storebrand/main/code/estimation/barg/Project")
@everywhere Pkg.instantiate()
@everywhere using CSV, QuantEcon, Logging, DataFrames, RecursiveArrayTools, Random, Statistics, Distributions, StatsBase, LinearAlgebra, LoopVectorization, FastHalton
@everywhere include("./src/functions-helper-par-generalized.jl")
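For the multi-machine runs, my understanding is that addprocs(279) by itself only spawns local processes, so the remote hosts have to be spelled out (or provided by a cluster manager). A minimal sketch of what I believe the SSH-based setup would look like, with hypothetical host names since I do not know the exact cluster configuration:

using Distributed

# Hypothetical host list; in practice this would come from the scheduler's host file.
machines = [("node01", 28), ("node02", 28)]  # (hostname, workers per host)

# A machine spec makes addprocs start workers over SSH instead of locally.
addprocs(machines; tunnel = true)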
Then I want to parallelize my calculation:
result = pmap(market_ids_unq) do id
    replacement_threats_m(id, list_of_dts[id][:dt], list_of_dts[id][:demo_dt],
        beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
        all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
end
My hope is that, by using this, the data gets exported to the workers and that each worker only receives the subset of the list it needs.
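To make that hope concrete, here is a minimal sketch (same names as above, untested at scale) where the per-market tuple itself is the iterated collection, so each pmap task serializes only its own subset rather than the closure capturing the whole list_of_dts:

# Ship only the per-market tuple to each task instead of capturing list_of_dts.
jobs = [(id, list_of_dts[id][:dt], list_of_dts[id][:demo_dt]) for id in market_ids_unq]

result = pmap(jobs) do (id, dt_m, demo_dt_m)
    replacement_threats_m(id, dt_m, demo_dt_m,
        beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
        all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
end

(The small coefficient arguments are still captured by the closure, but those should be cheap to send.)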
My problem is that this all runs fine when I use just 1 computer with 28 cores, or even 2 computers with 56 cores. Whenever I try 10 computers with 280 cores, it breaks without telling me why.
I have also tried calling it in different ways:
worker_ids = workers()
# Calculate the number of workers
num_workers = length(worker_ids)
result = [try
              @spawnat worker_ids[(index - 1) % num_workers + 1] begin
                  market_data = list_of_dts[index]
                  println("Processing market $index on worker $(myid())")
                  start_time = time()
                  start_mem = Base.summarysize(market_data)
                  result = replacement_threats_m(index,
                      market_data[:dt],
                      market_data[:demo_dt],
                      beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
                      all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
                  end_time = time()
                  end_mem = Base.summarysize(result)
                  @info "Market $index processed on worker $(myid()) in $(end_time - start_time) seconds"
                  @info "Memory used for market $index: $(end_mem - start_mem) bytes"
                  flush(stdout)
                  GC.gc()
                  result
              end
          catch e
              println("Error processing market $index on worker $(worker_ids[(index - 1) % num_workers + 1]): $e")
              nothing  # Return nothing or a suitable default value
          end
          for index in 1:length(market_ids_unq)]
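One caveat I am aware of with this version: the try/catch wraps the @spawnat call itself, which returns a Future immediately, so a remote error would only surface once the Future is fetched. A sketch (same names as above) of fetching inside the try so the remote exception is actually caught:

# @spawnat returns Futures immediately; errors only show up at fetch time.
futures = [@spawnat worker_ids[(i - 1) % num_workers + 1] begin
               md = list_of_dts[i]
               replacement_threats_m(i, md[:dt], md[:demo_dt],
                   beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
                   all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
           end
           for i in 1:length(market_ids_unq)]

# fetch is where a remote exception actually surfaces, so catch around it.
result = [try
              fetch(f)
          catch e
              @warn "Market $i failed" exception = e
              nothing
          end
          for (i, f) in enumerate(futures)]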
Getting a reproducible example sounds impossible, as I do not know how the cluster is set up, etc.
Does anybody have an idea how I could do what I want using an MPI job?
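For reference, the pattern I imagine an MPI version would take: each rank works through its own stride of markets and rank 0 collects the results. A sketch using MPI.jl, assuming the data loading and setup above run on every rank:

using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)      # 0-based rank of this process
nprocs = MPI.Comm_size(comm)

# Static round-robin split: rank r handles markets r+1, r+1+nprocs, ...
my_ids = (rank + 1):nprocs:length(market_ids_unq)

my_results = [replacement_threats_m(id, list_of_dts[id][:dt], list_of_dts[id][:demo_dt],
                  beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
                  all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
              for id in my_ids]

# Collect the per-rank result vectors on rank 0 (MPI.gather serializes arbitrary objects).
all_results = MPI.gather(my_results, comm; root = 0)

MPI.Finalize()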