Hi Everyone!
JuliaCon was awesome! Thanks to everyone who was a part of it!
I’m excited to publish a project called FileTrees (repo, docs) that I have been working on with help from @jpsamaroo.
It is a thin layer of abstraction that helps solve common problems in working with many files of data. It also abstracts away the concerns of parallelizing such a process across many threads and many Julia processes.
Motivation
The recent work on threading has been fantastic, and together with its long-tested Distributed library, Julia has some of the best parallel computing infrastructure out there. But users still have difficulty applying this functionality effectively to their everyday work.
- These APIs are very low level. If you have 1000s of files, naively starting 1000s of tasks may try to load all the data at the same time. Moreover, you may want to combine distributed processes with threading for the best performance, at which point you also need to manage data locality and a slew of other concerns. This is the job of a suitable scheduler.
- We do have a scheduler in the form of Dagger.jl, but its API is still one level too low (one task at a time).
- Many users have the immediate problem of having 100s of files and wanting to process them all in parallel.
With FileTrees you can:
- Read a directory structure as a Julia data structure, (lazy-)load the files, apply map and reduce operations on the data while not exceeding available memory if possible. (docs)
- Filter data by file name using familiar Unix syntax (docs)
- Make up a file tree in memory, create some data to go with each file (in parallel), write the tree to disk (in parallel). (See example below)
- Virtually `mv` and `cp` files within trees, merge and diff trees, apply different functions to different subtrees. (docs) (See the sketch right after this list.)
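Here is a quick sketch of how a few of these combine. This is illustrative rather than from the docs: the directory name "data" and its CSV contents are hypothetical, and it assumes CSV.jl and DataFrames.jl are installed; the glob indexing and the regex-based mv follow the patterns described in the docs.

using FileTrees, CSV, DataFrames

t = FileTree("data")        # read the directory "data" as a tree

# filter by file name using Unix glob syntax (via Glob.jl)
csvs = t[glob"*/*.csv"]

# lazy-load every matched file; nothing is read from disk yet
loaded = FileTrees.load(csvs; lazy=true) do f
    CSV.read(path(f), DataFrame)
end

# map over the loaded values, then reduce; exec() runs the whole task graph
counts = FileTrees.mapvalues(nrow, loaded)
total = exec(reducevalues(+, counts))

# virtually move files: prefix every path with "archive/", purely in memory
t2 = mv(t, r"^(.*)$", s"archive/\1")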
Design principles
- Keep it low level and simple: do not tie it to a data format, which would in turn call for a slew of optimizations to be built into the system.
- Avoid dependencies: the only dependencies are Glob, FilePathsBase and Dagger. This is also because I want the package to be easy to maintain and to converge to a finished state in terms of API. In the future, most improvements will come from work on Dagger rather than on this package.
- Keep the API small but be powerful when features are combined.
- Be an abstraction layer over files and compute for building distributed data structures such as distributed tables. In many ways, FileTrees contains the essence of JuliaDB.
Example
Here is an example of using FileTrees to create 3025 images which together form a big 16500x16500 image of the Mandelbrot set. (I tried my best to make them all contiguous; it’s almost right, but I’m still figuring out those parameters.) Then we load them back and compute a histogram of the HSV values across all the images in parallel, using OnlineStats.jl.
Here’s the snippet to create the directory:
@everywhere using Images, FileTrees, FileIO
tree = maketree("mandel"=>[]) # an empty file tree
params = [(x, y) for x=-1:0.037:1, y=-1:0.037:1]
for i = 1:size(params, 1)
    for j = 1:size(params, 2)
        tree = touch(tree, "$i/$j.png"; value=params[i, j])
    end
end
# map over the values to create an image at each node.
# 300x300 pixels per image.
t1 = FileTrees.mapvalues(tree, lazy=true) do params
    # from https://rosettacode.org/wiki/Mandelbrot_set#Julia
    mandelbrot(50, params..., 300) # zoom level, moveX, moveY, size
end
# save it -- blocks until everything is computed and written to disk
@time FileTrees.save(t1) do file
    FileIO.save(path(file), file.value)
end
Then we wait…
This takes about 150 seconds when Julia is started with 10 processes with 4 threads each, on a 12-core machine. (Oversubscribing this much gives good performance in this case.)
In other words,
export JULIA_NUM_THREADS=4
julia -p 10
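Note that mandelbrot is not part of FileTrees; the comment links to the Rosetta Code page it was adapted from. Below is a minimal, hypothetical sketch matching the (zoom, moveX, moveY, size) argument order assumed above; the window scaling and grayscale escape-time coloring are my own simplifications.

@everywhere function mandelbrot(zoom, moveX, moveY, n; maxiter=255)
    img = zeros(Gray{Float64}, n, n)
    for y in 1:n, x in 1:n
        # map the pixel to a point in the complex plane around (moveX, moveY)
        c = complex((2x - n) / (zoom * n) + moveX, (2y - n) / (zoom * n) + moveY)
        z = zero(c)
        k = 0
        # escape-time iteration
        while abs2(z) < 4 && k < maxiter
            z = z * z + c
            k += 1
        end
        img[y, x] = Gray(k / maxiter)   # brightness = normalized escape time
    end
    img
end

It is defined with @everywhere so that the lazy mapvalues tasks can call it on every worker process.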
Load the files back in a new session, and compute the histogram:
using Distributed
@everywhere using FileTrees, FileIO, Images, .Threads, OnlineStats, Distributed
t = FileTree("mandel")
# Lazy-load each image and compute its histogram
t1 = FileTrees.load(t; lazy=true) do f
    h = Hist(0:0.05:1)
    img = FileIO.load(path(f))
    println("pid ", myid(), ", threadid ", threadid(), ": ", path(f))
    fit!(h, map(x -> x.v, HSV.(img)))
end
# combine them all into one histogram using the `merge` method from OnlineStats
@time h = reducevalues(merge, t1) |> exec # exec evaluates the lazy result
#=
When plotted, it looks like:
┌ ┐
0.0 ┤■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ 100034205
0.05 ┤ 302199
0.1 ┤ 666776
0.15 ┤ 378473
0.2 ┤ 864297
0.25 ┤ 1053490
0.3 ┤ 602937
0.35 ┤ 667619
0.4 ┤ 1573476
0.45 ┤ 949928
0.5 ┤■ 2370727
0.55 ┤ 1518383
0.6 ┤■ 3946507
0.65 ┤■■ 6114414
0.7 ┤■ 4404784
0.75 ┤■■ 5920436
0.8 ┤■■■■■■ 20165086
0.85 ┤■■■■■■ 19384068
0.9 ┤■■■■■■■■■■■■■■■■■■■■■■ 77515666
0.95 ┤■■■■■■■ 23816529
└ ┘
=#
This takes 100 seconds.
At any point in time the whole computation holds at most 40 files in memory, because there are 40 computing elements (4 threads x 10 processes). The scheduler also takes care of freeing any memory that it knows will not be used after the result is computed.
What I would like to learn from you:
- What is required to make it work with non-local storage such as S3 or FTP. We use FilePathsBase, but I have not tested, nor do I have experience with, all the kinds of URIs from which data can come.
- Your feedback on the data structure and API.
- If you try it for a problem you have, I would like to know how it went!
I would like to thank:
- Julian Samaroo for some nice pair programming.
- Parry Husbands and Alan Edelman for giving a problem which I wanted a nice way to solve.
- Matt Bauman for his Rafts.jl document which seeded this idea.
- Everyone who makes this the best of communities to be part of.
Happy Hacking!