[ANN] DataFrameDBs.jl

I am going to fork the repo and see if I can learn Julia while contributing to your library.

I think this database system could add tremendous value if it could pre-cache certain files in memory using a predictive classifier, such as gradient boosting.

I don't know if anything like this has ever existed: leveraging machine learning to lower memory/CPU costs on servers.


I did my first-ever pull request on your repo, so I am very excited :slight_smile:. Please let me know how I can contribute; I am not as smart as you guys, but I definitely think I can mimic your idiomatic Julia code.

Welcome, @randiaz95!
I don't know Julia very well; this is only my second attempt to write something in it, so I don't think my code is the gold standard for Julia code :slight_smile:
If you have any questions, please ask.

If you have experience with this, then that's good. I have some experience developing databases and high-load projects in C++, but I have practically never worked with data science algorithms.

I still got an error:

ERROR: MethodError: no method matching Channel(::getfield(DataFrameDBs, Symbol("##23#24")){Float64,Int64,Base.TTY}; spawn=true)
Closest candidates are:
  Channel(::Function; ctype, csize, taskref) at channels.jl:100 got unsupported keyword argument "spawn"
  Channel(::Any) at channels.jl:50 got unsupported keyword argument "spawn"
Stacktrace:
 [1] kwerr(::NamedTuple{(:spawn,),Tuple{Bool}}, ::Type, ::Function) at ./error.jl:125
 [2] (::getfield(Core, Symbol("#kw#Type")))(::NamedTuple{(:spawn,),Tuple{Bool}}, ::Type{Channel}, ::Function) at ./none:0
 [3] #write_progress_channel#22(::Bool, ::Float64, ::typeof(DataFrameDBs.write_progress_channel), ::Int64, ::Base.TTY) at /home/yifanliu/.julia/packages/DataFrameDBs/LxszG/src/tables/progress.jl:61
 [4] write_progress_channel(::Int64, ::Base.TTY) at /home/yifanliu/.julia/packages/DataFrameDBs/LxszG/src/tables/progress.jl:61 (repeats 2 times)
 [5] #insert#39(::Bool, ::typeof(insert), ::DFTable, ::CSV.Rows{false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}) at /home/yifanliu/.julia/packages/DataFrameDBs/LxszG/src/io/columns.jl:135
 [6] #insert at ./none:0 [inlined]
 [7] #create_table#17(::CSV.Rows{false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}, ::Int64, ::Bool, ::typeof(create_table), ::String) at /home/yifanliu/.julia/packages/DataFrameDBs/LxszG/src/tables/creators.jl:87
 [8] (::getfield(DataFrameDBs, Symbol("#kw##create_table")))(::NamedTuple{(:from, :show_progress),Tuple{CSV.Rows{false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}},Bool}}, ::typeof(create_table), ::String) at ./none:0
 [9] top-level scope at REPL[3]:1

Fixed in master.

It worked!

I tested your package with a 33.5 GB CSV data set on a laptop with 16 GB of RAM. I got the results below:

Time: 1:03:35.9681 written: 225.83 MRows (59.18 KRows/sec), uncompressed size: 47.57 GB, compressed size: 9.0 GB, compression ratio: 5.29

The uncompressed size seems to be much larger than the original data. I would like to know if there is any limit on the size of data I can work with using your package.

I can't speak specifically to your file or this package, but it is common for a DataFrame to be bigger than the CSV file; there is simply more information stored with the DataFrame. There are ways to address it depending on the data, such as using CategoricalArrays for columns with few unique values.
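For illustration, a minimal sketch of that approach on a plain in-memory DataFrame (the column and its values here are made up):

using DataFrames, CategoricalArrays

df = DataFrame(state = ["NY", "CA", "NY", "TX", "NY"])
df.state = categorical(df.state)   # integer codes plus a small pool of unique values
levels(df.state)                   # ["CA", "NY", "TX"]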


Great! Have you tried running queries on this dataset?

About the uncompressed size: this is the size the files would occupy without compression. The main overhead comes from the lengths of the strings. Since a block of strings is stored as one continuous array of bytes, the length of each string must be written alongside the block. If the strings are short, the lengths add considerable overhead. But with compression this is not very important.
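As a rough back-of-the-envelope illustration (the 4-byte length prefix is an assumption for the sake of the example, not the package's documented format):

n = 1_000_000   # strings in a block
avg_len = 4     # average string payload, in bytes
len_prefix = 4  # assumed bytes spent storing each string's length
payload  = n * avg_len          # 4_000_000 bytes of characters
overhead = n * len_prefix       # another 4_000_000 bytes just for the lengths
(payload + overhead) / payload  # 2.0: the uncompressed size doubles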

Theoretically, there are no restrictions on the data size, at least until you try to materialize a query result that doesn't fit in RAM. When I implement aggregation, its restrictions will be about the same: the aggregation result should fit in RAM.
A similar architecture, which I implemented in C++ as an internal database for the company I work for, now contains 320,000,000,000 rows in 100 tables, takes up 2 TB of disk space, and runs on a server with 96 GB of RAM. At the same time, it rarely consumes more than 20 GB, and only on large aggregations and joins.

I will try the query functions later this week. Thank you for this amazing package!

Thank you! And please tell me about any problems you run into.


Have you benchmarked the performance of this package against JuliaDB, by chance? I ask because I'm a fairly regular user of JuliaDB, as I work a lot with U.S. Census Bureau data sets that usually come in very large .csv files (millions of rows × hundreds of columns). My current workflow is to save them as IndexedTables and then do the querying/filtering/aggregating with JuliaDBMeta. It's worked really well for me so far, but I'd love to see how this compares.

I haven't worked with JuliaDB, so I can only reason theoretically. As I understand it, an IndexedTable is an in-memory table that is fully loaded into memory. If you have enough memory, it is probably faster than DataFrameDBs. On the other hand, with DataFrameDBs you can load only the columns that you need, or perform filtering and calculations without loading entire columns into memory. Given the compression, this can greatly reduce data loading time.
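For example, a sketch using the access pattern that appears later in this thread (assuming test is a DFTable and best_bid one of its columns):

col = test.best_bid     # lazy column reference, nothing is read yet
materialize(col[1:10])  # should only read the data needed for rows 1:10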


Some questions:

  1. The package name DataFrameDBs looks weird; maybe something more informative?

  2. What does the reuse_row=true keyword do for CSV.Rows?

  3. It would be better if show_progress=true could tell how much time is needed instead of how much time has passed.

  4. When there is a missing value in a column, how do I convert it from String to Int64?

  5. When I run the code

c_best_bid = parse.(Int64, test.best_bid)
materialize(c_best_bid[1:10])

I got the error message (test.best_bid has no missing values):

ERROR: ArgumentError: invalid base 10 digit '.' in "15.6"
Stacktrace:
 [1] parse at ./parse.jl:240 [inlined]
 [2] _broadcast_getindex_evalf at ./broadcast.jl:625 [inlined]
 [3] _broadcast_getindex at ./broadcast.jl:608 [inlined]
 [4] getindex at ./broadcast.jl:558 [inlined]
 [5] macro expansion at ./broadcast.jl:888 [inlined]
 [6] macro expansion at ./simdloop.jl:77 [inlined]
 [7] copyto! at ./broadcast.jl:887 [inlined]
 [8] copyto! at ./broadcast.jl:842 [inlined]
 [9] materialize! at ./broadcast.jl:801 [inlined]
 [10] eval_on_range(::NamedTuple{(:best_bid_raw,),Tuple{DataFrameDBs.FlatStringsVectors.FlatStringsVector{Union{Missing, String}}}}, ::DataFrameDBs.BroadcastExecutor{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(parse),Tuple{Base.RefValue{Type{Int64}},Array{Union{Missing, String},1}}},NamedTuple{(:best_bid_raw,),Tuple{Array{Union{Missing, String},1}}},Array{Int64,1}}, ::Base.LogicalIndex{Int64,Array{Bool,1}}) at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/tables/broadcast.jl:130
 [11] _proj_elem_eval_on_range at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/tables/projection.jl:128 [inlined]
 [12] _proj_eval_on_range at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/tables/projection.jl:136 [inlined]
 [13] eval_on_range at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/tables/projection.jl:152 [inlined]
 [14] iterate(::DataFrameDBs.BlocksIterator{DataFrameDBs.DataReader,NamedTuple{(:best_bid_raw,),Tuple{DataFrameDBs.BlockStream}},NamedTuple{(:best_bid_raw,),Tuple{DataFrameDBs.FlatStringsVectors.FlatStringsVector{Union{Missing, String}}}},DataFrameDBs.ProjectionExecutor{NamedTuple{(:a,),Tuple{DataFrameDBs.BroadcastExecutor{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(parse),Tuple{Base.RefValue{Type{Int64}},Array{Union{Missing, String},1}}},NamedTuple{(:best_bid_raw,),Tuple{Array{Union{Missing, String},1}}},Array{Int64,1}}}}},DataFrameDBs.SelectionExecutor{Tuple{DataFrameDBs.RangeToProcess{UnitRange{Int64}}}},Tuple{},Tuple{Symbol}}, ::Nothing) at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/io/blocksiterator.jl:117
 [15] iterate(::DataFrameDBs.BlocksIterator{DataFrameDBs.DataReader,NamedTuple{(:best_bid_raw,),Tuple{DataFrameDBs.BlockStream}},NamedTuple{(:best_bid_raw,),Tuple{DataFrameDBs.FlatStringsVectors.FlatStringsVector{Union{Missing, String}}}},DataFrameDBs.ProjectionExecutor{NamedTuple{(:a,),Tuple{DataFrameDBs.BroadcastExecutor{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1},Nothing,typeof(parse),Tuple{Base.RefValue{Type{Int64}},Array{Union{Missing, String},1}}},NamedTuple{(:best_bid_raw,),Tuple{Array{Union{Missing, String},1}}},Array{Int64,1}}}}},DataFrameDBs.SelectionExecutor{Tuple{DataFrameDBs.RangeToProcess{UnitRange{Int64}}}},Tuple{},Tuple{Symbol}}) at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/io/blocksiterator.jl:99
 [16] materialize(::DFColumn{Int64}) at /home/yifanliu/.julia/packages/DataFrameDBs/A2bCW/src/tables/materialization.jl:48
 [17] top-level scope at REPL[31]:1
  1. It is not registered yet, so I am ready to consider any suggestions for the name.

  2. It's my mistake :frowning: I meant reusebuffer, but I made a mistake when I tested the import myself, and CSV didn't warn that this parameter means nothing to it. Of course it must be reusebuffer = true.

  3. The problem is that, in general, I don't know the total amount of data in the import source, so I can't estimate the remaining time.

  4. You can broadcast a conversion function over the column (a lazy column variant is sketched after this list):

conv_miss_int = (v)->ismissing(v) ? missing : parse(Int64, v)
new_column = conv_miss_int.(string_column)

or, if you want to replace missing with a default value:

conv_miss_int = (v)->ismissing(v) ? 0 #=default value=# : parse(Int64, v)
new_column = conv_miss_int.(string_column)

  5. It looks like there is a "15.6" entry in the column that can't be parsed as Int64. Perhaps you should use Float64?
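The same broadcast also works lazily on a DFColumn, as with parse.(Int64, test.best_bid) earlier in the thread; given the "15.6" entries, Float64 is presumably the safer target type:

conv_miss_float = (v)->ismissing(v) ? missing : parse(Float64, v)
c_best_bid = conv_miss_float.(test.best_bid)  # lazy broadcast over the column
materialize(c_best_bid[1:10])                 # evaluates only the first ten rows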

Thanks. More questions:

  1. Converting a column's data type seems not easy: a new column needs to be created and then inserted back. Is it possible to have something like parse!(Int64, v)?

  2. When I run view = test[:secid .== "6505", :], I got the error message:

ERROR: BoundsError: attempt to access View of table test/
Projection: secid=>col(secid)::Union{Missing, String}; date=>col(date)::Union{Missing, String}; symbol=>col(symbol)::Union{Missing, String}; symbol_flag=>col(symbol_flag)::Union{Missing, String}; exdate=>col(exdate)::Union{Missing, String}; last_date=>col(last_date)::Union{Missing, String}; cp_flag=>col(cp_flag)::Union{Missing, String}; strike_price=>col(strike_price)::Union{Missing, String}; best_bid=>col(best_bid)::Union{Missing, String}; best_offer=>col(best_offer)::Union{Missing, String}; volume=>col(volume)::Union{Missing, String}; open_interest=>col(open_interest)::Union{Missing, String}; impl_volatility=>col(impl_volatility)::Union{Missing, String}; delta=>col(delta)::Union{Missing, String}; gamma=>col(gamma)::Union{Missing, String}; vega=>col(vega)::Union{Missing, String}; theta=>col(theta)::Union{Missing, String}; optionid=>col(optionid)::Union{Missing, String}; cfadj=>col(cfadj)::Union{Missing, String}; am_settlement=>col(am_settlement)::Union{Missing, String}; contract_size=>col(contract_size)::Union{Missing, String}; ss_flag=>col(ss_flag)::Union{Missing, String}; forward_price=>col(forward_price)::Union{Missing, String}; expiry_indicator=>col(expiry_indicator)::Union{Missing, String}; root=>col(root)::Union{Missing, String}; suffix=>col(suffix)::Union{Missing, String}
Selection: 

  at index [false]

My table test looks like this:

27×6 DataFrames.DataFrame
│ Row │ column           │ type                   │ rows         │ uncompressed size │ compressed size │ compression ratio │
│     │ Symbol           │ String                 │ String       │ String            │ String          │ Float64           │
├─────┼──────────────────┼────────────────────────┼──────────────┼───────────────────┼─────────────────┼───────────────────┤
│ 1   │ secid            │ Union{Missing, String} │ 225.83 MRows │ 2.1 GB            │ 9.28 MB         │ 232.06            │
│ 2   │ date             │ Union{Missing, String} │ 225.83 MRows │ 2.94 GB           │ 24.94 MB        │ 120.9             │
│ 3   │ symbol           │ Union{Missing, String} │ 225.83 MRows │ 4.34 GB           │ 297.23 MB       │ 14.96             │
│ 4   │ symbol_flag      │ Union{Missing, String} │ 225.83 MRows │ 1.05 GB           │ 4.37 MB         │ 246.19            │
│ 5   │ exdate           │ Union{Missing, String} │ 225.83 MRows │ 2.94 GB           │ 36.07 MB        │ 83.58             │
⋮
│ 22  │ ss_flag          │ Union{Missing, String} │ 225.83 MRows │ 1.05 GB           │ 4.75 MB         │ 226.62            │
│ 23  │ forward_price    │ Union{Missing, String} │ 225.83 MRows │ 2.74 GB           │ 103.63 MB       │ 27.09             │
│ 24  │ expiry_indicator │ Union{Missing, String} │ 225.83 MRows │ 916.7 MB          │ 8.02 MB         │ 114.35            │
│ 25  │ root             │ Union{Missing, String} │ 225.83 MRows │ 861.49 MB         │ 3.51 MB         │ 245.69            │
│ 26  │ suffix           │ Union{Missing, String} │ 225.83 MRows │ 861.49 MB         │ 3.51 MB         │ 245.69            │
│ 27  │ Table total      │                        │ 225.83 MRows │ 47.57 GB          │ 9.0 GB          │ 5.29              │

  3. It would be great to see some statistics-by-group examples.

  4. Is it possible to run rolling statistics, like a rolling mean or a rolling standard deviation?

  5. Is it possible to run regressions with the columns?

  6. Is the speed of importing data totally dependent on the CSV package, or could there be further optimization in your package?

  1. I'm thinking about how to do this. The conversion functions themselves are not part of the package, but a workaround over a re-insert is possible. Something like map!(function, column).

  2. Use view = test[test.secid .== "6505", :] or view = test[:secid => (v) -> v == "6505", :]. It's similar to DataFrames and other array-like structures. :secid .== "6505" is a broadcast over a Symbol and a String, so test[:secid .== "6505", :] is the equivalent of test[false, :] (see the snippet after this list).

  3 - 5. Yes, it is possible, and this is the next big block of work that I plan to start as soon as I have a little time. I think that integrating OnlineStats.jl will allow all of this.

  6. The CSV package plays a crucial role, but I will try to speed up the import as much as possible.
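To see the pitfall from item 2 in isolation, plain Julia at the REPL gives the same result:

julia> :secid .== "6505"   # broadcasting two scalars yields a single Bool
false

By contrast, test.secid .== "6505" broadcasts over the column's values, producing the element-wise condition the filter needs.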


In addition to item 2: if a column has type Union{Missing, String}, it is better to filter it as test[:secid => (v) -> Bool(v == "6505"), :], because DataFrameDBs requires the filtering function to have return type Bool, while the return type of v == "6505" when v is Union{Missing, String} is Union{Missing, Bool}. If the column actually contains missings, use test[:secid => (v) -> Bool(!ismissing(v) && v == "6505"), :].
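A minimal sketch putting that advice together (test being the same DFTable as above; note that !ismissing(v) && v == "6505" already short-circuits to false on missing, so the expression evaluates to a plain Bool):

# Predicate that returns Bool even for Union{Missing, String} input:
pred = v -> !ismissing(v) && v == "6505"   # missing short-circuits to false
view = test[:secid => pred, :]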

After using it for a little while, do you think this package is useful? Does it make sense to develop it further?


It would be great to have joins, and also the ability to reshape (stack/melt/pivot) larger-than-memory datasets.

I have now almost completely switched to developing Dash, so the development of DataFrameDBs has slowed down a lot. But I'll get back to it as soon as I can. Joins are an interesting task, but not an easy one. Probably it will first be implemented in memory (i.e. the result of the join will have to fit entirely in memory); then I can try to implement the join using temporary files.
