Alternatives to piping table operations like "filter" and "columns"? How to speed this up?

Hi all, I found that the following chunk is the major bottleneck in my code. For each “id” (subject), it uses pipes to filter out the data for that “id”, creates a columns object, and then builds a MyObs instance.

Is this approach the most efficient out there? Any suggestions for improvement?

The @profile result is too long to paste here, but |>, iterate (from TableOperations.jl), columns, and buildcolumns all have very large counts next to them.

I attached a reproducible example at the end.

for (i, id) in enumerate(subset_id)
    obsvec[i] = datatable_cols |> 
        TableOperations.filter(x -> Tables.getcolumn(x, :id) == id) |> 
        Tables.columns |> 
        myobs(feformula, reformula)
end

Reproducible example:

using LinearAlgebra, Random, StatsModels, DataFrames, CSV, JuliaDB, TableOperations, Tables, Profile

############ Preparation ############
# define type and constructors
struct MyObs{T <: LinearAlgebra.BlasReal}
    y::Vector{T}
    X::Matrix{T} 
    Z::Matrix{T}
    xty::Vector{T} 
    zty::Vector{T}
end
function MyObs(
    y::Vector{T},
    X::Matrix{T},
    Z::Matrix{T}
    ) where T <: LinearAlgebra.BlasReal
    xty = transpose(X) * y
    zty = transpose(Z) * y
    MyObs{T}(y, X, Z, xty, zty)
end
function myobs(data_obs, feformula::FormulaTerm, reformula::FormulaTerm)
    y, X = StatsModels.modelcols(feformula, data_obs)
    Z = StatsModels.modelmatrix(reformula, data_obs)
    return MyObs(y, X, Z)
end
myobs(feformula::FormulaTerm, reformula::FormulaTerm) = data_obs -> myobs(data_obs, feformula, reformula)
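
(For context, the one-argument curried method above is what lets myobs sit at the end of a pipe. Assuming cols is any Tables.jl columns object and the formulas have gone through apply_schema as in the script below, these two calls are equivalent:)

```julia
# Equivalent calls; `cols`, `feformula`, `reformula` are assumed to be set up
# as in the script below (apply_schema already applied):
obs_direct = myobs(cols, feformula, reformula)     # direct three-argument call
obs_piped  = cols |> myobs(feformula, reformula)   # curried, pipe-friendly form
```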

# simulate a dataset and save it to csv
Random.seed!(1)
reps = 20
N = 10^6
p = 5
q = 2
X = Matrix{Float64}(undef, N*reps, p)
X[:, 1] = ones(N*reps)
@views randn!(X[:, 2:p])
rand_intercept = zeros(N*reps)
for j in 1:N
    rand_intercept[(reps * (j-1) + 1) : reps * j] .= Random.randn()
end
y = X * ones(p) + # fixed effects
    rand_intercept + # random intercept, standard normal
    Random.randn(N*reps) # error, standard normal
id = repeat(1:N, inner = reps)
dat = DataFrame(hcat(id, y, X))
rename!(dat, Symbol.(["id", "y", "x1", "x2", "x3", "x4", "x5"]))
CSV.write("testfile.csv", dat)
############ Running ############
# load data
datatable = JuliaDB.loadtable("testfile.csv"; distributed = false)
feformula   = @formula(y ~ 1 + x1 + x2 + x3 + x4 + x5)
reformula   = @formula(y ~ 1)
feformula = apply_schema(feformula, schema(feformula, datatable))
reformula = apply_schema(reformula, schema(reformula, datatable))
datatable_cols = Tables.columns(datatable)

subset_size = 1000
obsvec = Vector{MyObs{Float64}}(undef, subset_size)
subset_id = [1:1:subset_size;]
Profile.clear()
# This is the bottleneck
@profile for (i, id) in enumerate(subset_id)
    obsvec[i] = datatable_cols |> 
        TableOperations.filter(x -> Tables.getcolumn(x, :id) == id) |> 
        Tables.columns |> 
        myobs(feformula, reformula)
end
Profile.print(format = :flat)

Is this any faster?

I am not sure the result is the same as your function's, though (I have not written a comparison/isapprox for your type).

datatable_df = DataFrame(datatable)
function me2(feformula,reformula,datatable_df)
    subset_size = 1000
    obsvec = Vector{MyObs{Float64}}(undef, subset_size)
    subset_id = [1:1:subset_size;]
    # This is the bottleneck
    for (i, id) in enumerate(subset_id)
        obsvec[i] = myobs(filter(:id => isequal(id), datatable_df), feformula, reformula)
    end
    return obsvec
end
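
Hypothetical usage, assuming datatable_df and the two formulas are set up exactly as in your script (apply_schema already applied):

```julia
# `feformula`, `reformula`, and `datatable_df = DataFrame(datatable)` are
# assumed from the original script; me2 returns one MyObs per id in 1:1000.
obsvec = me2(feformula, reformula, datatable_df)
length(obsvec)  # 1000
```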

Thanks for getting back. I changed your filter call because the data source needs to work with any Tables.jl-compatible type, not only DataFrames.

After changing it, I found that it’s actually slower than the original piping approach.

With piping:

subset_size = 100
obsvec = Vector{MyObs{Float64}}(undef, subset_size)
function f1!(obsvec, feformula, reformula, datatable_cols, subset_size)
    # obsvec = Vector{MyObs{Float64}}(undef, subset_size)
    subset_id = [1:1:subset_size;]
    for (i, id) in enumerate(subset_id)
        obsvec[i] = datatable_cols |> 
            TableOperations.filter(x -> Tables.getcolumn(x, :id) == id) |> 
            Tables.columns |> 
            myobs(feformula, reformula)
    end
    return obsvec
end
@benchmark f1!(obsvec, feformula, reformula, datatable_cols, subset_size)

This gives

BenchmarkTools.Trial: 
  memory estimate:  982.13 KiB
  allocs estimate:  8101
  --------------
  minimum time:     1.762 s (0.00% GC)
  median time:      1.766 s (0.00% GC)
  mean time:        1.766 s (0.00% GC)
  maximum time:     1.771 s (0.00% GC)
  --------------
  samples:          3
  evals/sample:     1

Without piping:

subset_size = 100
obsvec = Vector{MyObs{Float64}}(undef, subset_size)
function f2!(obsvec, feformula, reformula, datatable_cols, subset_size)
    # obsvec = Vector{MyObs{Float64}}(undef, subset_size)
    subset_id = [1:1:subset_size;]
    # This is the bottleneck
    for (i, id) in enumerate(subset_id)
        obsvec[i] = myobs(TableOperations.filter(x -> Tables.getcolumn(x, :id) == id, datatable_cols),
                            feformula, reformula)
        # obsvec[i] = myobs(TableOperations.filter(:id => isequal(id), datatable_cols) ,feformula, reformula)
    end
    return obsvec
end
@benchmark f2!(obsvec, feformula, reformula, datatable_cols, subset_size)

This gives

BenchmarkTools.Trial: 
  memory estimate:  1.41 MiB
  allocs estimate:  12201
  --------------
  minimum time:     3.527 s (0.00% GC)
  median time:      3.530 s (0.00% GC)
  mean time:        3.530 s (0.00% GC)
  maximum time:     3.534 s (0.00% GC)
  --------------
  samples:          2
  evals/sample:     1

Apologies, that was not apparent from your original post.
For me, the DataFrame filter was about 15% faster than the piping solution.
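
One further direction, if your real use case allows it: every filter pass (TableOperations or DataFrames) scans all N*reps rows, so the loop does subset_size full scans. Grouping once up front avoids the repeated scans. A sketch, not benchmarked here, assuming a DataFrames version that supports indexing a GroupedDataFrame by key and the same MyObs/myobs definitions as above:

```julia
using DataFrames

# Sketch: build the groups in a single pass over the table, then construct
# each MyObs from its group instead of re-filtering all rows per id.
function me3(feformula, reformula, datatable_df, subset_size)
    obsvec = Vector{MyObs{Float64}}(undef, subset_size)
    gdf = groupby(datatable_df, :id)          # one pass over the whole table
    for (i, id) in enumerate(1:subset_size)
        # Index the grouped table by its key; yields a SubDataFrame,
        # which StatsModels.modelcols accepts as a Tables.jl source.
        obsvec[i] = myobs(gdf[(id = id,)], feformula, reformula)
    end
    return obsvec
end
```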