Parallelization issues

Hi all,

I am having parallelization issues. I am trying to use multiple computers in an MPI call and have tried a lot of things. Because it is MPI, memory is not shared between the nodes. I can divide my dataset by an identifier, so that I essentially have a list of dts:

if ismissing(demo_var[1])
  list_of_dts = split(dt, :market_ids)
else
  list_of_dts = [begin
      date = first(unique(dt[dt.market_ids .== id, :quarter]))
      dma = first(unique(dt[dt.market_ids .== id, :dma_code]))
      (dt = dt[dt.market_ids .== id, :], demo_dt = demo_dt[(demo_dt.quarter .== date) .& (demo_dt.dma_code .== dma), :])
  end for id in unique(dt.market_ids)]
end

So then list_of_dts[1][:dt] and list_of_dts[1][:demo_dt] give me the relevant data for market 1.

After creating workers and loading libraries & functions on every worker:

using Distributed
addprocs(279)
@everywhere using Pkg
@everywhere Pkg.activate("/restricted/projectnb/econdept/jvonditf/storebrand/main/code/estimation/barg/Project")
@everywhere Pkg.instantiate()
@everywhere using CSV, QuantEcon, Logging, DataFrames, RecursiveArrayTools, Random, Statistics, Distributions, StatsBase, LinearAlgebra, LoopVectorization, FastHalton

@everywhere include("./src/functions-helper-par-generalized.jl")

I want to parallelize my calculation:

result = pmap(market_ids_unq -> replacement_threats_m(market_ids_unq, list_of_dts[market_ids_unq][:dt], list_of_dts[market_ids_unq][:demo_dt],
        beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff, all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id),
    market_ids_unq)

My hope is that, by using this, the data gets exported to the workers and that the data sent to each worker is limited to its subset of the list.
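
As far as I understand, though, the anonymous function above closes over the whole list_of_dts, so the full list may be serialized to the workers rather than just the per-market subset. What I have in mind is something like the sketch below, mapping over the tuples themselves so that only one market's data travels per task (using the enumerate index as the market id is just an assumption about how the ids line up; I have not verified this actually reduces what gets sent):

# Sketch: pass each market's data as the mapped element, so pmap
# serializes only that (id, dt, demo_dt) tuple per task instead of a
# closure capturing the full list_of_dts.
result = pmap(collect(enumerate(list_of_dts))) do (id, market_data)
    replacement_threats_m(id,
        market_data[:dt],
        market_data[:demo_dt],
        beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
        all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
end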

My problem is that this all runs fine when I just use 1 computer with 28 cores, or even 2 computers with 56 cores. Whenever I try to do this with 10 computers and 280 cores, it breaks and doesn't tell me why.
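
To at least get the failure reported instead of the whole pmap dying silently, I was also thinking of pmap's on_error keyword, which (as I understand it) turns a worker-side exception into a return value for that element; I have not verified that it helps when a worker is killed outright rather than throwing:

# Sketch: return the exception for a failing market instead of aborting
# the whole pmap, so the failing market/worker becomes visible.
result = pmap(market_ids_unq; on_error = e -> (error = e,)) do id
    replacement_threats_m(id, list_of_dts[id][:dt], list_of_dts[id][:demo_dt],
        beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff,
        all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
end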

I have also tried calling it in different ways:

    worker_ids = workers()
    # Calculate the number of workers
    num_workers = length(worker_ids)
    result = [try
                @spawnat worker_ids[(index - 1) % num_workers + 1] begin 
                    market_data = list_of_dts[index]
                    println("Processing market $index on worker $(myid())")
                    start_time = time()
                    start_mem = Base.summarysize(market_data)
                    
                    result = replacement_threats_m(index, 
                                market_data[:dt], 
                                market_data[:demo_dt], 
                                beta, sigma, nu_demo, nu_cols, fixed_coeff, random_coeff, all_coeff, demo_coeff, demo_var, xi_no_draws, ownership_id)
                    end_time = time()
                    end_mem = Base.summarysize(result)
                    @info "Market $index processed on worker $(myid()) in $(end_time - start_time) seconds"
                    @info "Memory used for market $index: $(end_mem - start_mem) bytes"
                    flush(stdout)
                    GC.gc()                                                                                 
                    result
                end
            catch e
                println("Error processing market $index on worker $(worker_ids[(index - 1) % num_workers + 1]): $e")
                nothing # Return nothing or a suitable default value
            end
            for index in 1:length(market_ids_unq)]
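
One thing I am not sure about with this version: @spawnat returns a Future immediately, so the comprehension collects Futures rather than results, and I suspect the try/catch on the master never sees the exceptions thrown on the workers. My understanding is that I would still need a fetch step like the following, and that remote errors would only surface here as a RemoteException:

# Assumed follow-up step: fetch the Futures collected above; an exception
# thrown on a worker is wrapped in a RemoteException and rethrown by fetch.
fetched = map(result) do fut
    fut === nothing && return nothing
    try
        fetch(fut)
    catch e
        println("Error while fetching: $e")
        nothing
    end
end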

Getting a reproducible example sounds impossible, as I do not know how the cluster is set up, etc.

Does anybody have an idea how I could do what I want to do using an MPI job?

This doesn't really answer your question, but I don't see anything related to MPI in your code; you seem to be using the Distributed stdlib, which is a different thing.

You may want to elaborate on what this sentence means.

Hi, yes, you are right. I am submitting the job with the following (Sun Grid Engine on the cluster):


# Request 8 nodes with 28 cores each
#$ -pe mpi_28_tasks_per_node 224

and hence I called it MPI. My confusion about this terminology and my usage of Distributed may therefore be the reason for my problems.

When I schedule this and use Distributed, it essentially kills the job and does not log the reason for killing it.

OK, to clarify this bit: that's an SGE-specific way (that name for the parallel environment may even be specific to your facility) to ask for multiple nodes, under the assumption that everybody uses MPI as the communication protocol, which isn't necessarily the case. But that's just a name, that's it.

OK. So I changed my setup a bit, so it may be clearer. I now add workers with

using ClusterManagers
addprocs_sge(279, t="90:05:00"; qsub_flags=`-l mem_per_core=8G`, topology=:master_worker)

I don't need workers to communicate with each other; I am using a pmap, which is 100% parallel, hence :master_worker. I just got the error

Worker 80 terminated.Unhandled Task ERROR: EOFError: read end of file

which I have read online might indicate memory overflow. How can I test this?

I tried the following. pmap maps over a function that has a loop inside. I put in

        free_memory_gb = Sys.free_memory()/1024/1024/1024 #gigabytes
        print("Run: $i ; Free Mem: $free_memory_gb ;;")

to check whether free memory decreases with the number of loop iterations. After 100+ runs, this doesn't seem to be the problem. After running the loop, it should essentially just save the result. I am testing this on the main process rather than on a worker, though, so that is a caveat to the test.
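
To remove that caveat, I could presumably poll the workers themselves from the master while the pmap is running; something along these lines is what I have in mind (just a sketch, assuming the workers stay responsive enough to answer remote calls):

# Sketch: periodically ask each worker for the free memory on its node
# from the master process, so the measurement happens where the work runs.
@async while true
    for pid in workers()
        try
            free_gb = remotecall_fetch(() -> Sys.free_memory() / 1024^3, pid)
            println("Worker $pid free memory: $(round(free_gb, digits = 2)) GB")
        catch e
            println("Could not query worker $pid: $e")
        end
    end
    sleep(30)  # check every 30 seconds
end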