I have the results of an experiment stored inside a folder. I need to open each file, compute the fraction of missing values (NaN values, actually) it contains, and tag the file according to its level of missing values. I'd like to make this parallel, or at least speed up the code, but so far I have not succeeded. Any idea would be welcome!
There are about 2000 files, and each file is about 16 GB.
Note that I use a cluster, so if I can parallelize, I could use a good number of CPUs (with a Slurm scheduler).
using YAXArrays, NetCDF
using Downloads
using Glob
using Dates
using CFTime
# MWE. Sample files for creating the MWE.
url = "https://esgf-world.s3.amazonaws.com/CMIP6/CMIP/CCCma/CanESM5/historical/r15i1p1f1/day/tasmin/gn/v20190306/tasmin_day_CanESM5_historical_r15i1p1f1_gn_18500101-20141231.nc"
filename = Downloads.download(url, "tasmin_day_CanESM5_historical_r15i1p1f1_gn_18500101-20141231.nc") # you pick your own path
# MWE hack. We copy the same file to mimic the loop over multiple files
cp(filename, "tasmin_day_CanESM5_historical_r15i1p1f1_gn_18500101-20141231_v2.nc")
# ==================================
# Beginning here, this is the actual MWE to optimize
files = glob("tasmin*.nc")
# To store filenames given different level of missing values
miss_frac = Vector{String}()
miss_over90 = Vector{String}()
miss_tot = Vector{String}()
miss_ok = Vector{String}()
for ifile in files
    ds = Cube(ifile)
    # MWE hack. This bit is just to optimize the code over a subset of the data
    # (the actual code will use the whole dataset)
    ds = ds[Time=DateTimeNoLeap(2000,1,1)..DateTimeNoLeap(2002,12,31)]
    # Total items
    tot_items = prod(size(ds))
    # Missing items
    # This is where it takes time. I was thinking that this would be parallelized given that I provide the following ENV variable (but the process only uses 1 core in the end)
    # export JULIA_NUM_THREADS=8 (defined before I launch the script)
    missing_items = sum(map(x-> isnan(x), ds))
    # Fraction
    fraction = (missing_items / tot_items)
    if fraction > 0.5 && fraction < 0.9
        println("Found > 0.5 and < 0.9")
        push!(miss_frac, ifile)
    elseif fraction >= 0.9 && fraction < 1.0
        println("Found >= 0.9 and < 1.0")
        push!(miss_over90, ifile)
    elseif fraction == 1.00
        println("Found == 1.0!")
        push!(miss_tot, ifile)
    else
        push!(miss_ok, ifile)
    end
end
Giving Julia more threads does not automatically parallelize your code. You have to call parallel functions, or write your own multi-threaded code. For example, ThreadsX.jl has a parallelized version of sum that you can call, which will utilize multiple threads.
Note also that you will often get better speedup by parallelizing at a coarser level than a finer level — in particular, I would think about parallelizing your outermost loop, the for ifile in files, with @threads. If you do that, in order to avoid race conditions you will want to put a mutex lock around your if fraction [...] end condition at the end, so that only one thread calls push! at a time.
PS. Better to call sum(isnan, ds) or count(isnan, ds) rather than sum(map(x-> isnan(x), ds)), since the latter will first allocate an array of isnan values and then sum it, whereas the former will allocate no intermediate arrays and make a single pass over the data.
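A minimal sketch of that outer-loop pattern, reusing the names from your MWE (assumptions: a ReentrantLock protects the shared result vectors, and count(isnan, ds) replaces the map-then-sum):

using YAXArrays, NetCDF, Glob
using Base.Threads

files = glob("tasmin*.nc")
miss_frac, miss_over90, miss_tot, miss_ok = String[], String[], String[], String[]
lk = ReentrantLock()

Threads.@threads for z in eachindex(files)
    ifile = files[z]
    ds = Cube(ifile)
    # single pass over the data, no intermediate Bool array
    fraction = count(isnan, ds) / prod(size(ds))
    # only one thread at a time may push! into the shared vectors
    lock(lk) do
        if 0.5 < fraction < 0.9
            push!(miss_frac, ifile)
        elseif 0.9 <= fraction < 1.0
            push!(miss_over90, ifile)
        elseif fraction == 1.0
            push!(miss_tot, ifile)
        else
            push!(miss_ok, ifile)
        end
    end
end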
I was under the impression that using the @threads macro over the files would force me to create arrays for ds, tot_items, missing_items and fraction, like:
# Code preallocating Arrays (ds, tot_items, etc) not shown
Threads.@threads for z in 1:length(files)
    ifile = files[z]
    ds[z] = Cube(ifile)
    tot_items[z] = prod(size(ds[z]))
    # Missing items
    missing_items[z] = sum(map(x-> isnan(x), ds[z]))
    # Fraction
    fraction[z] = (missing_items[z] / tot_items[z])
end
On a cluster, think of using an Array Job. Your Julia code could count the NaNs in a given file. If you can run this code independently on each file, then the array job can help.
PS. If it were possible to use grep to match NaNs, I would suggest using grep…
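A sketch of what each array task could run, assuming one Julia process per file (the script name check_nans.jl and the per-task output file are only for illustration); Slurm sets SLURM_ARRAY_TASK_ID for each task when you submit with something like sbatch --array=1-2000:

# check_nans.jl -- run as one Slurm array task per file
using YAXArrays, NetCDF, Glob

files = sort(glob("tasmin*.nc"))                 # stable ordering across tasks
task  = parse(Int, ENV["SLURM_ARRAY_TASK_ID"])   # 1-based if submitted with --array=1-N
ifile = files[task]

ds = Cube(ifile)
fraction = count(isnan, ds) / prod(size(ds))

# one small result per task; classify the fractions in a quick post-processing step
open("fraction_" * basename(ifile) * ".txt", "w") do io
    println(io, fraction)
end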
Yes, I could indeed use a job array for the loop over the files. My hope was to distribute the files using a job array (or an @threads loop) and then use multiple threads for the NaN computations.
What kinds of files are these? Plaintext or some binary format? If plaintext, depending on your exact goals, you might also consider just using a shell script with some combination of wc, grep, sed, awk, etc.
Indeed, I should have mentioned that these are NetCDF files, which limits access to the data to a few libraries (here YAXArrays, which uses NetCDF under the hood).
I have not seen a major difference in allocations between count(isnan, ds), sum(isnan, ds) and sum(map(x-> isnan(x), ds)) after some testing. My guess is that the map function for a YAXArray is lazy by design? I could be wrong of course.
" Apply a function on every element of a datacube¤
The map function can be used to apply a function on every entry of a YAXArray without taking the dimensions into account. This will lazily register the mapped function which is applied when the YAXArray is either accessed or when more involved computations are made."
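One quick way to check that guess (the exact wrapper type depends on the YAXArrays version) is to look at what map returns before any reduction is performed:

lazy = map(isnan, ds)   # nothing is read or materialized yet
typeof(lazy)            # still a lazy, disk-backed cube rather than a plain Array
@time sum(lazy)         # the IO and the isnan calls happen only at reduction time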
I think that threading is not your main bottleneck. Your main bottleneck is that your code is running into a generic definition of sum, which assumes fast random access to the data.
When you replace your missing_items computation with a chunk-aware reduction, the computation takes the chunking of the underlying data into account and has to open every chunk only once.
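A minimal sketch of such a chunk-aware version with mapCube (the axis names "lon" and "lat" are assumptions here; check the actual names with caxes(ds)):

# count the NaNs in every lon-lat slice; mapCube walks the cube chunk by chunk,
# so every chunk is read from disk only once
nan_per_slice = mapCube(ds; indims = InDims("lon", "lat"), outdims = OutDims()) do xout, xin
    xout .= count(isnan, xin)
end

missing_items = sum(nan_per_slice.data)
fraction = missing_items / prod(size(ds))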
This is not going to be multithreaded, but for me it runs in half a second on the small subset and in 14 seconds on the full example dataset, while your code took 6 seconds on the small subset alone. You could then parallelize the computation across files via Slurm.
Threading is not going to help much, because your problem is mainly IO limited.
For your full problem, what is the chunking structure of the data?
If a full lat-lon chunk does not fit into memory, you might have to do the computation in three steps along the longest chunk dimension. Or you could increase max_cache for the mapCube function to allow more data to be loaded at the same time.
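For example (a sketch; the 4.0e9 bytes here is purely illustrative, pick a value that fits your node's memory):

nan_per_slice = mapCube(ds; indims = InDims("lon", "lat"), outdims = OutDims(),
                        max_cache = 4.0e9) do xout, xin   # max_cache is in bytes
    xout .= count(isnan, xin)
end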