I am not using Dask so I think this is only using one process, similar to the Julia attemps. If I understand correctly, I should ask explicitly for parallelism with Dask on the Python example (@lsterzinger am I correct in this?). So the resources should be similar except for the cases that allocate the full array (NCDatasets), those ones require much more memory. I think the YAXArrays is even design to behave similarly as xarray (you need to explicitly load Distributed for the graph to be done in parallel).
Thanks for the insight. NCDatasets has been my main tool for some years now when working with NetCDF files and I always assumed that the chunked reading was managed automatically by the netcdf library.
but they can still be doing multi-threading?
I am not using Dask so I think this is only using one process, similar to the Julia attemps. If I understand correctly, I should ask explicitly for parallelism with Dask on the Python example (@lsterzinger am I correct in this?).
If you supply chunking info with xr.open_dataset(..., chunks={...})
, the new dataset is loaded as dask arrays so any computation using that dataset will utilize dask to process the chunks. Without specifying chunks (e.g, just using xr.open_dataset()
) even if the files themselves are chunked, to my knowledge does not use dask arrays.
Yeah, netcdf handles the chunked reading. What we need to do (using DiskArrays.jl) is read those chunks and operate on them 1 at a time so we donāt have to load huge amounts of data into ram. This already works with e.g. ArchGDAL.jl.
DiskArrays.jl does broadcasts lazily, then applies them to each chunk. Reductions happen chunk by chunk instead of in column order.
I just took a closer look at your code, I donāt think that saving a chunked netCDF and then opening it with xarray will preserve the chunking scheme. I did a quick test creating a random array, chunking it with xarray, and saving to disk. Opening it back up with xarray (with no additional arguments) returned a single array with no chunks. So I think chunks need to be specified when the dataset is opened. There may be some performance under the hood if the netcdf is chunked on disk but I donāt think xarray specifically takes this into account.
The .compute()
forces xarray to compute instead of lazy loading. If the array is a dask array, then dask will be utilized to do the computation otherwise it will be done in serial
You can also try fusing the broadcasts with @.
to get a speed up. On my computer:
function get_max_sp_rasters(file)
vsfc = Raster(file, key="VSFC")
usfc = Raster(file, key="USFC")
sp = sqrt.(vsfc.^2 + usfc.^2)
return maximum(sp,dims=(X,Y))[:]
end
function get_max_sp_rasters_fused(file)
vsfc = Raster(file, key="VSFC")
usfc = Raster(file, key="USFC")
sp = @. sqrt(vsfc^2 + usfc^2)
return maximum(sp,dims=(X,Y))[:]
end
@time get_max_sp_rasters(file_chunked)
# 21.788317 seconds (19.27 M allocations: 19.805 GiB, 14.01% gc time, 33.40% compilation time)
@time f1(file_chunked)
# 8.955190 seconds (1.88 M allocations: 11.815 GiB, 6.41% gc time, 7.40% compilation time)
Half the time and far fewer allocations.
Thanks for taking a look!
Good catch!
With this, the memory footprint reduces and in the computer in question it becomes 17s instead of 25s. Updated the original post.
If I recall correctly, setting chunks="auto"
will load them as they are present in the file.
I think what hasnāt been mentioned yet would be that packages supported by DIskArrays do reductions by chunk by default. So the following is pretty fast for me:
using NetCDF
NetCDF.open("/home/fgans/Downloads/speeds_chunked.nc") do f
usfc = f["USFC"]
vsfc = f["VSFC"]
ws = sqrt.(usfc.^2 .+ vsfc.^2)
maximum(ws,dims=(1,2))
end
takes 16s on my machine and reads the data chunk by chunk and uses DiskArrayās lazy broadcast. The 16s seems to be the time that a single process needs to read the whole file and what many serial methods proposed here converge on. Note that internally this is based on this method: https://github.com/meggart/DiskArrays.jl/blob/39a2de3a2cc11e9932164a790bf28987777c3d6a/src/ops.jl#L85 which can be parallelized quite easily. This will work the same for the Zarr data, although I will have to investigate what makes Zarr so slow here, I guess the zarr array is compressed while the NetCDF isnāt? This makes me still wonder why it is so fast in pythonā¦
Now the implementation I linked is a simple loop, so can be parallelized easily, here I use ParallelMapReduce.jl and get the time down to 2.5 seconds using 8 cores on my laptop:
using Distributed, ParallelMapReduce
addprocs(8)
@everywhere import Pkg
@everywhere Pkg.activate(".")
@everywhere using NetCDF, DiskArrays, ParallelMapReduce
#Define the mapping function
@everywhere function chunkmax(cI)
a = NetCDF.open("/home/fgans/Downloads/speeds_chunked.nc") do f
usfc = f["USFC"]
vsfc = f["VSFC"]
ws = sqrt.(usfc.^2 .+ vsfc.^2)
ws[cI...]
end
o = fill(typemin(eltype(a)),2400)
o[last(cI)] = maximum(a,dims = (1,2))
o
end
#And the reduing function (here a elementwise max)
@everywhere redmax(o1,o2) = o1 .= max.(o1,o2)
#Get an iterable of the chunk indices
chunks = NetCDF.open(DiskArrays.eachchunk,"/home/fgans/Downloads/speeds_chunked.nc","USFC")
#And map over these indices
pmapreduce(chunkmax,redmax,map(identity,chunks))
Here I included the opening of the file and reading a chunk into the function that is distributed because sharing a NetCDF C pointer across processes is quite dangerous. It would be a nice excercise to generalise this a bit more and make a general pmapreducedim
for DiskArrays.
Regarding the relative slowness of YAXArrays, this is caused by the fact that the task is actually not a mapslices operation but rather a mapreduce operation, it is not necessary to have an entire (x,y) slice in memory to compute the maximum over that slice, so that a chunk-by-chunk data access pattern leads to more efficient runtimes.
If the array was chunk in (512,512,1) slices, there would be no penalty if you use mapslices, but in this case, chunks have to be accessed multiple times, which is what the warning about caches misses is about and leads to non-ideal data access.
I thought so too, but when I did my quick test (random 1000x1000 array, 100x100 chunks) and saved to file, I reopened with chunks='auto'
and the chunks were set to 1000x1000, so I think xarray tries to figure out an appropriate chunk size based on available memory, etc. Iām guessing if you increased everything by an order of magnitude (10000x10000 array, 1000x1000 chunks) it would be more likely to use the fileās chunking scheme
I like this one very much for its ease of writing (which is one of the strongest selling points of the xarray approach for me). You point out the convergence of all the methods toward 16s which is true. It looks like @jling question is relevant for the zarr case (looks like it is automatically using four threads to read the zarr).
With NCDatasets (and optionally DistributedArrays), this more compact code
using DistributedArrays
@everywhere begin
import NCDatasets as ncd
ds=ncd.Dataset("./speeds.nc")
get_max_sp_ncdatasets(t::Int) =
maximum(sqrt.(ds["USFC"][:,:,t].^2 .+ ds["VSFC"][:,:,t].^2), dims = (1,2))[1]
end
gives benchmarks as follows.
Serial case :
julia> @time get_max_sp_ncdatasets.(1:2400)
9.872756 seconds (299.85 k allocations: 7.044 GiB, 15.53% gc time)
Parallel case :
julia> @time get_max_sp_ncdatasets.(distribute(collect(1:2400)))
1.535838 seconds (2.61 k allocations: 190.047 KiB)
For bigger array dims 1&2, it shouldnāt be too complicated to āchunkā get_max_sp_ncdatasets
in space. But otherwise performances like these probably suffice for me.
A side note about chunking in space.
NCTiles.jl does this by generating a collection of smaller files in the first place. Thatās a simple solution I designed for distributing global ocean state estimates circa 2015 ( GMD - ECCO version 4: an integrated framework for non-linear inverse modeling and global ocean state estimation ).
You canāt expect that everyone is going to store large output this way but this has the advantage that rechunking on the reading side becomes a non-issue.
This is great, Gael. Thanks.
Could you tell more about the resources you used in your example?
Yes!
Iām marking this one as solved. I had a lot of fun. Thanks!
Gaelās proposal is quite good actually.
I think it is safe to say that we still donāt have the tooling as advanced as Python for doing this kind of tasks ultra fast almost without giving it any thought. Not that I am blaming it on the awesome package developers, just checking about the current state of our ecosystem. I typically preach Julia to my colleagues but they still have some strong points to stay where they are.
It was said earlier but then not taken into account in many propositions: do not forget to do sqrt(maximum(...))
and not maximum(sqrt.(...))
!
Right, I missed that. This will save many operations.