[ANN] FileTrees.jl -- easy everyday parallelism on trees of files

Hi Everyone!

JuliaCon was awesome! Thanks to everyone who was a part of it!

I’m excited to publish a project called FileTrees (repo, docs) that I have been working on with help from @jpsamaroo.

It is a thin layer of abstraction that helps solve common problems in working with multiple files of data. It also abstracts away the concerns of parallelizing such processing across many threads and many Julia processes.

Motivation

The recent work on threading has been fantastic, and with its long-tested Distributed library, Julia has some of the best parallel computing infrastructure out there. But users still have difficulty applying this functionality effectively in their everyday work.

  • These APIs are very low level. If you have 1000s of files, simply starting 1000s of tasks may try to load all the data at the same time. Besides, you may want to combine distributed processes with threading for the best performance, at which point you also need to manage data locality and a slew of other concerns. A suitable scheduler solves these problems.
  • We do have a scheduler in the form of Dagger.jl, but its API is still a level too low (one task at a time).
  • Many users have the immediate problem of having 100s of files and wanting to process them all in parallel.

With FileTrees you can

  • Read a directory structure as a Julia data structure, (lazy-)load the files, apply map and reduce operations on the data while not exceeding available memory if possible. (docs)
  • Filter data by file name using familiar Unix syntax (docs)
  • Make up a file tree in memory, create some data to go with each file (in parallel), write the tree to disk (in parallel). (See example below)
  • Virtually mv and cp files within trees, merge and diff trees, apply different functions to different subtrees. (docs)
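To give a flavor of how these pieces combine, here is a small sketch. The `data/<year>/<name>.csv` layout is made up, and the calls follow the documented API, so treat this as illustrative rather than authoritative:

```julia
using FileTrees, Glob

# Read a directory into a tree and filter it with Unix glob syntax.
t = FileTree("data")              # e.g. data/2019/a.csv, data/2020/b.csv
csvs = t[glob"*/*.csv"]

# Lazy-load each file; nothing is read until a result is demanded.
loaded = FileTrees.load(csvs; lazy=true) do f
    read(path(f), String)         # stand-in for a real parser
end

# Map and reduce over the values, then force the computation.
total = reducevalues(+, mapvalues(sizeof, loaded)) |> exec

# Virtually rearrange the tree: swap the directory and file-name parts.
t2 = mv(t, r"^([^/]*)/([^/]*)\.csv$", s"\2/\1.csv")
```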

Design principles

  • Keep it low level and simple: do not tie the package to a data format, which would in turn call for a slew of optimizations to be built into the system.
  • Avoid dependencies: the only dependencies are Glob, FilePathsBase and Dagger. This is also because I want to be able to maintain it easily and converge to a finished state in terms of API. In the future, improvements will come more from development on Dagger than on this package.
  • Keep the API small but be powerful when features are combined.
  • Be an abstraction layer over files and compute for building distributed data structures such as distributed tables. In many ways, FileTrees contains the essence of JuliaDB.

Example

Here is an example of using FileTrees to create 3025 images which together form a big 16500x16500 image of the Mandelbrot set. (I tried my best to make them all contiguous; it’s almost right, but I’m still figuring out those parameters.) Then we load them back and compute a histogram of the HSV values across all the images in parallel using OnlineStats.jl.

Here’s the snippet to create the directory:

@everywhere using Images, FileTrees, FileIO

tree = maketree("mandel"=>[]) # an empty file tree
params = [(x, y) for x=-1:0.037:1, y=-1:0.037:1]
for i = 1:size(params,1)
    for j = 1:size(params,2)
        tree = touch(tree, "$i/$j.png"; value=params[i, j])
    end
end

# map over the values to create an image at each node.
# 300x300 pixels per image.
t1 = FileTrees.mapvalues(tree, lazy=true) do params
     # from https://rosettacode.org/wiki/Mandelbrot_set#Julia
    mandelbrot(50, params..., 300) # zoom level, moveX, moveY, size
end

# save it -- blocks until everything is computed and written to disk
@time FileTrees.save(t1) do file
    FileIO.save(path(file), file.value)
end

then we wait…

This takes about 150 seconds when Julia is started with 10 processes with 4 threads each, on a 12-core machine. (Oversubscribing this much gives good performance in this case.)

In other words,

export JULIA_NUM_THREADS=4
julia -p 10

Load the files back in a new session, and compute the histogram:

using Distributed
@everywhere using FileTrees, FileIO, Images, .Threads, OnlineStats, Distributed

t = FileTree("mandel")

# Lazy-load each image and compute its histogram
t1 = FileTrees.load(t; lazy=true) do f
    h = Hist(0:0.05:1)
    img = FileIO.load(path(f))
    println("pid ", myid(), " threadid ", threadid(), ": ", path(f))
    fit!(h, map(x->x.v, HSV.(img)))
end

# combine them all into one histogram using the `merge` method from OnlineStats

@time h = reducevalues(merge, t1) |> exec # exec computes a lazy value

#=
When plotted looks like:
        ┌                                        ┐ 
    0.0 ┤■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ 100034205   
   0.05 ┤ 302199                                   
    0.1 ┤ 666776                                   
   0.15 ┤ 378473                                   
    0.2 ┤ 864297                                   
   0.25 ┤ 1053490                                  
    0.3 ┤ 602937                                   
   0.35 ┤ 667619                                   
    0.4 ┤ 1573476                                  
   0.45 ┤ 949928                                   
    0.5 ┤■ 2370727                                 
   0.55 ┤ 1518383                                  
    0.6 ┤■ 3946507                                 
   0.65 ┤■■ 6114414                                
    0.7 ┤■ 4404784                                 
   0.75 ┤■■ 5920436                                
    0.8 ┤■■■■■■ 20165086                           
   0.85 ┤■■■■■■ 19384068                           
    0.9 ┤■■■■■■■■■■■■■■■■■■■■■■ 77515666           
   0.95 ┤■■■■■■■ 23816529                          
        └                                        ┘ 
=#

This takes 100 seconds.

At any point in time the whole computation holds at most 40 files in memory, because there are 40 computing elements (4 threads × 10 processes). The scheduler also takes care of freeing any memory that it knows will not be used after the result is computed.

What I would like to learn from you:

  • What is required to make it work with non-filesystem sources such as S3 or FTP. We use FilePathsBase, but I have not tested, nor do I have experience with, all the kinds of URIs from which data can come.
  • Your feedback on the data structure and API.
  • If you try it for a problem you have, I would like to know how it went! :slight_smile:

I would like to thank:

  • Julian Samaroo for some nice pair programming.
  • Parry Husbands and Alan Edelman for giving a problem which I wanted a nice way to solve.
  • Matt Bauman for his Rafts.jl document which seeded this idea.
  • Everyone who makes this the best of communities to be part of.

Happy Hacking!


Thank you @shashi for this awesome package. I wonder if you could contribute your experience with the Distributed API in this thread: The ultimate guide to distributed computing

There I am trying to understand the current model of parallelization on distributed-memory clusters and the limitations of this simple pmap approach. Could you please highlight some of the issues in advance?


Is it gonna be used in JuliaDB 2?

Replied there! Thanks!

Thank you @shashi, I work with Machine Learning and this seems like a very useful and powerful way to load images for learning/training. Also, the documentation is great!


Very cool! Reading the (excellent) documentation and https://github.com/shashi/FileTrees.jl/issues/9 (@Chris_Foster), I’m thinking a bit about the roles of the various packages in forming a nice distributed array ecosystem, and relating it to, for instance, https://dask.org/.

I’m interested in a related use case, namely large chunked array datasets, commonly used in fields like climate science. In dask there is dask.array. There are different packages that offer IO for these chunked datasets, e.g. Zarr.jl, HDF5.jl, NCDatasets.jl, ArchGDAL.jl. In the case of Zarr.jl these chunks can each be separate files, but often the chunks are internal to the files. To help reason about these chunks within AbstractArrays @fabiangans created DiskArrays.jl, see [ANN] DiskArrays.jl, and we’ve been busy integrating this into above mentioned packages.

It would be cool to be able to easily put these data sources into a form where we can use Dagger.jl to apply scheduled operations over chunks, and also keep track of the positions of chunks in the dataset, for operations that go over chunk boundaries. I’m aware there is Dagger.DArray but not sure of its place or future; it is a bit underdocumented. Is Dagger.jl going towards primarily being a scheduler?


Awesome :slight_smile: I’ll definitely be using this for work!

(Small nitpick for the website (snazzy!): you have a cool “go to repo” overlay with a “powered by Julia” logo below it; on mobile it doesn’t show well (ends up being white on white).)


Hi @visr thanks for the comment!

I’m thinking a bit of the roles of the various packages to form a nice distributed array ecosystem, and relating it to for instance https://dask.org/.

Dagger is pretty similar to dask. I based the code on Dask’s multi-process scheduler in the beginning.

Yes, Dagger.DArray exists, but operations on it are still a work in progress. I have been meaning to move it out into a new package, and possibly base the implementation on @mcabbott’s Tullio.jl, but the basic DArray structure could remain the same.

When the array infrastructure stabilizes it will be nice to allow creating distributed arrays from FileTrees composed with DiskArrays: so a single file can have many chunks in the distributed array.

@Chris_Foster may have thoughts about this kind of thing!


Hi @visr, great use case. As you might imagine, large rasters are on my mind due to my history of having to deal with Geospatial data :wink:

I see you’ve found my reference to DataSets.jl. I’ll announce that package properly in the coming weeks — once it’s solidified a bit more and is at least somewhat demo-able — but I’m thinking about loading large structured datasets in general. Trees are an important case (and FileTrees.jl is cool and intuitive!), but I also think that not all large data is naturally tree-like.

In thinking about lazy loading of large datasets I think it can be helpful to identify:

  • The keys: for array-like raster data, this is generally a Cartesian range, as might be returned by keys(), addressing the pixel data via raster coordinates.
  • The index (or indices): Given a key, this provides a lookup mechanism. In Zarr this would be the metadata and chunking mechanism.
  • The index may be implemented as a natural partitioning of the data relative to the key space. A partition is generally chosen for efficiency under some assumptions about how a computation will traverse the key space. For geospatial raster data, the partitioning would generally be tiles.

Out of core or distributed processing must be aware of the data partition to get good performance.
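As a toy illustration of that key-to-partition mapping in plain Julia (the 300x300 tile size is just a made-up number):

```julia
# Map a Cartesian pixel key to the tile (partition element) that
# stores it, for a raster chunked into fixed-size tiles.
const TILESIZE = (300, 300)

tileof(key::CartesianIndex) = CartesianIndex(cld.(Tuple(key), TILESIZE))

# A scheduler that respects the partition would group work by tile:
tileof(CartesianIndex(451, 10))   # CartesianIndex(2, 1): row-tile 2, column-tile 1
```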

But from the point of view of the user, it’s natural to think of a large raster as basically an AbstractArray and to work in terms of the keys.

So I think there’s a disconnect here between the most abstract logical view of the data in terms of the key space (AbstractArray) vs the need to respect the partitioning of the data for efficiency.

I won’t claim to know what to do about this yet! I’m still trying to figure out how to state the problem clearly :slight_smile:


On the topic of ”If you try it for a problem you have, I would like to know how it went!” with a slight mix of “this might be common enough to be worth catering to but I can’t think of an elegant solution”.

Overall it was very usable out of the box and it certainly made my life a little bit more endurable.

My “FileTree” does not have nice CSVs in it but rather text logs with a mix of different formats which are basically variants of file;line, par1=val1, par2=val2,…. To me the intuitive thing was to parse each file into one DataFrame per type of line encountered.

The first version just put the value of each File, once processed, as a dict with the file_lines as keys and the corresponding DataFrames as values, but this was a bit awkward to work with given that the DataFrames are what one typically wants to process, not the whole dict. That could however be hacked around by the following somewhat dishonest construct:

struct InsertTree{F}
    tree::FileTree
    mapname::F
end
inserttree(t, mapname=s -> first(split(s, '.'))) = InsertTree(t, mapname)
FileTrees.File(parent::Union{Nothing,FileTree}, name::String, value::InsertTree) =
    FileTree(value.tree; parent=parent, name=value.mapname(name))

This allows for returning the result of the parsing as a FileTree and then mapping values through inserttree so that the file_lines become the new leaves in the tree. Unless this is a completely insane thing to want to do, perhaps some less sneaky (and more robust) way to do it could be warranted.

The limit of this approach though is that lazy mappings don’t work, as the leaves are not there when the pattern matching is performed. I guess for a fully generic solution one would also need the pattern matching to be lazy, and this seems like a hopeless mess at face value (at least for regexps).

Some more minor things:

I ended up using map with dirs=false a lot more than mapvalues as my directory structure contained a lot of the context. Nothing wrong with that I guess, but there does not seem to be a function to access the value of a File, which makes it seem like this is not the way one is supposed to do things.

Similarly it is a bit awkward that reducevalues seems to be the only way to get stuff out of the tree. Perhaps an iterator over the nodes/values of a tree could be useful? Or did I miss it?


On the topic of ”If you try it for a problem you have, I would like to know how it went!” with a slight mix of “this might be common enough to be worth catering to but I can’t think of an elegant solution”.

Haha thank you so much for this note!

Unless this is a completely insane thing to want to do then perhaps some less sneaky (and more robust) way to do it could be warranted.

No, totally not. Although it’s not immediately what the package is trying to do, it’s possible to do. I would have tried using mapsubtrees:

mapsubtrees(tree, r"\.log$") do file
  #... Here you can return a FileTree, and it will become part of the output tree
  #... at the same spot that `file` originally was. 
end

The limit of this approach though is that lazy mappings doesn’t work as the leaves are not there when the pattern matching is performed.

True! However, you can definitely construct the tree by reading only the “header” from each file, and then returning a lazy subtree within mapsubtrees where each node knows how to read its own sub-part of the file. I think that’s not documented well enough. Basically, delayed(f)(x...) will create a lazy value which represents the result of running f(x...). I was hoping this is something we don’t have to document, but on second thought, it seems liberal use of delayed has fewer caveats than I thought it would.
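To sketch that idea in code: the helper `parse_section`, the section names, and the tree layout below are all hypothetical stand-ins, not FileTrees API; only `maketree`, `delayed`, `[]` and `exec` are the real calls being discussed.

```julia
using FileTrees
using Dagger: delayed

# Hypothetical parser for one section of a log file.
parse_section(file, section) = "parsed $section of $file"

# Build a subtree for one file where each leaf holds an unevaluated
# Thunk that knows how to parse its own section.
lazy_subtree(file, sections) =
    maketree(string(file) => [
        (name = s, value = delayed(parse_section)(file, s))
        for s in sections])

sub = lazy_subtree("server.log", ["errors", "warnings"])

# Nothing is parsed until a value is demanded:
exec(sub["errors"][])
```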

but it does not seem to exist a function to access the value of a File and that makes it seem like it is not the way one is supposed to do things.

Ah, the syntax file[] will access the value for that file. Hmm, I wonder what we can do to make that clearer.

Similarly it is a bit awkward that reducevalues seem to be the only way to get stuff out of the tree. Perhaps an iterator over the nodes/values of a tree could be useful? Or did I miss it?

You’re right about that. We should add values, files, nodes and dirs functions which do this. Would that be good enough?

I’m glad you found it useful! I will open a couple of issues to think about these. Feel free to open some yourself!

Thank you!


Is the FileTree idea based on any prior art? It’s a really elegant solution and I am wondering if there is more I can read about.

Nothing particularly similar that I know of.

@mbauman has written something about the problems he was trying to solve: https://github.com/mbauman/Rafts.jl and outlines some of the same ideas.

There is something called “dataset” in R which creates tables whose columns come from the directory structure: https://arrow.apache.org/docs/r/articles/dataset.html

Wrote some docs about how to split a file into many files. http://shashi.biz/FileTrees.jl/split-files/


Thanks for very nice package/documentation/ANN post!

I’ve been thinking about parallel (including distributed) processing in Julia and I wonder if we can have a common high- and low-level interface across the ecosystem.

  • High-level API. One of my (rather “high-level”) attempts is SplittablesBase.jl, which provides a very minimal interface (a halve function) between collections and parallel processing functions. Coupling this with recursively scheduled @async invoking remotecall at the bottom, I wonder if we can decouple the scheduler from the collection and make halve (or something similar but minimal) the API between them. A minimalist approach is beneficial because many data structures are amenable to parallel processing. This includes dictionaries, sets, strings, many iterator transformations, etc. It also includes “virtual” data structures like data structures lazily joined.

  • Low-level API. Even if a halve-based solution does not work, probably due to the locality problem as emphasized in the OP and the comments, I think it might be possible to define a “low-level” API around a variant of the reduce function. This makes many optimizations possible due to a tight coupling of the reduce and the data structure. However, there is still a lot of flexibility because we can do a lot on the reducing-function op side, as long as we allow a bit more interface for op (e.g., Transducers.jl’s start, next, complete, and Reduced).

    I think many Julia users agree that defining a common API across the ecosystem would be beneficial, because we can derive a lot on top of the common API. For example, there is no need for a separate map function because it can be derived from reduce. Now that I added a parallel syntax and extensible scheduler mechanism in FLoops.jl (I’m hoping to do a proper ANN at some point), such a collection can be reduced over using a familiar for-loop syntax as long as there is a common, powerful enough reduce-like interface. This is because FLoops.jl is “just” a way to construct the aforementioned extended “op” interface.
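As a toy illustration of the halve idea in plain base Julia (this is not the actual SplittablesBase.jl interface, just the shape of it): split recursively, reduce the halves in parallel tasks, and combine.

```julia
# A minimal halve for vectors: two non-copying views over each half.
halve(v::AbstractVector) =
    (view(v, 1:length(v) ÷ 2), view(v, length(v) ÷ 2 + 1:length(v)))

# Recursive parallel reduce built only on halve + sequential reduce.
function preduce(op, v; basesize=1024)
    length(v) <= basesize && return reduce(op, v)
    left, right = halve(v)
    task = Threads.@spawn preduce(op, right; basesize=basesize)
    op(preduce(op, left; basesize=basesize), fetch(task))
end

preduce(+, collect(1:1_000_000))  # == 500000500000
```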

These are all just examples from my libraries so I hope it does not sound too much like an ad… I just wanted to mention some concrete examples of what can be done with a high-level API like halve and a low-level one like reduce while being (I think) applicable to a framework like FileTrees.jl.


Thank you for your post!

Coupling this with recursively scheduled @async invoking remotecall at the bottom, I wonder if we can decouple the scheduler from the collection and making halve (or something similar but minimal) the API between them. A minimalist approach is beneficial because many data structures can be applicable to the parallel processing. This includes dictionaries, sets, strings, and many iterator transformations, etc. It also includes “virtual” data structures like data structures lazily joined.

That is a totally awesome idea!

You can hook into Dagger.jl’s scheduler; it is pretty decent. Let me say a bit more about that:

You can create tasks in Dagger by calling y = delayed(f)(x...), which returns a Thunk representation of f(x...). If any of the arguments x is a Thunk, then it implicitly sets up a dependency relationship between the thunks. Then compute(y::Thunk) will compute it, along with any dependencies, using the scheduler. collect on that result will fetch the result to the caller’s process. The benefit of using something like Dagger vs @async and remotecall is that Dagger schedules it to be out-of-core friendly.
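Concretely, that flow looks something like this (a minimal sketch using only the calls named above):

```julia
using Dagger

# Thunks form a dependency graph: `b` depends on `a`.
a = delayed(+)(1, 2)    # Thunk representing 1 + 2
b = delayed(*)(a, 10)   # Thunk representing a * 10

# `compute` runs the graph on the scheduler; `collect` fetches
# the result into the caller's process.
collect(compute(b))     # 30
```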

It’d be interesting to see if we can easily swap out schedulers in a package like FileTrees! I’d like to know more about your scheduler in FLoops.jl

For FileTrees, do you recommend implementing a halve method which would give a tree with approximately half of the files? It would be interesting to explore that more. I will need your help with that though!

Now that I added a parallel syntax and extensible scheduler mechanism in FLoops.jl (I’m hoping to do a proper ANN at some point)

I’ll take a closer look soon! Looking forward to the ANN!

Hi, thanks for your quick intro to Dagger.jl! I’ve been wanting to look into it. I think I can now see how it is out-of-core friendly.

BTW, I think it was a bit of exaggeration when I said “scheduler”. It was just a mechanism to hook different implementation of reduce (sequential, threaded, distributed, unordered variant of them, etc.) into the for loop syntax. So it’s not a scheduler in the sense of e.g., partr.

Just as a fair warning(?), I don’t think halve is tested outside of my packages yet. But if you don’t mind giving a shot at this, it would be fantastic!

If you have halve + iterate (or halve + __foldl__) then it should work well with Transducers.jl and all of its related packages (e.g., ThreadsX.jl, FLoops.jl, LazyGroupBy.jl, …). I’m not sure if it provides a nice out-of-core facility ATM though. I just don’t have enough experience mixing it with a bunch of I/O (and I know there are several possible improvements for this). But if you have a somewhat smaller-scale problem, threading-based reduce could be nice to have?


Thanks for the reply,

Yeah, I agree this is somewhat out of scope as it has more to do with the insides of the files, and is perhaps more in line with the dataset stuff @Chris_Foster mentioned. I decided to bring it up just because I reckon logfiles like the ones I have are spit out by many kinds of systems, and FileTrees seems like a very good fit for helping to analyze them.

To add some more context to the problem: the files don’t have a header describing the contents, nor do they have any exploitable internal structure. They are just text files and one has to parse them line by line to find out what dataframes to create. The main use case for me is data exploration, so the time it takes to parse far outweighs the time for any processing afterwards.

I have experimented with just having a set of possible file_lines and initializing each logfile as something like maketree((name=filename, value=delayed(parsefile)(filename)) => dummyinits) where dummyinits is just an array of (name = "somefileline", value=delayed(nothing)). I have not yet gotten this to work as I get an array out of bounds in compute (will post an issue once I have made sure the error is not on me). Even if I do get it to work, it seems quite painful to dig through the Thunks and see if there is a nothing (or NoValue) in there to replace, or if one shall just wrap it in another Thunk when mapping, especially since the value to replace with is produced by the parent.

Another somewhat painful option I could think of is to set all the values to the same instance of some mutable lazy struct which parses the whole file the first time one tries to access some key from it, and blocks if one tries to access it while parsing. If I come up with anything which could be useful I will make an issue about it as well.

The inserttree solution is indeed a bit overengineered as it allows the parsing function to decide, based on the contents of the file, whether it needs to be a new subtree or whether there is just one value (i.e. a single DataFrame was produced).

This looks like a great tool @shashi. Thanks for putting it up. I was playing around with the package but when I tried to run it with multiple processes, I got an error:

Without multiple processes it works fine

~/testtree $ julia -q --project
(testtree) pkg> st
Status `~/testtree/Project.toml`
  [336ed68f] CSV v0.7.7
  [72696420] FileTrees v0.1.0
  [8ba89e20] Distributed
julia> using FileTrees
julia>

With multiple processes

~/testtree $ julia -q --project -p 2
julia> using Distributed
julia> @everywhere using CSV
julia> @everywhere using FileTrees
ERROR: On worker 2:
ArgumentError: Package FileTrees [72696420-646e-6120-6e77-6f6420746567] is required but does not seem to be installed:
 - Run `Pkg.instantiate()` to install all recorded dependencies.

_require at ./loading.jl:999
require at ./loading.jl:928
#1 at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/Distributed.jl:78
#103 at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:290
run_work_thunk at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
run_work_thunk at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:88
#96 at ./task.jl:356

...and 1 more exception(s).

Stacktrace:
 [1] sync_end(::Channel{Any}) at ./task.jl:314
 [2] macro expansion at ./task.jl:333 [inlined]
 [3] _require_callback(::Base.PkgId) at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/Distributed.jl:75
 [4] #invokelatest#1 at ./essentials.jl:710 [inlined]
 [5] invokelatest at ./essentials.jl:709 [inlined]
 [6] require(::Base.PkgId) at ./loading.jl:931
 [7] require(::Module, ::Symbol) at ./loading.jl:923
 [8] top-level scope at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:200

I have run Pkg.instantiate(), but it doesn’t fix the issue. I realize that this may not be caused by FileTrees, but I can run @everywhere using CSV successfully. To the extent that I understand the error message, there seem to be some weird references to files which are definitely not located on my machine, e.g.
/Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl

Is this an issue with FileTrees?

Thanks for your feedback, issues and PRs!

I just saw your experiments in https://github.com/shashi/FileTrees.jl/issues/25. That’s a good solution, although right now it seems a bit forceful; I’m interested in making this experience better and more natural.

I wonder if one can create metadata pretty quickly for your files with awk (or readlines()) and then load each slice lazily using grep on the line prefixes obtained from awk…
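That first metadata pass could be sketched in plain Julia like this (`prefix_index` is a hypothetical helper, not part of FileTrees; the log format follows the `file;line, par=val` shape described above):

```julia
# Scan a log once and record which line prefixes it contains, so a
# later lazy loader knows which per-prefix tables to expect.
function prefix_index(io::IO)
    counts = Dict{String,Int}()
    for line in eachline(io)
        prefix = first(split(line, ','))   # the "file;line" part
        counts[prefix] = get(counts, prefix, 0) + 1
    end
    counts
end

prefix_index(IOBuffer("a;1, x=1\na;1, x=2\nb;7, y=3\n"))
# Dict("a;1" => 2, "b;7" => 1)
```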