Mmapping a discontiguous file?

#1

I’m writing a parser for OME-TIFFs, which are pretty large binary files with many sets of XY matrices (images) stored inside them (I have a list of their offsets within each file). Another limitation is that these files have to be < 4 GB, so large datasets get split across multiple files. I was hoping to load this as a custom AbstractArray with the files mmapped in the background. Is this possible with the current implementation of mmap? Any suggestions for how to set this up?

Ref: https://github.com/JuliaImages/Images.jl/issues/587
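For context, the mmap-at-an-offset part of this is straightforward in Julia, since `Mmap.mmap` accepts a byte offset into the file. A minimal sketch of the custom-AbstractArray idea (all names hypothetical, and ignoring the real OME-TIFF layout):

```julia
using Mmap

# A lazy 3-D stack of 2-D planes, each plane mmapped from a (file, offset) pair.
struct MmappedStack{T} <: AbstractArray{T,3}
    planes::Vector{Matrix{T}}    # each backed by an mmapped region
    dims::NTuple{3,Int}
end

function MmappedStack{T}(entries::Vector{Tuple{String,Int}}, h::Int, w::Int) where {T}
    planes = map(entries) do (file, offset)
        open(file) do io
            # maps h*w elements of type T starting `offset` bytes into the file;
            # the mapping stays valid after the stream is closed
            Mmap.mmap(io, Matrix{T}, (h, w), offset)
        end
    end
    MmappedStack{T}(planes, (h, w, length(planes)))
end

Base.size(A::MmappedStack) = A.dims
Base.getindex(A::MmappedStack, i::Int, j::Int, k::Int) = A.planes[k][i, j]
```

Indexing then only touches the pages actually read, so the whole dataset never has to fit in RAM at once.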


#2

If you can write a function to read each mmapped chunk, and you know the distribution of the matrix parts, you can use Dagger.jl to construct something that looks like a big distributed matrix…

Here is a walkthrough of how a Dagger array is made:

using Dagger
import Dagger: ArrayDomain, Cat, Blocks, DomainBlocks,
               partition, ComputedArray, DomainSplit, Thunk

# First figure out how your data is cut up, and then you need to tell Dagger this.
# there are 2 ways to do this depending on how low level you want to get here

# method 1. Easy but only for simple cases
# Like when you have a 300x600 matrix split up into chunks of 100x200

splitup = partition(Blocks(100, 200), ArrayDomain(1:300, 1:600))
# julia> splitup = partition(Blocks(100, 200), ArrayDomain(1:300, 1:600))
# 3×3 Dagger.DomainBlocks{2}:
#  Dagger.ArrayDomain{2}((1:100,1:200))    …  Dagger.ArrayDomain{2}((1:100,401:600))  
#  Dagger.ArrayDomain{2}((101:200,1:200))     Dagger.ArrayDomain{2}((101:200,401:600))
#  Dagger.ArrayDomain{2}((201:300,1:200))     Dagger.ArrayDomain{2}((201:300,401:600))
# (I agree the printing is a bit crap)

# method 2: More involved, but gives finer control:
# e.g. when your chunk sizes vary. Note this still only supports block (grid) partitions.
splitup = DomainBlocks((1,1), (cumsum([50, 50, 100, 100]), cumsum([100, 200, 300])))

#4×3 Dagger.DomainBlocks{2}:
# Dagger.ArrayDomain{2}((1:50,1:100))     Dagger.ArrayDomain{2}((1:50,101:300))     Dagger.ArrayDomain{2}((1:50,301:600))   
# Dagger.ArrayDomain{2}((51:100,1:100))   Dagger.ArrayDomain{2}((51:100,101:300))   Dagger.ArrayDomain{2}((51:100,301:600)) 
# Dagger.ArrayDomain{2}((101:200,1:100))  Dagger.ArrayDomain{2}((101:200,101:300))  Dagger.ArrayDomain{2}((101:200,301:600))
# Dagger.ArrayDomain{2}((201:300,1:100))  Dagger.ArrayDomain{2}((201:300,101:300))  Dagger.ArrayDomain{2}((201:300,301:600))
# You can get the index range of each piece by looking inside the splitup object:

splitup[1,1].indexes
# => (1:50,1:100)

splitup[2,3].indexes
# => (51:100,301:600)

# splitup is a special type of AbstractArray that is very fast at looking up index ranges,
# but you can call collect or map on it to work with the elements;
# we will do that below to tell Dagger how to read each part.

# Next you need to create a matrix of "thunks" which describe how to read each part of your data.
# To keep this example runnable, I'm going to use the `rand` function to create a random matrix for each part.

thunks = map(prt -> Thunk(rand, map(length, prt.indexes)), splitup)


# Thunk takes a function and a tuple of arguments; a Thunk object represents
# a task in Dagger's... DAG

# An example of how you would do it in your case (assuming one chunk per
# (file, offset) pair, and `using Mmap` first on Julia 1.0+) would be:
# thunks = map(files, offsets, splitup) do file, offset, prt
#    Thunk(()) do
#        dims = map(length, prt.indexes)
#        f = open(file, "r")
#        arr = Mmap.mmap(f, Matrix{Float64}, dims, offset)
#        close(f)   # the mapping stays valid after the stream is closed
#        arr
#    end
# end

# Finally you create a concatenation of all your pieces
# (the second argument is the domain of the concatenated data)
concated = Cat(Array{Float64, 2}, ArrayDomain(1:300, 1:600), splitup, thunks)

# gather will pull all the data into your current process and concatenate it.
# Don't do this if it won't fit in RAM!
gather(concated)
# => 300×600 Array{Float64,2}

# Finally, to leverage Dagger's array abstraction and array operations,
# you need to wrap it in a ComputedArray
X = ComputedArray(concated)

# For example
gather(reducedim(+, X, 1))
# => 1×600 Array{Float64,2}:
# 158.815  150.929  ....

# You can also redistribute the data if you wish:
Distribute(Blocks(100, 100), X)

# You can also save the parts the redistribution creates,
# But I will leave that bit out in this post.

As a bonus, if you do addprocs(N) before you run this, the pieces are read and computed on all the Julia worker processes. You should only need enough memory to hold (size of a chunk) × (number of arrays in an operation) × (number of workers) to run this.
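Concretely, that setup is just the following (the worker count is illustrative):

```julia
using Distributed

addprocs(4)               # start four local worker processes; pick what fits your machine
@everywhere using Mmap    # packages used inside thunks must be loaded on every worker
# @everywhere using Dagger  # likewise Dagger itself, before building the graph
```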

I hope this helps! :slight_smile:


#3

It took me forever to get back to this, but I finally wrote the parser and I’m working on this problem now. Can Dagger support something where the blocks are discontiguous and spread out in multiple files? I have multiple matrices in each file over multiple files and I want to construct one lazy multi-dimensional array. See https://github.com/tlnagy/OMETIFF.jl/issues/1 and https://github.com/tlnagy/OMETIFF.jl/issues/3


#4

Dagger currently has the limitation that it requires the data to be cut up as a grid…

For example this is fine:

---------
|   |   |
|   |   |
---------
|   |   |
|   |   |
---------

And this is fine:

-------
|   | |
|   | |
-------
|   | |
-------

But this isn’t:

-------
|   | |
|   | |
-------
| |   |
-------
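The reason is visible in the `DomainBlocks` constructor used in the walkthrough above: you give one set of cut points per dimension, so every expressible partition is a full grid. A small sketch (same API as the earlier snippet):

```julia
using Dagger
import Dagger: DomainBlocks

# rows cut at 100 then 300, columns cut at 300 then 600: a 2×2 grid
grid = DomainBlocks((1, 1), (cumsum([100, 200]), cumsum([300, 300])))
# There is no way to give different column cuts to different rows,
# which is exactly the third picture above.
```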

#5

Note that it doesn’t matter where the parts come from, as long as you’ve defined a function to load the parts, you can then use delayed(f) in place of f itself to create lazily read parts.
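As a tiny self-contained illustration of `delayed` (result per Dagger's docs):

```julia
using Dagger
import Dagger: delayed

lazy = delayed(+)(1, 2)   # builds a Thunk; nothing runs yet
collect(lazy)             # forces the computation
# => 3
```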

Updated the code snippet to use some changed API and style:

using Dagger
import Dagger: ArrayDomain, Blocks, DomainBlocks,
               partition, DArray, delayed

# First figure out how your data is cut up, and then you need to tell Dagger this.
# there are 2 ways to do this depending on how low level you want to get here

# method 1. Easy but only for simple cases
# Like when you have a 300x600 matrix split up into chunks of 100x200

splitup = partition(Blocks(100, 200), ArrayDomain(1:300, 1:600))
# julia> splitup = partition(Blocks(100, 200), ArrayDomain(1:300, 1:600))
# 3×3 Dagger.DomainBlocks{2}:
#  Dagger.ArrayDomain{2}((1:100,1:200))    …  Dagger.ArrayDomain{2}((1:100,401:600))  
#  Dagger.ArrayDomain{2}((101:200,1:200))     Dagger.ArrayDomain{2}((101:200,401:600))
#  Dagger.ArrayDomain{2}((201:300,1:200))     Dagger.ArrayDomain{2}((201:300,401:600))
# (I agree the printing is a bit crap)

# method 2: More involved, but gives finer control:
# e.g. when your chunk sizes vary. Note this still only supports block (grid) partitions.
splitup = DomainBlocks((1,1), (cumsum([50, 50, 100, 100]), cumsum([100, 200, 300])))

#4×3 Dagger.DomainBlocks{2}:
# Dagger.ArrayDomain{2}((1:50,1:100))     Dagger.ArrayDomain{2}((1:50,101:300))     Dagger.ArrayDomain{2}((1:50,301:600))   
# Dagger.ArrayDomain{2}((51:100,1:100))   Dagger.ArrayDomain{2}((51:100,101:300))   Dagger.ArrayDomain{2}((51:100,301:600)) 
# Dagger.ArrayDomain{2}((101:200,1:100))  Dagger.ArrayDomain{2}((101:200,101:300))  Dagger.ArrayDomain{2}((101:200,301:600))
# Dagger.ArrayDomain{2}((201:300,1:100))  Dagger.ArrayDomain{2}((201:300,101:300))  Dagger.ArrayDomain{2}((201:300,301:600))
# You can get the index range of each piece by looking inside the splitup object:

splitup[1,1].indexes
# => (1:50,1:100)

splitup[2,3].indexes
# => (51:100,301:600)

# splitup is a special type of AbstractArray that is very fast at looking up index ranges,
# but you can call collect or map on it to work with the elements;
# we will do that below to tell Dagger how to read each part.

# Next you need to create a matrix of "thunks" which describe how to read each part of your data.
# To keep this example runnable, I'm going to use the `rand` function to create a random matrix for each part.

thunks = map(prt -> delayed(rand)(map(length, prt.indexes)), splitup)


# delayed(f) wraps f so that calling the wrapper builds a Thunk instead of
# running f; a Thunk object represents a task in Dagger's... DAG

# An example of how you would do it in your case (assuming one chunk per
# (file, offset) pair, and `using Mmap` first on Julia 1.0+) would be:
# thunks = map(files, offsets, splitup) do file, offset, prt
#     dims = map(length, prt.indexes)
#     delayed(() -> begin
#         f = open(file, "r")
#         arr = Mmap.mmap(f, Matrix{Float64}, dims, offset)
#         close(f)   # the mapping stays valid after the stream is closed
#         arr
#     end)()
# end

# Finally you create a concatenation of all your pieces
# (the second argument is the domain of the concatenated data)
darray = DArray(Array{Float64, 2}, ArrayDomain(1:300, 1:600), splitup, thunks)

# collect will pull all the data into your current process and concatenate it.
# Don't do this if it won't fit in RAM!
collect(darray)
# => 300×600 Array{Float64,2}

# You can do operations on the darray, for example
collect(reducedim(+, darray, 1))
# => 1×600 Array{Float64,2}:
# 158.815  150.929  ....