Looping over a thousand files and checking if empty

Hello!

I have some results of an experiment. They are stored inside a folder, and I need to open the files, check the fraction of missing values (NaN values, actually) inside each file, and tag it according to different levels of missing values. I’d like to make this parallel, or at least speed up the code, but so far I have not succeeded. Any idea would be welcome!

There are about 2000 files, each about 16 GB in size.

Note that I use a cluster (with a Slurm scheduler), so if I can parallelize, I could use a good number of CPUs.

Here’s an MWE (I hope!).

Project.toml content.

[179af706] CFTime v0.1.2
[336ed68f] CSV v0.10.11
[13f3f980] CairoMakie v0.10.8
[35d6a980] ColorSchemes v3.23.0
[3da002f7] ColorTypes v0.11.4
[5ae59095] Colors v0.12.10
[0703355e] DimensionalData v0.24.13
[61d90e0f] GeoJSON v0.7.2
[db073c08] GeoMakie v0.5.1
[c27321d9] Glob v1.3.1
[ee78f7c6] Makie v0.19.8
[85f8d34a] NCDatasets v0.12.17
[30363a11] NetCDF v0.11.7
[91a5bcdd] Plots v1.39.0
[438e738f] PyCall v1.96.1
[ac1d9e8a] ThreadsX v0.1.11
[5c2747f8] URIs v1.5.0
[90b8fcef] YAXArrayBase v0.6.1
[c21b50f5] YAXArrays v0.5.0
[0a941bbe] Zarr v0.9.1

using YAXArrays, NetCDF
using Downloads
using Glob
using Dates
using CFTime

# MWE. Sample file used for creating the MWE.
url = "https://esgf-world.s3.amazonaws.com/CMIP6/CMIP/CCCma/CanESM5/historical/r15i1p1f1/day/tasmin/gn/v20190306/tasmin_day_CanESM5_historical_r15i1p1f1_gn_18500101-20141231.nc"
filename = Downloads.download(url, "tasmin_day_CanESM5_historical_r15i1p1f1_gn_18500101-20141231.nc") # you pick your own path

# MWE hack. We copy the same file to mimic the loop over multiple files
cp(filename, "tasmin_day_CanESM5_historical_r15i1p1f1_gn_18500101-20141231_v2.nc")

# ==================================
# Beginning here, this is the actual MWE to optimize
files = glob("tasmin*.nc")

# To store filenames given different level of missing values
miss_frac = Vector{String}()
miss_over90 = Vector{String}()
miss_tot = Vector{String}()
miss_ok = Vector{String}()

for ifile in files

    ds = Cube(ifile)

    # MWE hack. This bit is just to optimize the code over a subset of the data
    # (the actual code will use the whole dataset)
    ds = ds[Time=DateTimeNoLeap(2000,1,1)..DateTimeNoLeap(2002,12,31)]

    # Total items
    tot_items = prod(size(ds))

    # Missing items
    # This is where it takes time. I was thinking this would be parallelized given the following
    # ENV variable, defined before I launch the script (but the process only uses 1 core in the end):
    # export JULIA_NUM_THREADS=8
    missing_items = sum(map(x-> isnan(x), ds))

    # Fraction
    fraction = (missing_items / tot_items)

    if fraction > 0.5 && fraction < 0.9
        println("Found > 0.5 and < 0.9")
        push!(miss_frac, ifile)
    elseif fraction >= 0.9 && fraction < 1.0
        println("Found >= 0.9 and < 1.0")
        push!(miss_over90, ifile)
    elseif fraction == 1.00
        println("Found == 1.0!")
        push!(miss_tot, ifile)
    else
        push!(miss_ok, ifile)
    end
end

@Fliks (perhaps you have an idea?)

Giving Julia more threads does not automatically parallelize your code. You have to call parallel functions, or write your own multi-threaded code. For example, ThreadsX.jl has a parallelized version of sum that you can call, which will utilize multiple threads.
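For example, a minimal sketch (ThreadsX is already in your environment; note that for a lazy YAXArrays cube you may need to materialize the subset into an ordinary array first, e.g. with a plain getindex):

using ThreadsX

# Start Julia with several threads, e.g. `julia -t 8` or JULIA_NUM_THREADS=8
data = ds[:, :, :]                        # hypothetical: load the subset into memory
missing_items = ThreadsX.sum(isnan, data) # multi-threaded reduction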

Note also that you will often get a better speedup by parallelizing at a coarser level rather than a finer one. In particular, I would think about parallelizing your outermost loop, for ifile in files, with @threads. If you do that, then to avoid race conditions you will want to put a mutex lock around your if fraction [...] end block at the end, so that only one thread calls push! at a time.
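A minimal sketch of that, reusing the names from your loop (the per-file work is unchanged apart from count; this assumes each file can be processed independently):

lk = ReentrantLock()

Threads.@threads for ifile in files
    ds = Cube(ifile)
    tot_items = prod(size(ds))
    missing_items = count(isnan, ds)   # single pass, no temporary array (see PS below)
    fraction = missing_items / tot_items

    # Only the push! calls need to be serialized across threads
    lock(lk) do
        if 0.5 < fraction < 0.9
            push!(miss_frac, ifile)
        elseif 0.9 <= fraction < 1.0
            push!(miss_over90, ifile)
        elseif fraction == 1.0
            push!(miss_tot, ifile)
        else
            push!(miss_ok, ifile)
        end
    end
end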

PS. Better to call sum(isnan, ds) or count(isnan, ds) rather than sum(map(x-> isnan(x), ds)), since the latter will first allocate an array of isnan values and then sum it, whereas the former will allocate no intermediate arrays and make a single pass over the data.

Thanks!

I was under the impression that using the @threads macro over the files would force me to create arrays for ds, tot_items, missing_items and fraction, like:


# Code preallocating Arrays (ds, tot_items, etc) not shown

Threads.@threads for z in 1:length(files)

    ifile = files[z]
    
    ds[z] = Cube(ifile)
    
    tot_items[z] = prod(size(ds[z]))
    
    # Missing items
    missing_items[z] = sum(map(x-> isnan(x), ds[z]))
    
    # Fraction
    fraction[z] = (missing_items[z] / tot_items[z])
end

I will try your suggestion!

On a cluster, think of using an array job. Your Julia code could count the NaNs in a given file. If you can run this code independently on each file, then an array job can help (see the sketch after the link below).

PS. If it were possible to use grep to match NaNs, I would suggest using grep…

Slurm Workload Manager - Job Array Support (schedmd.com)
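For example, a minimal sketch of a job array submission script (the script name check_missing.jl, the array size, and the resource numbers are hypothetical; each array task handles one file):

#!/bin/bash
#SBATCH --job-name=nan-check
#SBATCH --array=0-1999          # one task per file (~2000 files)
#SBATCH --cpus-per-task=1
#SBATCH --mem=16G
#SBATCH --time=02:00:00

# Pick the file for this array task and let the Julia script count its NaNs
FILES=(tasmin*.nc)
julia --project check_missing.jl "${FILES[$SLURM_ARRAY_TASK_ID]}"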

Thanks!

Yes, I could indeed use a job array for the loop over the files. My hope was to distribute the files using a job array (or a @threads loop) and then use multiple threads for the NaN computations.

What kinds of files are these? Plaintext or some binary format? If plaintext, depending on your exact goals, you might also consider just using a shell script with some combination of wc, grep, sed, awk, etc.

Indeed, I should have mentioned that those are NetCDF files, which limits access to the data to certain libraries (here YAXArrays, with NetCDF under the hood).

No, local variables within the loop body are already separate for each thread.

These are exactly my thoughts. Utilize various Linux utilities to do the various parts of this.

Yeah, I guess I could use cdo or nco and bash scripting, but I must admit I do not use shell scripting enough to do it quickly :slight_smile:

I’m no allocations expert, but you might see a minor speed improvement here by replacing sum(map(isnan, ds[z])) with count(isnan, ds[z]).

I have not seen a major difference in allocations between count(isnan, ds), sum(isnan, ds) and sum(map(x -> isnan(x), ds)) after some testing. My guess is that the map function for a YAXArray is lazy by design? I could be wrong, of course.

" Apply a function on every element of a datacube¤

The map function can be used to apply a function on every entry of a YAXArray without taking the dimensions into account. This will lazily register the mapped function which is applied when the YAXArray is either accessed or when more involved computations are made."

I think that threading is not your main bottleneck. Your main bottleneck is that your code is running into a generic definition of sum, which assumes fast random access to the data.
When you replace your missing_items computation with

    @time missing_items_ts = mapCube(countnan, ds, indims=InDims("lon", "lat"), outdims=OutDims(), nthreads=1)

This computation takes the chunking of the underlying data into account and only has to open every chunk once.
It is not going to be multithreaded, but for me it runs in half a second for the small subset and in 14 seconds for the full example dataset, while your code took 6 seconds for the small subset. You could then parallelize the computation via Slurm along the number of files.
Threading is not going to help much, because your problem is mainly IO-limited.

For your full problem, what is the chunking structure of the data?
If a full lat-lon chunk does not fit into memory, you might have to do the computation in three steps along the longest chunked dimension. Or you could increase the max_cache for the mapCube function to allow more data to be loaded at the same time.
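For example, something like this (assuming max_cache is given in bytes; the value here is arbitrary):

missing_items_ts = mapCube(countnan, ds, indims=InDims("lon", "lat"), outdims=OutDims(), max_cache=2e9)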

Thanks!

Did you define countnan somewhere?

Yes, sorry I forgot to include it here.

countnan(xout, xin) = xout .= count(isnan, xin)

This definition is needed so that YAXArrays does the computation in place and reuses the temporary array for every time step.
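For reference, a minimal sketch of the whole per-file check using this approach (countnan and the InDims axes as above; summing the small per-time-step result should be cheap, since it has already been reduced over lon and lat):

countnan(xout, xin) = xout .= count(isnan, xin)

for ifile in files
    ds = Cube(ifile)
    # NaN count per time step, reading each (lon, lat) chunk only once
    nan_counts = mapCube(countnan, ds, indims=InDims("lon", "lat"), outdims=OutDims())
    fraction = sum(nan_counts) / prod(size(ds))
    println(ifile, ": ", round(fraction, digits=3))
end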

Awesome, thank you!

This is indeed 3x faster :slight_smile:
