[ANN] DataFrameDBs.jl

Hi all!
Julia is my hobby, and I’ve had some free time over the last 4 weeks, so here are the first results of my experiments: https://github.com/waralex/DataFrameDBs.jl

It is a prototype of a columnar, persistent, type-stable and space-efficient database in pure Julia.
Here are some examples using this dataset, imported into DataFrameDBs:

julia> using DataFrameDBs
julia> t = open_table("ecommerce")
DFTable path: ecommerce
10×6 DataFrames.DataFrame
│ Row │ column        │ type           │ rows         │ uncompressed size │ compressed size │ compression ratio │
│     │ Symbol        │ String         │ String       │ String            │ String          │ Float64           │
├─────┼───────────────┼────────────────┼──────────────┼───────────────────┼─────────────────┼───────────────────┤
│ 1   │ event_time    │ Dates.DateTime │ 109.95 MRows │ 838.86 MB         │ 43.81 MB        │ 19.15             │
│ 2   │ event_type    │ String         │ 109.95 MRows │ 845.2 MB          │ 43.02 MB        │ 19.65             │
│ 3   │ product_id    │ Int64          │ 109.95 MRows │ 838.86 MB         │ 403.31 MB       │ 2.08              │
│ 4   │ category_id   │ Int64          │ 109.95 MRows │ 838.86 MB         │ 298.06 MB       │ 2.81              │
│ 5   │ category_code │ String         │ 109.95 MRows │ 1.97 GB           │ 467.32 MB       │ 4.31              │
│ 6   │ brand         │ String         │ 109.95 MRows │ 956.34 MB         │ 418.3 MB        │ 2.29              │
│ 7   │ price         │ Float64        │ 109.95 MRows │ 838.86 MB         │ 475.22 MB       │ 1.77              │
│ 8   │ user_id       │ Int64          │ 109.95 MRows │ 838.86 MB         │ 424.92 MB       │ 1.97              │
│ 9   │ user_session  │ String         │ 109.95 MRows │ 4.1 GB            │ 2.79 GB         │ 1.47              │
│ 10  │ Table total   │                │ 109.95 MRows │ 11.92 GB          │ 5.3 GB          │ 2.25              │

This dataset occupies 5.3 GB on disk, while the source CSV files occupy 14 GB.

Let’s try some operations on it:

julia> turnon_progress!(t) #turn on progress display for all reads from disk

julia> view = t[1:100:end, :]  #It's a lazy view
View of table ecommerce
Projection: event_time=>col(event_time)::Dates.DateTime; event_type=>col(event_type)::String; ...
Selection: 1:100:109950701

julia> view2 = view[350 .> view.price .> 300, [:brand, :price, :event_type]] #It's a lazy view too
View of table ecommerce
Projection: brand=>col(brand)::String; price=>col(price)::Float64; event_type=>col(event_type)::String
Selection: 1:100:109950701 |> &(>(350, col(price)::Float64)::Bool, >(col(price)::Float64, 300)::Bool)::Bool

julia> size(view2) #Reads only the :price column, in blocks of ~65000 rows, and counts the rows that satisfy the condition
Time: 0:00:00 readed: 109.95 MRows (169.65 MRows/sec)
(43239, 3)

julia> materialize(view2) #Materialize into a DataFrame only the rows that satisfy the condition
Time: 0:00:01 readed: 109.95 MRows (64.2 MRows/sec)
43239×3 DataFrames.DataFrame
│ Row   │ brand     │ price   │ event_type │
│       │ String    │ Float64 │ String     │
├───────┼───────────┼─────────┼────────────┤
│ 1     │           │ 321.73  │ view       │
│ 2     │ xiaomi    │ 348.53  │ view       │
│ 3     │ sv        │ 308.63  │ view       │
│ 4     │ xiaomi    │ 348.53  │ view       │
│ 5     │ intel     │ 348.79  │ view       │
│ 6     │ irobot    │ 319.45  │ view       │
....

julia> brand = view2.brand #Lazy iterable column of view2
DFColumn{String}

julia> unique(brand) #Find the unique brands among the rows selected by view2
Time: 0:00:01 readed: 109.95 MRows (81.66 MRows/sec)
542-element Array{Any,1}:
 ""
 "xiaomi"
 "sv"
 "intel"
 "irobot"
 "pioneer"
 "dauscher"
...
julia> length.(brand) #Lazy column of lengths of brands
DFColumn{Int64}

julia> sum(length.(brand)) #sum of lengths of brands
Time: 0:00:01 readed: 109.95 MRows (86.65 MRows/sec)
204850

Physically, a table is a directory; each column is stored in a separate file, divided into blocks of 65536 elements, and each block is compressed with LZ4. When iterating over a view or a column, one block is read from each of the columns required to check the selection conditions. Then, if that block contains any matching rows, the corresponding block is read from the columns required to build the projection, and the broadcasts (if any) are performed on this block. So only the data needed for a given request is read into memory.
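The block-at-a-time scan described above can be sketched in plain Julia. This is a minimal in-memory sketch, not DataFrameDBs code: it assumes each column is a `Vector` already split into blocks, skips disk I/O and LZ4 decompression entirely, and the helpers `split_blocks`, `count_rows` and `scan_blocks!` are hypothetical names chosen for illustration.

```julia
# Hypothetical stand-in for the on-disk layout: each column is a Vector of
# blocks. (DataFrameDBs stores each block LZ4-compressed in a per-column file;
# that part is not modelled here.)
const BLOCK_SIZE = 65536

# Split a column into consecutive blocks of at most `blocksize` elements.
split_blocks(col::AbstractVector, blocksize::Int = BLOCK_SIZE) =
    [col[i:min(i + blocksize - 1, length(col))] for i in 1:blocksize:length(col)]

# Like `size(view2)`: only the selection column is touched, one block at a time.
count_rows(sel_blocks, pred) = sum(block -> count(pred, block), sel_blocks; init = 0)

# Two-phase scan: evaluate `pred` on a block of the selection column, and only
# if some rows match, "read" the same block of the projection column, keep the
# matching rows and apply `f` to them. Returns the output and the number of
# projection blocks that had to be read.
function scan_blocks!(out::Vector, sel_blocks, proj_blocks, pred, f)
    proj_reads = 0
    for (sel, proj) in zip(sel_blocks, proj_blocks)
        mask = pred.(sel)              # selection broadcast on this block only
        any(mask) || continue          # nothing matches: skip the projection block
        proj_reads += 1                # the real engine would read/decompress here
        append!(out, f.(proj[mask]))   # projection broadcast on the filtered rows
    end
    return out, proj_reads
end

# Tiny example with blocks of 2 rows instead of 65536.
price = [50.0, 320.0, 10.0, 340.0, 400.0, 5.0]
brand = ["", "xiaomi", "sv", "intel", "irobot", "pioneer"]
price_blocks = split_blocks(price, 2)
brand_blocks = split_blocks(brand, 2)
in_range = p -> 300 < p < 350

n = count_rows(price_blocks, in_range)
brands, reads = scan_blocks!(String[], price_blocks, brand_blocks, in_range, identity)
```

Here `n` is 2, `brands` is `["xiaomi", "intel"]`, and `reads` is 2: the third block has no price in range, so its `brand` block is never touched.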

If the community finds this package interesting, it can be extended to support aggregates, joins, additional stored types, etc.

Best regards, Alexandr


Absolutely love this idea!

Very nice! Bookmarked! I wonder if there are opportunities to collaborate, as I maintain JDF.jl, a columnar DataFrame storage format that uses compression (mostly Blosc, via Blosc.jl).


Looks very interesting! The filesystem hierarchy reminds me of splayed tables in kdb+. Is performance competitive with kdb+?


I don’t work with kdb+; DataFrameDBs is primarily inspired by ClickHouse (and by my own experience developing a columnar DB in C++). The performance of DataFrameDBs is competitive with ClickHouse running on a single thread.


Could you provide a benchmark to support this claim?

I’m sorry, I made a mistake: I took my dreams for reality :frowning:. When testing on the same dataset, it turned out that DataFrameDBs is about 2-3 times slower than ClickHouse on most queries:

war-develop :) select count() from ecomm where price > 100;

SELECT count()
FROM ecomm
WHERE price > 100

┌──count()─┐
│ 72300800 │
└──────────┘

1 rows in set. Elapsed: 0.628 sec. Processed 109.95 million rows, 879.61 MB (174.96 million rows/s., 1.40 GB/s.)

julia> @time size(t[t.price .> 100, :])
Time: 0:00:00 readed: 109.95 MRows (514.52 MRows/sec)
Time: 0:00:01 readed: 109.95 MRows (92.93 MRows/sec)
  1.399899 seconds (64.04 k allocations: 1.911 GiB, 2.57% gc time)
(72300800, 9)

SELECT count()
FROM ecomm
WHERE brand = 'apple'

┌──count()─┐
│ 10381933 │
└──────────┘

1 rows in set. Elapsed: 1.712 sec. Processed 109.95 million rows, 1.55 GB (64.22 million rows/s., 906.79 MB/s.)

julia> @time size(t[t.brand .== "apple", :])
Time: 0:00:00 readed: 109.95 MRows (506.56 MRows/sec)
Time: 0:00:04 readed: 109.95 MRows (24.29 MRows/sec)
  4.748265 seconds (94.69 M allocations: 3.810 GiB, 9.65% gc time)
(10381933, 9)

SELECT count()
FROM ecomm
WHERE (product_id % 100) = 0

┌─count()─┐
│ 1388345 │
└─────────┘

1 rows in set. Elapsed: 1.547 sec. Processed 109.95 million rows, 879.61 MB (71.07 million rows/s., 568.57 MB/s.)

julia> @time size(t[t.product_id .% 100 .== 0, :])
Time: 0:00:00 readed: 109.95 MRows (495.11 MRows/sec)
Time: 0:00:01 readed: 109.95 MRows (89.97 MRows/sec)
  1.448427 seconds (60.78 k allocations: 874.632 MiB, 1.88% gc time)
(1388345, 9)

And so on…

So there is room for optimization…
Sorry for misleading :frowning:
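To put the three benchmarks side by side, here is a small calculation of the ratio between the DataFrameDBs `@time` wall-clock time and ClickHouse's reported "Elapsed" time, using only the numbers quoted above (single runs, so treat the ratios as rough).

```julia
# Wall-clock times in seconds, copied from the three benchmarks above:
# (query, ClickHouse "Elapsed", DataFrameDBs @time)
timings = [
    ("price > 100",           0.628, 1.399899),
    ("brand == \"apple\"",    1.712, 4.748265),
    ("product_id % 100 == 0", 1.547, 1.448427),
]

for (query, clickhouse, dfdb) in timings
    ratio = dfdb / clickhouse            # > 1 means DataFrameDBs is slower
    println(rpad(query, 24), round(ratio; digits = 2), "x")
end
```

This works out to roughly 2.2x, 2.8x and 0.94x: the first two queries are indeed 2-3 times slower, while the `product_id % 100` query is roughly on par.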


After the blocks are loaded from disk, what is the in-memory representation of the columns? Does each column become a Vector, or do you keep it as, e.g., a vector-of-vectors? I’m asking because it’s hard to get good performance with iterate-based iteration. You may get a bigger performance boost with a foldl-based solution, as I’ve done in Transducers.jl. The two are equally expressive, but foldl allows many more optimizations.

I work with one block per column at a time, i.e. the iterator holds a buffer as a tuple of vectors (one vector per result column). It loads the next block into the buffer, iterates over the buffer, and then refills it with the next block.

In more detail, there is more than one buffer: one for the raw data read from disk, one for the block’s data after applying the selection rules, and another for the result data. I.e. the data flow for each block looks like disk => block read buffer => block filtered buffer => block calculation results buffer.


IIUC that would generate code equivalent to iterate over a vector-of-vectors representation (i.e., there is a special branch at the end of each block). It may be useful to define Transducers.__foldl__ for your types (see https://tkf.github.io/Transducers.jl/dev/examples/reducibles/) if you want to optimize iteration over a column or a set of columns. This is the same mechanism I used to optimize Iterators.flatten for Julia 1.4 (see https://github.com/JuliaLang/julia/pull/33526#issuecomment-561453581)
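To illustrate the difference in plain Julia (this does not use Transducers.jl; `Blocks` and `blocked_foldl` are hypothetical names): a foldl-style traversal can be written as two nested loops, so the hot inner loop runs over a contiguous `Vector`, whereas a flattened `iterate` has to check for the end of the current block on every step. `Transducers.__foldl__` is the hook that lets a custom type hand this kind of loop to Transducers.jl; see the linked docs for its actual signature.

```julia
# Hypothetical blocked column: just a vector of blocks.
struct Blocks{T}
    blocks::Vector{Vector{T}}
end

# foldl over all elements in order. The block boundary is handled by the
# outer loop, so the inner loop is an ordinary loop over one Vector and the
# compiler can optimize it like any other array loop.
function blocked_foldl(op, init, xs::Blocks)
    acc = init
    for block in xs.blocks
        for x in block
            acc = op(acc, x)
        end
    end
    return acc
end

total = blocked_foldl(+, 0, Blocks([[1, 2, 3], [4, 5], [6]]))
total_len = blocked_foldl((n, s) -> n + length(s), 0, Blocks([["xiaomi", "sv"], ["intel"]]))
```

`total` is 21 and `total_len` (the sum of string lengths, as in `sum(length.(brand))` earlier) is 13.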


I will look at Transducers, but I don’t think it can give a noticeable speedup in my case. It may be useful when iterating over a DFColumn, e.g. something like unique(t.user_id), but not for data filtering/materialization. Maybe I just didn’t understand Transducers, or I explained poorly how the selection works.

julia> view = t[(t.price.>10).&(t.brand.=="apple"), :]
julia> col = (view.category_id .+ view.user_id) ./ view.price
julia> materialize(col)

For this example, the process is as follows:

  1. Find the columns required for the selection (t.price.>10).&(t.brand.=="apple") (:brand and :price), allocate a buffer as a named tuple (price = Vector{Float64}(undef, blocksize), brand = Vector{String}(undef, blocksize)), and create a Broadcasted object over that buffer (a flattened, i.e. non-nested, equivalent of (t.price.>10).&(t.brand.=="apple")).
  2. Find the columns required for the projection (view.category_id .+ view.user_id) ./ view.price and proceed as for the selection.
  3. Read one block from the brand and price columns and fill the selection buffer from step 1 with it.
  4. Execute the selection broadcast.
  5. If the selection broadcast matched any rows, read one block from the columns required for the projection (:category_id and :user_id).
  6. Filter that block by the result of the selection broadcast, copy it to the projection buffer, and execute the projection broadcast on that buffer.
  7. Append the result of step 6 to the result of materialize.
  8. Go to step 3.
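The eight steps above can be sketched in plain Julia for this particular expression. This is a hand-written, hypothetical illustration: `materialize_example`, the NamedTuple-of-blocks table and all buffer names are made up, the selection and projection expressions are hard-coded instead of being built as generic `Broadcasted` objects, and reading compressed blocks from disk is replaced by `copyto!` from in-memory blocks. What it does show is the buffer flow (read buffer, filtered buffer, result buffer), with every buffer allocated once and reused for all blocks.

```julia
# Hypothetical in-memory table: a NamedTuple of columns, each a Vector of
# blocks of at most `blocksize` rows.
function materialize_example(tbl, blocksize::Int)
    # Step 1: selection read buffers, allocated once and reused for all blocks.
    price_buf = Vector{Float64}(undef, blocksize)
    brand_buf = Vector{String}(undef, blocksize)
    mask = falses(blocksize)
    # Step 2: projection buffers (filtered inputs and per-block results).
    cat_f = Vector{Int}(undef, blocksize)
    user_f = Vector{Int}(undef, blocksize)
    price_f = Vector{Float64}(undef, blocksize)
    res_buf = Vector{Float64}(undef, blocksize)
    result = Float64[]
    for b in eachindex(tbl.price)
        n = length(tbl.price[b])
        # Step 3: "read" one block of each selection column into its buffer.
        copyto!(price_buf, tbl.price[b])
        copyto!(brand_buf, tbl.brand[b])
        # Step 4: selection broadcast, written in place into the mask buffer.
        m = view(mask, 1:n)
        m .= (view(price_buf, 1:n) .> 10) .& (view(brand_buf, 1:n) .== "apple")
        # Step 5: skip the projection columns if no row in this block matched.
        any(m) || continue
        cat_blk, user_blk = tbl.category_id[b], tbl.user_id[b]
        # Step 6a: keep only matching rows, compacted into the filtered buffers.
        k = 0
        for i in 1:n
            if m[i]
                k += 1
                cat_f[k], user_f[k], price_f[k] = cat_blk[i], user_blk[i], price_buf[i]
            end
        end
        # Step 6b: projection broadcast on the k filtered rows.
        out = view(res_buf, 1:k)
        out .= (view(cat_f, 1:k) .+ view(user_f, 1:k)) ./ view(price_f, 1:k)
        # Step 7: append this block's results. (Step 8: loop to the next block.)
        append!(result, out)
    end
    return result
end

# Three blocks with blocksize 2 (the last block is partial).
tbl = (
    price       = [[5.0, 20.0], [50.0, 100.0], [1.0]],
    brand       = [["apple", "apple"], ["sv", "apple"], ["apple"]],
    category_id = [[1, 2], [3, 4], [5]],
    user_id     = [[10, 20], [30, 40], [6]],
)
res = materialize_example(tbl, 2)
```

Only the second row of each of the first two blocks passes the selection, giving `(2 + 20) / 20` and `(4 + 40) / 100`; the third block is skipped at step 5 without touching `category_id` or `user_id`.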

I don’t see a place for Transducers in this flow. Maybe I’m wrong?


OK, filtering is trickier, but I think it’s doable.

I’d implement steps 1-5 as foldl, step 6 as a Map transducer, and step 7 as the reducing function push!!. All of them can be exposed as a “single sweep” over the table without exposing its blocked nature, at least in the high-level part of the API.
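The transducer idea can be sketched with plain closures to show how steps 6 and 7 become composable pieces of a single sweep. This is not Transducers.jl’s API: `mapping`, `filtering`, `push_rf` and `sweep` are hypothetical names, and `push_rf` mutates its accumulator, unlike BangBang’s `push!!`.

```julia
# A transducer turns a reducing function rf(acc, x) into a new reducing function.
mapping(f) = rf -> (acc, x) -> rf(acc, f(x))
filtering(p) = rf -> (acc, x) -> p(x) ? rf(acc, x) : acc

# Reducing function that collects into a vector (mutating stand-in for push!!).
push_rf(acc, x) = push!(acc, x)

# Single sweep over a blocked source: the caller only supplies the composed
# reducing function and never sees the block boundaries.
function sweep(rf, init, blocks)
    acc = init
    for block in blocks, x in block
        acc = rf(acc, x)
    end
    return acc
end

# With ∘, the leftmost transducer sees each input first: filter, then map.
rf = (filtering(x -> x > 10) ∘ mapping(x -> 2x))(push_rf)
doubled = sweep(rf, Int[], [[5, 20], [30, 1]])
```

`doubled` is `[40, 60]`: 5 and 1 are filtered out, and 20 and 30 are doubled.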

Of course, since you already implemented the blocked version, I guess reformulating it as foldl doesn’t buy you much in terms of performance. But if you want to refactor post-filtering processing (mapping, reduction, etc.) and/or plan to support parallelism, hooking into Transducers.__foldl__ may be an interesting option.


Yes, I think it could be really interesting for post-processing, for example for aggregations, when I get to implementing them.
The main reason for the block structure is to never allocate more than is needed to process one block, and to reuse the allocated buffers for all blocks. In addition, blocks are convenient to compress when writing to a file.

Yes, I’d imagine blocked data structures are valuable in “data science” libraries. That’s actually one of my motivations behind Transducers.jl. iterate is not powerful enough to implement iteration over such data structures efficiently. So you’d end up in the awkward situation of having to advise users not to use loops, even though Julia has excellent “raw loop” support for basic data structures such as Vector. I’m hoping that foldl-based iteration provides an alternative basis for efficient iteration over blocked data structures.


Sorry for going off-topic, but I’m in a similar awkward situation with columns. They support an array interface but don’t inherit from AbstractArray, because Julia likes to iterate over an AbstractArray by index (eachindex) instead of via iterate; that is, it effectively replaces iteration with random access. For my columns, random access is too slow, and I don’t really see how to solve this problem.

Well, my answer is always "use foldl", so maybe you are asking the wrong person :slight_smile: But to make iterate as fast as possible, why not just define a custom iterate? I think the implementations of iterate for CartesianIndices and Iterators.product are good examples (you just keep the state for each “axis”). Another simple version is my (minimal) implementation of iterate for vector-of-vectors:
https://tkf.github.io/Transducers.jl/dev/examples/reducibles/
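A minimal custom `iterate` in that spirit, for a column that is deliberately not an `AbstractArray` (the `BlockedColumn` type is hypothetical): the state is a `(block, offset)` pair, one counter per "axis", so no random-access lookup is ever needed.

```julia
# Hypothetical blocked column that is NOT an AbstractArray.
struct BlockedColumn{T}
    blocks::Vector{Vector{T}}
end

Base.IteratorSize(::Type{<:BlockedColumn}) = Base.HasLength()
Base.length(c::BlockedColumn) = sum(length, c.blocks; init = 0)
Base.eltype(::Type{BlockedColumn{T}}) where {T} = T

# State is (block index, index inside that block), i.e. one counter per
# "axis", in the same spirit as CartesianIndices/Iterators.product.
function Base.iterate(c::BlockedColumn, state = (1, 1))
    b, i = state
    while b <= length(c.blocks)
        block = c.blocks[b]
        if i <= length(block)
            return block[i], (b, i + 1)
        end
        b, i = b + 1, 1          # move on to the next (possibly empty) block
    end
    return nothing
end

col = BlockedColumn([[1, 2], Int[], [3]])
collected = collect(col)
```

`collected` is `[1, 2, 3]`; generic iterator functions such as `sum` and `unique` also work on `col` through this method.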

The problem is that Julia doesn’t use iterate for AbstractArray by default.
From base/abstractarray.jl:

copyto!(dest::AbstractArray, src::AbstractArray) =
    copyto!(IndexStyle(dest), dest, IndexStyle(src), src)

function copyto!(::IndexStyle, dest::AbstractArray, ::IndexStyle, src::AbstractArray)
    destinds, srcinds = LinearIndices(dest), LinearIndices(src)
    isempty(srcinds) || (checkbounds(Bool, destinds, first(srcinds)) && checkbounds(Bool, destinds, last(srcinds))) ||
        throw(BoundsError(dest, srcinds))
    @inbounds for i in srcinds
        dest[i] = src[i]
    end
    return dest
end

So if you define a custom iterate and inherit from AbstractArray, you will get a nasty surprise with copyto! when random access to your array is slow. I can define a custom copyto!, but I don’t think it is the only place with this problem.

Ah, sorry, I completely misread your comment. Yeah, I agree that’s a big problem, and I think we need a better approach than IndexStyle. A temporary workaround is to define custom methods for functions like copyto!… That’s what sparse arrays do.
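A sketch of that workaround, assuming a hypothetical `BlockedVector` that does subtype `AbstractVector`: its `getindex` has to locate the block on every call (and, for an on-disk column, would also have to decompress it), so a block-wise `copyto!` method is added to bypass the generic index-based fallback quoted above.

```julia
# Hypothetical blocked column that IS an AbstractVector.
struct BlockedVector{T} <: AbstractVector{T}
    blocks::Vector{Vector{T}}
end

Base.size(v::BlockedVector) = (sum(length, v.blocks; init = 0),)

# Random access must first find the right block: this is the slow path that
# the generic copyto!(::IndexStyle, ...) fallback would hit for every element.
function Base.getindex(v::BlockedVector, i::Int)
    checkbounds(v, i)
    j = i
    for block in v.blocks
        j <= length(block) && return block[j]
        j -= length(block)
    end
    throw(BoundsError(v, i))   # not reached after checkbounds
end

# Block-wise copy: one contiguous copy per block, no per-element lookup.
function Base.copyto!(dest::AbstractVector, src::BlockedVector)
    length(dest) >= length(src) || throw(BoundsError(dest, eachindex(src)))
    k = firstindex(dest)
    for block in src.blocks
        isempty(block) && continue
        copyto!(dest, k, block, 1, length(block))
        k += length(block)
    end
    return dest
end

v = BlockedVector([[1, 2], Int[], [3, 4, 5]])
dense = Vector(v)   # the Array constructor ends up calling copyto!(dest, v)
```

`dense` is `[1, 2, 3, 4, 5]`. As noted above, this only patches `copyto!`; other generic AbstractArray methods may still fall back to `getindex`.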

Considering the short development time, the package’s performance is really amazing. Could you get it registered first?


@waralex I’m a noob at this; could you point me to some resources, or briefly explain here why breaking the data into chunks like that is good? What are the pros and cons?

Is this an effective strategy if the file totals billions of rows, e.g. 2 billion?