Struggling to Run Distributed I/O Operations on SLURM Cluster

Hi all,

Problem Background

I have been struggling for a couple of days to understand parallel and distributed computing on a remote cluster that runs SLURM.
I have hundreds of huge files from which I need to extract information and write to binary (Arrow) files.
I have written code that works well in a single-threaded process locally, but trying to distribute it has caused me significant trouble.

Some processes do in fact spawn, and I do see files get analyzed and new files created as expected.
However, not all files get analyzed, and it doesn't seem like all of my workers are spawning.
I have read multiple Discourse posts on the subject and am confused as to why my code is not working as expected.
Could anyone tell me what I am doing wrong and why the process is not playing out as expected?
Thanks!

Code

using Distributed
using SlurmClusterManager

addprocs(SlurmManager())

@everywhere using Pkg
@everywhere Pkg.activate("ANALYSIS", shared = true)

@everywhere using Arrow

prefix = "path/to/prefix/"
special_dirs = readdir(prefix) |> f -> filter(!startswith("."), f)
output_path = "path/to/output/"

special_dir = "special"
prefix = joinpath(prefix, special_dir)
output_prefix = joinpath(output_path, special_dir)
files = readdir(prefix) |> f -> filter(x -> !startswith(x, ".") && splitext(x)[2] == ".json", f)

@everywhere function arrow_write(file, prefix, output_prefix)
    println("Worker writing to $file")

    # Processing code....

    println("Done with $file")
end

@sync @distributed for file in files
    arrow_write(file, prefix, output_prefix)
end

Thank you so much all!

~ tcp :deciduous_tree:

Small Update

I later made some adjustments, and it seems that doing something like this makes my code work correctly:

@everywhere prefix = "path/to/prefix/"
@everywhere special_dirs = readdir(prefix) |> f -> filter(!startswith("."), f)
@everywhere output_path = "path/to/output/"

@everywhere special_dir = "special"
@everywhere prefix = joinpath(prefix, special_dir)
@everywhere output_prefix = joinpath(output_path, special_dir)
@everywhere files = readdir(prefix) |> f -> filter(x -> !startswith(x, ".") && splitext(x)[2] == ".json", f)

I understand why this works: it makes the variables available to every running process.
But is there a more elegant way of doing that?
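
For example, something like this (an untested sketch on my end) might avoid the @everywhere definitions entirely: keep the paths as local variables inside a function and use pmap, which serializes the closure together with its captured locals to the workers:

# Untested sketch: captured locals travel with the closure, so no
# @everywhere globals are needed (arrow_write is still defined @everywhere).
function process_all(prefix, output_path; special_dir = "special")
    in_dir = joinpath(prefix, special_dir)
    out_dir = joinpath(output_path, special_dir)
    files = filter(x -> !startswith(x, ".") && splitext(x)[2] == ".json",
                   readdir(in_dir))
    pmap(file -> arrow_write(file, in_dir, out_dir), files)
end

process_all("path/to/prefix/", "path/to/output/")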

I prefer splitting the distributed setup from the actual code. @everywhere is considered evil xD.

So as an example, I often have a file called setup.jl that can be more or less complex:

using Distributed
using ClusterManagers

# Usage:
# - Set `export JULIA_PROJECT=`pwd``

if haskey(ENV, "SLURM_JOB_ID")
  jobid = ENV["SLURM_JOB_ID"]  
  ntasks = parse(Int, ENV["SLURM_NTASKS"])
  cpus_per_task = parse(Int, ENV["SLURM_CPUS_PER_TASK"])
  @info "Running on Slurm cluster" jobid ntasks cpus_per_task
  manager = SlurmManager(ntasks)
else
  ntasks = 2
  cpus_per_task = div(Sys.CPU_THREADS, ntasks)
  @info "Running locally" ntasks
  manager = Distributed.LocalManager(ntasks, false)
end
flush(stderr)

# Launch workers
addprocs(manager; exeflags = ["--threads=$cpus_per_task"])

@everywhere begin
  import Dates
  using Logging, LoggingExtras
  const date_format = "HH:MM:SS"

  function dagger_logger(logger)
    logger = MinLevelLogger(logger, Logging.Info)
    logger = TransformerLogger(logger) do log
      merge(log, (; message = "$(Dates.format(Dates.now(), date_format)) ($(myid())) $(log.message)"))
    end
    return logger
  end

  # set the global logger
  if !(stderr isa IOStream)
    ConsoleLogger(stderr)
  else
    FileLogger(stderr, always_flush = true)
  end |> dagger_logger |> global_logger
end

@everywhere begin
  if myid() != 1
    @info "Worker started" Base.Threads.nthreads()
  end
  sysimg = unsafe_string((Base.JLOptions()).image_file)
  project = Base.active_project()
  @info "Environment" sysimg project
end

# Load code to execute on all processes
@everywhere begin
  include("code.jl")
end

code.jl then contains the actual code definitions.
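
As a minimal sketch (reusing the arrow_write function from the post above), code.jl could look like:

# code.jl -- definitions that every process needs.
using Arrow

function arrow_write(file, prefix, output_prefix)
    @info "Worker writing to $file"
    # Processing code....
    @info "Done with $file"
end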

And then I have a driver.jl which is the code to execute on the primary to manage the computation.
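
A sketch of driver.jl, assuming the arrow_write definition from code.jl above and placeholder paths:

# driver.jl -- runs on the primary; workers already have code.jl loaded.
prefix = "path/to/prefix/special"
output_prefix = "path/to/output/special"
files = filter(x -> !startswith(x, ".") && endswith(x, ".json"), readdir(prefix))

pmap(files) do file
    arrow_write(file, prefix, output_prefix)
end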

I then have a slurm script:

#!/bin/bash
# Begin SLURM Directives
#SBATCH --job-name=Example
#SBATCH --time=1:00:00
#SBATCH --mem=0
#SBATCH --ntasks-per-node=4
#SBATCH --cpus-per-task=16

# Clear the environment from any previously loaded modules
module purge > /dev/null 2>&1

module load julia

export JULIA_PROJECT=`pwd`

HOSTNAME=$(hostname)
echo "Primary runs on ${HOSTNAME}"

julia -L setup.jl driver.jl 
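
The -L flag makes julia execute setup.jl first (launching and configuring the workers) and then run driver.jl in the same session, so the driver can assume all workers are up and have code.jl loaded.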