[ANN] FileTrees.jl -- easy everyday parallelism on trees of files

is the FileTree idea based on any prior art? It’s a really elegant solution and I am wondering if there are more I can read about.

Nothing particularly similar that I know of.

@mbauman has written something about the problems he was trying to solve: https://github.com/mbauman/Rafts.jl and outlines some of the same ideas.

There is something called “dataset” in the R arrow package, which creates tables whose columns come from the directory structure; see “Working with Arrow Datasets and dplyr” in the Arrow R package documentation.

Wrote some docs about how to split a file into many files: see the FileTrees.jl documentation.

1 Like

Thanks for the very nice package/documentation/ANN post!

I’ve been thinking about parallel (including distributed) processing in Julia and I wonder if we can have a common high- and low-level interface across the ecosystem.

  • High-level API. One of my (rather “high-level”) attempts is SplittablesBase.jl, which provides a very minimal interface (a halve function) between collections and parallel processing functions. Coupling this with recursively scheduled @async invoking remotecall at the bottom, I wonder if we can decouple the scheduler from the collection and make halve (or something similar but minimal) the API between them (see the sketch after this list). A minimalist approach is beneficial because many data structures are amenable to parallel processing: dictionaries, sets, strings, many iterator transformations, and also “virtual” data structures such as lazily joined ones.

  • Low-level API. Even if a halve-based solution does not work, perhaps due to the locality problem emphasized in the OP and the comments, I think it might be possible to define a “low-level” API around a variant of the reduce function. This makes many optimizations possible thanks to the tight coupling of reduce and the data structure. There is still a lot of flexibility, because we can do a lot on the reducing-function (op) side, as long as we allow a slightly richer interface for op (e.g., Transducers.jl’s start, next, complete, and Reduced).

    I think many Julia users agree that defining a common API across the ecosystem would be beneficial, because we can derive a lot on top of the common API. For example, there is no need for a separate map function because it can be derived from reduce. Now that I added a parallel syntax and extensible scheduler mechanism in FLoops.jl (I’m hoping to do a proper ANN at some point), such a collection can be reduced over using a familiar for-loop syntax as long as there is a common, powerful enough reduce-like interface. This is because FLoops.jl is “just” a way to construct the aforementioned extended “op” interface.
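
To make the high-level idea concrete, here is a minimal sketch of a parallel reduction written only against the halve interface (psum and basesize are made-up names, and Threads.@spawn stands in for the @async + remotecall scheduling mentioned above):

using SplittablesBase: amount, halve

function psum(xs; basesize = 1024)
    # Fall back to a sequential sum for small (sub)collections.
    amount(xs) <= basesize && return sum(xs)
    left, right = halve(xs)
    # Recurse on one half in another task; any scheduler could go here.
    task = Threads.@spawn psum(right; basesize = basesize)
    return psum(left; basesize = basesize) + fetch(task)
end

psum(1:1_000_000) == sum(1:1_000_000)  # true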

These are all just examples from my libraries, so I hope it does not sound too ad-like… I just wanted to mention some concrete examples of what can be done with a high-level API like halve and a low-level one like reduce, while being (I think) applicable to a framework like FileTrees.jl.

7 Likes

Thank you for your post!

Coupling this with recursively scheduled @async invoking remotecall at the bottom, I wonder if we can decouple the scheduler from the collection and make halve (or something similar but minimal) the API between them. A minimalist approach is beneficial because many data structures are amenable to parallel processing: dictionaries, sets, strings, many iterator transformations, and also “virtual” data structures such as lazily joined ones.

That is a totally awesome idea!

You can hook into Dagger.jl’s scheduler, it is pretty decent, let me say a bit more about that:

You can create tasks in Dagger by calling y = delayed(f)(x...), which returns a Thunk representation of f(x...). If any of the arguments x is a Thunk, that implicitly sets up a dependency relationship between the thunks. Then compute(y::Thunk) will compute it and all of its dependencies using the scheduler; collect on that result will fetch the result to the caller’s process. The benefit of using something like Dagger over @async and remotecall is that Dagger schedules the work in an out-of-core-friendly way.
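
For reference, the pattern just described looks like this in miniature (toy functions, assuming the delayed/compute/collect calls described above):

using Dagger

a = delayed(sum)(1:100)    # a Thunk representing sum(1:100)
b = delayed(x -> 2x)(a)    # passing a Thunk sets up the dependency b -> a
c = compute(b)             # run the graph via the scheduler
collect(c)                 # fetch the result to the caller's process: 10100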

It’d be interesting to see if we can easily swap out schedulers in a package like FileTrees! I’d like to know more about your scheduler in FLoops.jl

For FileTrees, do you recommend implementing a halve method which would give a tree with approximately half of the files? It would be interesting to explore that more. I will need your help with that though!
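
For instance (purely hypothetical, assuming FileTrees’ files function and that iterating a tree visits its files), a halve method could just split the file list:

using SplittablesBase, FileTrees

function SplittablesBase.halve(t::FileTree)
    fs = files(t)                 # the leaf File nodes of the tree
    mid = length(fs) ÷ 2
    # halve's contract: the two halves together cover the collection.
    return (view(fs, 1:mid), view(fs, mid+1:length(fs)))
end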

Now that I added a parallel syntax and extensible scheduler mechanism in FLoops.jl (I’m hoping to do a proper ANN at some point)

I’ll take a closer look soon! Looking forward to the ANN!

Hi, thanks for your quick intro to Dagger.jl! I’ve been wanting to look into it. I think now I can see how it is out-of-core friendly.

BTW, I think it was a bit of an exaggeration when I said “scheduler”. It is just a mechanism for hooking different implementations of reduce (sequential, threaded, distributed, unordered variants of them, etc.) into the for-loop syntax, as in the example below. So it’s not a scheduler in the sense of, e.g., partr.
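
Concretely, this is roughly how FLoops’ executor argument works (a small example using its current API):

using FLoops

# Swap SequentialEx(), ThreadedEx(), or DistributedEx() to change how
# the reduction is executed, without touching the loop body.
@floop ThreadedEx() for x in 1:100
    @reduce s += x
end
s  # == 5050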

Just as a fair warning(?), I don’t think halve is tested outside of my packages yet. But if you don’t mind giving a shot at this, it would be fantastic!

If you have halve + iterate (or halve + __foldl__), then it should work well with Transducers.jl and all of its related packages (e.g., ThreadsX.jl, FLoops.jl, LazyGroupBy.jl, …). I’m not sure it provides a nice out-of-core facility ATM, though; I just don’t have enough experience with mixing it with a lot of I/O (and I know there are several possible improvements for this). But for a somewhat smaller-scale problem, a threading-based reduce could be nice to have?

2 Likes

Thanks for the reply,

Yeah, I agree this is somewhat out of scope, as it has more to do with the insides of the files, and is perhaps more in line with the dataset stuff @c42f mentioned. I decided to bring it up just because I reckon logfiles like the ones I have are spat out by many kinds of systems, and FileTrees seems like a very good fit for helping to analyze them.

To add some more context to the problem: the files don’t have a header describing the contents, nor do they have any exploitable internal structure. They are just text files, and one has to parse them line by line to find out which dataframes to create. The main use case for me is data exploration, so the time it takes to parse far outweighs the time for any processing afterwards.

I have experimented with just having a set of possible file_lines and initializing each logfile as something like maketree((name=filename, value=delayed(parsefile)(filename)) => dummyinits), where dummyinits is just an array of (name = "somefileline", value=delayed(nothing)). I have not yet gotten this to work, as I get an array out-of-bounds error in compute (I will post an issue once I have made sure the error is not on me). Even if I do get it to work, it seems quite painful to dig through the Thunks to see if there is a nothing (or NoValue) in there to replace, or whether one should just wrap it in another Thunk when mapping, especially since the value to replace with is produced by the parent.

Another somewhat painful option I could think of is to set all the values to the same instance of some mutable lazy struct which parses the whole file the first time one tries to access a key from it, and blocks if one tries to access it while it is parsing. If I come up with anything that could be useful, I will make an issue about it as well.
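
For what it’s worth, a minimal sketch of that second option might look like this (LazyLogFile is a made-up name; parsefile is assumed to return a Dict of the parsed tables):

mutable struct LazyLogFile
    path::String
    lock::ReentrantLock
    tables::Union{Nothing, Dict{String, Any}}  # nothing until parsed
end
LazyLogFile(path) = LazyLogFile(path, ReentrantLock(), nothing)

function Base.getindex(f::LazyLogFile, key::String)
    # Parse the whole file on first access; the lock makes concurrent
    # accessors block until parsing is done.
    lock(f.lock) do
        if f.tables === nothing
            f.tables = parsefile(f.path)
        end
        return f.tables[key]
    end
end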

The inserttree solution is indeed a bit overengineered, as it allows the parsing function to decide, based on the contents of the file, whether it needs to be a new subtree or whether there is just one value (i.e., a single dataframe was produced).

This looks like a great tool, @shashi. Thanks for putting it up. I was playing around with the package, but when I tried to run it with multiple processes, I got an error.

Without multiple processes it works fine

~/testtree> julia -q --project
(testtree) pkg> st
Status `~/testtree/Project.toml`
  [336ed68f] CSV v0.7.7
  [72696420] FileTrees v0.1.0
  [8ba89e20] Distributed
julia> using FileTrees
julia>

With multiple processes

~/testtree> julia -q --project -p 2
julia> using Distributed
julia> @everywhere using CSV
julia> @everywhere using FileTrees
ERROR: On worker 2:
ArgumentError: Package FileTrees [72696420-646e-6120-6e77-6f6420746567] is required but does not seem to be installed:
 - Run `Pkg.instantiate()` to install all recorded dependencies.

_require at ./loading.jl:999
require at ./loading.jl:928
#1 at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/Distributed.jl:78
#103 at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:290
run_work_thunk at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
run_work_thunk at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:88
#96 at ./task.jl:356

...and 1 more exception(s).

Stacktrace:
 [1] sync_end(::Channel{Any}) at ./task.jl:314
 [2] macro expansion at ./task.jl:333 [inlined]
 [3] _require_callback(::Base.PkgId) at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/Distributed.jl:75
 [4] #invokelatest#1 at ./essentials.jl:710 [inlined]
 [5] invokelatest at ./essentials.jl:709 [inlined]
 [6] require(::Base.PkgId) at ./loading.jl:931
 [7] require(::Module, ::Symbol) at ./loading.jl:923
 [8] top-level scope at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:200

I have run Pkg.instantiate(), but it doesn’t fix the issue. I realize that this may not be caused by FileTrees, but I can run @everywhere using CSV successfully. To the extent that I understand the error message, there seem to be some weird references to files which are definitely not located on my machine, e.g.
at /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:

Is this an issue with FileTrees?

Thanks for your feedback, issues and PRs!

I just saw your experiments (https://github.com/shashi/FileTrees.jl/issues/25); that’s a good solution, although right now it seems a bit forceful. I’m interested in making this experience better and more natural.

I wonder if one can create metadata for your files pretty quickly with awk (or readlines()) and then load each slice lazily using grep on the line prefixes obtained from awk…
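
In plain Julia instead of awk, the metadata pass could be as simple as this sketch (scan_prefixes and load_slice are made-up names; the space delimiter is an assumption about the log format):

# One cheap pass to record which line prefixes occur in a file...
function scan_prefixes(filename; delim = ' ')
    prefixes = Set{String}()
    for line in eachline(filename)
        push!(prefixes, first(split(line, delim)))
    end
    return prefixes
end

# ...then each slice can be loaded lazily, grep-style, on demand.
load_slice(filename, prefix) =
    [line for line in eachline(filename) if startswith(line, prefix)]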

That’s really weird, I have no idea what’s going on off the top of my head.

Is there a FileTrees entry in your Manifest.toml file? What does it look like?

Just as a fair warning(?), I don’t think halve is tested outside of my packages yet. But if you don’t mind giving a shot at this, it would be fantastic!

I think first we should make Dagger a consumer of this API. It’d be amazing to try distributed LazyGroupBy on a bunch of files. I think that’s a solid example to try and implement.

cc @piever

1 Like

Yes, out-of-core group-by sounds like a nice application. I think it shouldn’t be too hard to implement reduce with the Dagger API. I might try this myself at some point.
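
A tree-shaped reduce over delayed thunks might be as simple as this sketch (dreduce is a made-up name, built on the delayed/compute pattern shown earlier):

using Dagger

function dreduce(op, xs)
    length(xs) == 1 && return delayed(identity)(xs[1])
    mid = length(xs) ÷ 2
    # Each pair of halves becomes a thunk; Dagger schedules the tree.
    return delayed(op)(dreduce(op, xs[1:mid]), dreduce(op, xs[mid+1:end]))
end

collect(compute(dreduce(+, collect(1:8))))  # == 36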

1 Like

Hehe, “forceful” is really the right word to describe it 🙂

Skimming through the files first might be an option as well, although it might end up being too slow. I was also a bit worried that reading the same file from multiple workers would make them block each other, but maybe it doesn’t work that way?

1 Like

I had some issues getting distributed working as well. Is there a chance that CSV works because it is in your shared/default environment?

I could get it to work using the method posted here: Packages and workers - #10 by ksmcreynolds

I think that the flags you used should be equivalent but I don’t know enough about it to say for sure.

2 Likes

Thanks @DrChainsaw! You’re right.

1 Like

Thanks for the answer, @shashi. The problem was that I needed to explicitly activate the environment for all workers. Now my script runs, but I still don’t see any parallelization.

Consider the following script, test.jl:

@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using Distributed, FileTrees, .Threads

@show nthreads()
@show nprocs()

@everywhere function create_tree()
    t = maketree("test_file_tree" => [])
    for c in 'A':'Z'
        node_file = joinpath(string(c), "nodefile")
        t = touch(t, node_file, value=1)
    end
    t
end

t = create_tree()
FileTrees.save(t) do file
    println("pid : $(myid()), threadid : $(threadid()), $(path(file))")
end |> exec

Running Julia 1.5 and FileTrees 0.1.2 with JULIA_NUM_THREADS=4, I get:

> julia -p 2  test.jl
 Activating environment at `~/testtree/Project.toml`
      From worker 3:	 Activating environment at `~/testtree/Project.toml`
      From worker 2:	 Activating environment at `~/testtree/Project.toml`
nthreads() = 4
nprocs() = 3
pid : 1, threadid : 1, test_file_tree/A/nodefile
pid : 1, threadid : 1, test_file_tree/B/nodefile
pid : 1, threadid : 1, test_file_tree/C/nodefile
pid : 1, threadid : 1, test_file_tree/D/nodefile
pid : 1, threadid : 1, test_file_tree/E/nodefile
pid : 1, threadid : 1, test_file_tree/F/nodefile
pid : 1, threadid : 1, test_file_tree/G/nodefile
pid : 1, threadid : 1, test_file_tree/H/nodefile
pid : 1, threadid : 1, test_file_tree/I/nodefile
pid : 1, threadid : 1, test_file_tree/J/nodefile
pid : 1, threadid : 1, test_file_tree/K/nodefile
pid : 1, threadid : 1, test_file_tree/L/nodefile
pid : 1, threadid : 1, test_file_tree/M/nodefile
pid : 1, threadid : 1, test_file_tree/N/nodefile
pid : 1, threadid : 1, test_file_tree/O/nodefile
pid : 1, threadid : 1, test_file_tree/P/nodefile
pid : 1, threadid : 1, test_file_tree/Q/nodefile
pid : 1, threadid : 1, test_file_tree/R/nodefile
pid : 1, threadid : 1, test_file_tree/S/nodefile
pid : 1, threadid : 1, test_file_tree/T/nodefile
pid : 1, threadid : 1, test_file_tree/U/nodefile
pid : 1, threadid : 1, test_file_tree/V/nodefile
pid : 1, threadid : 1, test_file_tree/W/nodefile
pid : 1, threadid : 1, test_file_tree/X/nodefile
pid : 1, threadid : 1, test_file_tree/Y/nodefile
pid : 1, threadid : 1, test_file_tree/Z/nodefile

Moreover, if I comment out the value=1 line in the tree constructor, nothing is printed out. The save loop seems to skip nodes without values. Is that intentional?

If these are issues with FileTrees I can create issues there.

1 Like

Ah. I used save and not load with lazy=true. Works now. Sorry for the noise.
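
For anyone else hitting this, the pattern that does distribute the work looks like this (a sketch of the fix just described, reusing t from the script above):

lazy_t = FileTrees.load(t; lazy=true) do file
    println("pid : $(myid()), threadid : $(threadid()), $(path(file))")
end
exec(lazy_t)  # forces the lazy values, scheduling them across workers/threads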

1 Like

Forgive me for saying this: FileTrees looks great, but it assumes a POSIX-compatible filesystem (I think). Systems admins like me always howl when people read and write many small files - I suppose we should really suck it up and start engineering for it.

Is there any thought in Julia about using S3 object stores?
I guess that would be a completely separate module from FileTrees.jl.

1 Like

Thank you, @shashi. This is a great and useful tool.

I wrote a not-so-small example, because I have a doubt.
I have been using it for some ML preprocessing in which the files have a certain structure, "test/<category>/<file>".

    # All files
    all = FileTree(dataset_dir)

    # Apply the function apply to each file
    data = FileTrees.load(all; lazy=true) do file
        # Get name
        fname_str = convert(String, path(file))
        # Apply the model to the image
        apply(fname_str)
    end

    # Recover the categories
    categories = FileTrees.load(all; lazy=true) do file
        # Get name
        category = path(file).segments[end-1]
        category
    end

    for type in ["test", "train"]
        # I love that part, that you can so easily filter train and test files
        sel = GlobMatch("$type/*/*.$ext")
        values = reducevalues(hcat, data[sel]) |> exec
        cats = reducevalues(hcat, categories[sel]) |> exec
        writedlm(..., values)
        writedlm(..., cats)
    end

Is it right? For me it is working well, but I do not know whether the order
of categories and data stays correct, even when run in parallel. Could it run in parallel without problems?

1 Like

@tkf nice! I’d be down to pair program on that; it might be much quicker to do it together than on our own.

@johnh
I’m looking for users on all platforms! If you give it a go, let me know.

Right now we actually don’t tie into POSIX, by means of FilePathsBase.jl… I think the idea is that, depending on which platform you’re on, the path will be of a different type. But I don’t have a good use case to test it on; see “prefixes”, Issue #8 on shashi/FileTrees.jl.

I think at least we would need to change how path works. https://github.com/shashi/FileTrees.jl/blob/master/src/datastructure.jl#L288

Possibly path(parent(d)) / Path(d.name) should become path(parent(d)) / d.name, and we should store the right path type in the root node, as stated in issue #8.
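
In other words, roughly this (a hypothetical paraphrase of the linked line, not an actual diff):

# Join the plain name onto the parent's typed path instead of wrapping it
# in Path(...), so the root's path type (PosixPath, an S3 path, ...) propagates.
path(d) = parent(d) === nothing ? d.name : path(parent(d)) / d.name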

I’m actually not sure if there’s an AbstractPath implementation for S3 that is open source and based on FilePaths.jl. cc @RoyiAvital (sorry for pinging 🙂)

@jonalm Thanks! That makes sense! We should definitely document that.

skip nodes without values. Is that intentional?

That’s right! But I’m open to doing the “right thing”; please do open an issue where we can discuss.

@dmolina that looks good!

You can also try name(parent(file)) to get the category instead of reaching into the path segments…

I think lazy in the second case (getting the categories) is overkill; I’d just skip that, because you don’t need to run it in parallel.
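
Putting both suggestions together, the categories pass could simply be (a sketch: non-lazy, and using name(parent(file)) from above):

categories = FileTrees.load(all) do file
    name(parent(file))   # the category directory's name
end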