I’m trying to learn distributed/parallel computing using JuliaDB, and I’m running into several issues that I don’t understand.
First, I loaded 4 CSV files, each 3–4 GB in size, like this:
using JuliaDB
using Glob   # glob() comes from Glob.jl
loadtable(glob("data/*.csv"), output = "bin", chunks = 16, type_detect_rows = 2000)
I thought that the chunks = 16 argument would result in a distributed table with 16 chunks, but it didn’t (I’m guessing it only controls how the CSV files are grouped into chunks, not how an individual file gets split?). The bin directory contains 4 files, and when I call load("bin") I get a distributed table with 4 chunks, nearly 16 million rows, and 286 columns.
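If it helps, this is what I imagine rechunking would look like (collect and distribute are guesses at the API on my part, and collecting ~16 million rows locally is obviously memory-hungry):

t_local = collect(load("bin"))     # gather the 4 chunks into one local table
tbl16   = distribute(t_local, 16)  # redistribute into 16 chunks across the available workers

Is something like this the intended way to get more chunks, or should the splitting happen at load time?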
I then wrote a function to do some filtering/aggregation on the data and used @time to measure how long it takes to run:
using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
using JuliaDBMeta: @filter

tbl = load("bin")

function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
               upper >= :AGEP >= lower
    end
    return reduce(+, result; select = :PWGTP)
end
@time by_age(35, 44, tbl)
The call to by_age takes around 60 seconds every time (after the initial run, which takes more than 2 minutes):
julia> @time by_age(35, 44, tbl)
67.165804 seconds (329.40 M allocations: 38.674 GiB, 9.58% gc time)
40735140
julia> @time by_age(35, 44, tbl)
60.377765 seconds (328.61 M allocations: 38.634 GiB, 10.46% gc time)
40735140
julia> @time by_age(35, 44, tbl)
58.394598 seconds (328.61 M allocations: 38.634 GiB, 10.79% gc time)
40735140
julia> @time by_age(35, 44, tbl)
57.983220 seconds (328.61 M allocations: 38.634 GiB, 10.90% gc time)
40735140
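For what it’s worth, the same work split into two separately timed steps would look like this (the same logic as by_age, just separated so the filter and the reduce can be timed independently):

# the filtering step from by_age(35, 44, tbl), timed on its own
@time filtered = @applychunked tbl begin
    @where !ismissing(:AGEP) && 44 >= :AGEP >= 35
end

# ...and the reduction, timed on its own
@time reduce(+, filtered; select = :PWGTP)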
Interestingly, if I swap the + in the by_age function for Sum() from OnlineStats, it’s not any faster:
function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
               upper >= :AGEP >= lower
    end
    return reduce(Sum(), result; select = :PWGTP) # changed + to Sum()
end
julia> @time by_age(35, 44, tbl)
58.877263 seconds (327.32 M allocations: 38.635 GiB, 10.90% gc time)
Sum: n=1845493 | value=4.07351e7
julia> @time by_age(35, 44, tbl)
58.634385 seconds (326.76 M allocations: 38.606 GiB, 10.81% gc time)
Sum: n=1845493 | value=4.07351e7
julia> @time by_age(35, 44, tbl)
60.791893 seconds (326.76 M allocations: 38.606 GiB, 10.78% gc time)
Sum: n=1845493 | value=4.07351e7
julia> @time by_age(35, 44, tbl)
60.314000 seconds (326.76 M allocations: 38.606 GiB, 10.87% gc time)
Sum: n=1845493 | value=4.07351e7
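One variation I’ve been wondering about, since the full table has 286 columns, is whether doing the same thing on a narrow two-column table would cut down the allocations (this assumes select works on a distributed table the same way it does on a local one):

narrow = select(tbl, (:AGEP, :PWGTP))   # keep only the two columns by_age actually uses
@time by_age(35, 44, narrow)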
After this, I loaded the Distributed package and added one more process, for a total of 2 processes (workers?). I’m really not sure whether this is the correct way to do it:
using Distributed
while length(procs()) < 2
    addprocs(1)
end

@everywhere using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
@everywhere using JuliaDBMeta: @filter
I’m not sure whether I need the @everywhere calls where I’ve placed them in the snippet above.
The code goes on like this:
tbl = load("bin")

@everywhere function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
               upper >= :AGEP >= lower
    end
    return reduce(Sum(), result; select = :PWGTP)
end
Again, I don’t know if I need the @everywhere before the function definition.
I then call the by_age function with @time. The first two calls were slower than before I added Distributed and the extra process, and after that Julia and my IDE (VS Code with the Julia extension) crash. I tried this again, and repeated calls to by_age always end in a crash.
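For reference, here is the whole distributed attempt condensed into one script, in the order I understand it is supposed to run (workers added before anything is loaded, definitions made available on every process):

using Distributed
addprocs(1)                                   # 2 processes total

@everywhere using JuliaDB, JuliaDBMeta, OnlineStats, Statistics

tbl = load("bin")                             # loaded after the workers exist

@everywhere function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
               upper >= :AGEP >= lower
    end
    return reduce(Sum(), result; select = :PWGTP)
end

@time by_age(35, 44, tbl)                     # this is the call that eventually crashes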
I’m hoping someone can explain the correct way to do this. Any assistance would be greatly appreciated!