Since @juliohm pinged me in [ANN] FileTrees.jl -- easy everyday parallelism on trees of files, I thought I'd give an example of how this could be done with FileTrees.jl.
```julia
# SETUP FOR ALL PROCESSES
# -----------------------
@everywhere begin
    # instantiate environment
    using Pkg; Pkg.instantiate()

    # load dependencies
    using CSV, DataFrames, FileTrees

    function process(infile)
        # read file from disk
        csv = CSV.read(infile, DataFrame)
        # perform calculations
        sleep(60) # pretend it takes time
        csv.new = rand(size(csv, 1))
        return csv # return the DataFrame, not just the new column
    end
end

# MAIN SCRIPT
# -----------
# relevant directories
indir  = joinpath(@__DIR__, "data")
outdir = joinpath(@__DIR__, "results")

# files to process
infiles = FileTree(indir)

new_dfs = FileTrees.load(infiles, lazy=true) do f
    try
        process(path(f))
    catch e
        @warn "failed to process $(path(f))"
        DataFrame() # store an empty file?
    end
end

# same tree structure, but rooted at the results directory
outfiles = rename(new_dfs, outdir)

FileTrees.save(outfiles) do file
    # save new file to disk
    CSV.write(path(file), get(file))
end # computation actually happens here.
```
This will launch the tasks using the Dagger scheduler. It will use both Threads and Distributed procs: if Julia is started from an env with JULIA_NUM_THREADS=N, each worker process will also run N threads.
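For completeness, here is one way such a setup might be launched; a minimal sketch using the Distributed stdlib, where the worker count and thread count are illustrative:

```julia
# illustrative launch: 4 worker processes, each with 2 threads
# (roughly equivalent to starting Julia as: JULIA_NUM_THREADS=2 julia -p 4)
using Distributed
addprocs(4; exeflags="--threads=2") # the --threads flag requires Julia >= 1.5
```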
You can apply `mapvalues` on `new_dfs`; it will be a lazy operation and will not bring the data to the master process. You can also use `reducevalues` to reduce over an iterator of all the DataFrames.
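As a sketch of what that looks like (the row count here is just a stand-in for a real aggregation):

```julia
# lazy per-file transformation; the DataFrames stay on the workers
nrows = mapvalues(df -> size(df, 1), new_dfs)

# reduce over all values in the tree; exec forces the lazy computation
total_rows = exec(reducevalues(+, nrows))
```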
This same code will work if the files are in a nested directory structure; besides, you get to filter files with `glob""` syntax.
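For example (the pattern is illustrative; the `glob""` string macro comes from Glob.jl, which FileTrees indexing accepts):

```julia
using Glob
# keep only CSV files one directory level below the root
csvs = infiles[glob"*/*.csv"]
```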
So it’s useful if you have a cluster but also want to use multithreading on each node in the cluster to speed up communication.
@juliohm I don't think I have anything more to add about Distributed beyond what's already mentioned in this thread though!