HDF5 collective read using MPI goes out of memory, but write works perfectly

I have a test script that uses parallel HDF5 for reading and writing. I am testing this on a node of our cluster with ample memory available, yet the read goes out of memory. Interestingly, the write allocates only ~200 MB. Am I doing something wrong here?

## User input.
npx = 8; npy = 16
itot = 1024; jtot = 1024; ktot = 1024

imax = itot ÷ npx; jmax = jtot ÷ npy


## Init MPI and create grid.
using MPI
using HDF5
using LoopVectorization

MPI.Init()

dims = [npy, npx]; periodic = [1, 1]; reorder = true
commxy = MPI.Cart_create(MPI.COMM_WORLD, dims, periodic, reorder)
commx = MPI.Cart_sub(commxy, [false, true])
commy = MPI.Cart_sub(commxy, [true, false])

info = MPI.Info()
id = MPI.Comm_rank(commxy)
id_x = MPI.Comm_rank(commx)
id_y = MPI.Comm_rank(commy)

print("$id: has parallel HDF $(HDF5.has_parallel())\n")

a = ones(imax, jmax, ktot) * id


function write_3d_hdf(a)
    fid = h5open("test_mpi.h5", "w", commxy, info)
    data_all = create_dataset(fid, "a", datatype(eltype(a)), dataspace((itot, jtot, ktot)), dxpl_mpio=HDF5.H5FD_MPIO_COLLECTIVE)
    is = id_x*imax + 1; ie = (id_x+1)*imax
    js = id_y*jmax + 1; je = (id_y+1)*jmax
    data_all[is:ie, js:je, :] = a[:, :, :]
    MPI.Barrier(commxy)
    close(fid)
end


function read_3d_hdf(a)
    is = id_x*imax + 1; ie = (id_x+1)*imax
    js = id_y*jmax + 1; je = (id_y+1)*jmax
    fid = h5open("test_mpi.h5", commxy, info)
    aid = read(fid, "a", dxpl_mpio=HDF5.H5FD_MPIO_COLLECTIVE)
    a[:, :, :] = aid[is:ie, js:je, :]
    MPI.Barrier(commxy)
    close(fid)
end

for i in 1:3
    @time write_3d_hdf(a)
end

for i in 1:3
    @time read_3d_hdf(a)
end

MPI.Finalize()

I failed to reproduce the issue by running your script on a local machine. Maybe it’s a problem of the parallel HDF5 configuration on your cluster?

What I can suggest is to try out my PencilArrays.jl package, which includes high-level wrappers for parallel HDF5 as well as raw MPI-IO. By comparing both kinds of parallel I/O, you can determine whether the problem comes from the HDF5 side.

Below is a version of your example using PencilArrays. You can check (using a tool like h5diff) that the HDF5 files generated by your script and the code below are identical.

## User input.
npx = 8; npy = 16
itot = 1024; jtot = 1024; ktot = 1024

using MPI
using HDF5
using PencilArrays
using PencilArrays.PencilIO

MPI.Init()
comm = MPI.COMM_WORLD
id = MPI.Comm_rank(comm)

print("$id: has parallel HDF $(HDF5.has_parallel())\n")

topo = MPITopology(comm, (npx, npy))
decomp_dims = (1, 2)  # partition along the first two dimensions
pen = Pencil(topo, (itot, jtot, ktot), decomp_dims)

a = PencilArray{Float64}(undef, pen)
a .= id

function write_3d(driver, filename, a)
    comm = get_comm(a)
    open(driver, filename, comm; write = true) do ff
        ff["a"] = a
    end
end

function read_3d!(driver, filename, a)
    comm = get_comm(a)
    open(driver, filename, comm; read = true) do ff
        read!(ff, a, "a")
    end
end

println("Write PHDF5:")
for i = 1:3
    @time write_3d(PHDF5Driver(), "test2.h5", a)
end

println("Write MPIIO:")
for i = 1:3
    @time write_3d(MPIIODriver(), "test2.bin", a)
end

println("Read PHDF5:")
for i = 1:3
    @time read_3d!(PHDF5Driver(), "test2.h5", a)
end

println("Read MPIIO:")
for i = 1:3
    @time read_3d!(MPIIODriver(), "test2.bin", a)
end
2 Likes

For parallel I/O or data-staging at scale, I would strongly recommend ADIOS2:

There are bindings available in Julia:

Here you can see a little bit of the awesome stuff you can do with ADIOS2 (check out the short demo movies):

Moreover, we are going to create some tooling around ADIOS2.jl - an internship at CSCS this year should initiate the work, see “Distributed visualization and checkpointing in Julia” here:
https://www.cscs.ch/about/internships#

3 Likes

Reading the HDF5 using PencilArrays.jl works here, but it is also really slow (it takes about 100 sec) to write the file (similar numbers with MPI-IO). I probably have to contact the supercomputer support.

Nice library by the way. The code that I am working with (github.com/Chiil/MicroHH.jl) also uses a pencil decomposition and we use exactly the functionality your library provides. I’d be very curious to compare the performance once I have some time.

1 Like

I would be curious as well! Let me know if I can be of any help for the comparisons.

I looked into your HDF5 implementation and tried to do the same in my function. It works now, but I have the impression that the read is not collective as performance falls of very quickly for more than a few cores. How do I enable the collective read here? I am not really sure how to use the h5p_set_fapl_mio function, but I could not figure out from documentation how to do this in the proper way.

function read_3d_hdf(a)
    is = id_x*imax + 1; ie = (id_x+1)*imax
    js = id_y*jmax + 1; je = (id_y+1)*jmax

    fid = h5open("test_mpi.h5", commxy, info)
    aid = fid["a"]

    memtype = HDF5.datatype(a)
    memspace = HDF5.dataspace(a)
    asubid = HDF5.hyperslab(aid, is:ie, js:je, 1:ktot)

    # HDF5.h5p_set_fapl_mpio(aid.xfer, commxy, info)
    HDF5.h5d_read(aid, memtype.id, memspace.id, asubid, aid.xfer, a)

    close(aid)
    close(fid)
end

I think collective operations must be enabled at the moment one opens an HDF5 dataset. In my implementation I have something equivalent to this:

dapl = create_property(HDF5.H5P_DATASET_ACCESS)
dxpl = create_property(HDF5.H5P_DATASET_XFER)
dxpl[:dxpl_mpio] = HDF5.H5FD_MPIO_COLLECTIVE  # this enables collective operations

aid = open_dataset(fid, "a", dapl, dxpl)

which would replace your aid = fid["a"].

Note that in PencilArrays.jl the reading and writing functions have a collective argument, which is enabled by default. There is also a chunks argument that is false by default. In distributed filesystems it may be worth it to switch it to true, especially if all process have exactly the same amount of data (which seems to be the case in your example). In that case, each process will write to a separate contiguous chunk of the HDF5 file. I expect this to improve the writing and reading performance, but I haven’t really tried.

By the way in HDF5.jl one doesn’t need to call h5p_set_fapl_mpio to enable parallel I/O. It is done internally when one passes an MPI communicator to h5open.

This helps a lot, I was unaware of the open_dataset equivalent of create_dataset that makes enabling the collectives so easy. This is the function that gives excellent read performance now.

function read_3d_hdf(a)
    is = id_x*imax + 1; ie = (id_x+1)*imax
    js = id_y*jmax + 1; je = (id_y+1)*jmax

    fid = h5open("test_mpi.h5", commxy, info)

    dapl = create_property(HDF5.H5P_DATASET_ACCESS)
    dxpl = create_property(HDF5.H5P_DATASET_XFER)
    dxpl[:dxpl_mpio] = HDF5.H5FD_MPIO_COLLECTIVE
    aid = open_dataset(fid, "a", dapl, dxpl)

    memtype = HDF5.datatype(a)
    memspace = HDF5.dataspace(a)
    asubid = HDF5.hyperslab(aid, is:ie, js:je, 1:ktot)

    HDF5.h5d_read(aid, memtype.id, memspace.id, asubid, aid.xfer, a)

    close(aid)
    close(fid)
end

I realized now that the aid = read(fid, "a", dxpl_mpio=HDF5.H5FD_MPIO_COLLECTIVE) in my question reads the entire dataset on each processor, which explains the excessive memory use.

Is this hyperslab construction still replacable with something simpler with HDF5.jl?

1 Like

Good to know that it works!

Not that I know of. It might be worth it to add something like this to HDF5.jl.

2 Likes

I found that simply writing the indexing below, it also works well, now I load the data with open_dataset.

a[:, :, :] = aid[is:ie, js:je, :]

instead of the code below.

memtype = HDF5.datatype(a)
memspace = HDF5.dataspace(a)
asubid = HDF5.hyperslab(aid, is:ie, js:je, 1:ktot)

HDF5.h5d_read(aid, memtype.id, memspace.id, asubid, aid.xfer, a)
2 Likes