Hi all,
Problem Background
I have been struggling for a couple of days to understand parallel and distributed computing on a remote cluster that runs SLURM.
I have hundreds of huge files that I need to extract information from and write out as binary (Arrow) files.
I have written code that works well in a single-threaded process locally, but trying to distribute it has caused me significant trouble.
Some worker processes do in fact spawn, and I do see files being analyzed and new files created as expected.
However, not all of the files get analyzed, and it does not seem like all of my workers are spawning.
I have read multiple Discourse posts on the subject and am confused as to why my code is not working as expected.
Could anyone tell me what I am doing wrong and why I am not seeing the process play out as expected?
Thanks!
Code
using Distributed
using SlurmClusterManager
addprocs(SlurmManager())
@everywhere using Pkg
@everywhere Pkg.activate("ANALYSIS", shared = true)
@everywhere using Arrow
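# Sanity check added while debugging: confirm that the SLURM workers actually
# launched and that each one responds before doing any real work.
println("Master sees $(nworkers()) workers: $(workers())")
@everywhere println("Worker $(myid()) is up")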
prefix = "path/to/prefix/"
special_dirs = readdir(prefix) |> f -> filter(!startswith("."), f)
output_path = "path/to/output/"
special_dir = "special"
prefix = joinpath(prefix, special_dir)
output_prefix = joinpath(output_path, special_dir)
files = readdir(prefix) |> f -> filter(x -> !startswith(x, ".") && splitext(x)[2] == ".json", f)
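# Quick check that the file list is what I expect before distributing any work
println("Found $(length(files)) JSON files under $prefix")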
@everywhere function arrow_write(file, prefix, output_prefix)
println("Worker writing to $file")
# Processing code....
println("Done with $(file_path)")
end
@sync @distributed for file in files
arrow_write(file, prefix, output_prefix)
end
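# After the loop, check which inputs actually produced an output file.
# NOTE: the output naming below is only a guess for illustration; my real code
# derives the output path inside arrow_write.
missing_outputs = filter(f -> !isfile(joinpath(output_prefix, splitext(f)[1] * ".arrow")), files)
println("Inputs without an output file: ", missing_outputs)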
Thank you so much all!
~ tcp
Small Update
I made some adjustments later on, and it seems like doing something like this makes my code work correctly:
@everywhere prefix = "path/to/prefix/"
@everywhere special_dirs = readdir(prefix) |> f -> filter(!startswith("."), f)
@everywhere output_path = "path/to/output/"
@everywhere special_dir = "special"
@everywhere prefix = joinpath(prefix, special_dir)
@everywhere output_prefix = joinpath(output_path, special_dir)
@everywhere files = readdir(prefix) |> f -> filter(x -> !startswith(x, ".") && splitext(x)[2] == ".json", f)
I understand why this works: it makes those variables available on every running process.
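For what it's worth, the same definitions can be grouped into a single @everywhere begin ... end block, which I believe behaves the same since the whole block is still evaluated on every process:
@everywhere begin
    prefix = "path/to/prefix/"
    special_dirs = readdir(prefix) |> f -> filter(!startswith("."), f)
    output_path = "path/to/output/"
    special_dir = "special"
    prefix = joinpath(prefix, special_dir)
    output_prefix = joinpath(output_path, special_dir)
    files = readdir(prefix) |> f -> filter(x -> !startswith(x, ".") && splitext(x)[2] == ".json", f)
end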
But is there a more elegant way of doing that?