I have a specific data-reading problem that I’m trying to make faster. I’m not sure whether the details matter, but here’s the gist:
- I have some number (high hundreds to low thousands) of individual CSV files. Each file has two columns, call them `feature::String` and `value::Float64`, and represents one observation.
- In the end, I need a single table, probably sparse, containing all of the values for each feature and observation pair. The orientation doesn’t matter all that much; I routinely need to access it both feature-wise and observation-wise.
- There are on the order of 5-50 million distinct features across the dataset, but each observation tends to have values for at most hundreds of thousands of them. All other features can be said to have a value of 0, but they are not represented in the rows of a CSV if they aren’t in that observation.
- I am far more often loading the table in a new Julia session than I am adding additional data, and I’m not really limited by disk space, so anything that massages the data into a form that’s easier to load would be fine, even if the conversion takes a while (a rough sketch of what I mean follows this list).
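To be concrete about the “massaging”: I’d be perfectly happy with a one-time conversion step that writes everything out in some binary form, so that later sessions just deserialize a prebuilt sparse matrix instead of re-parsing CSVs. This untested sketch with the Serialization stdlib is roughly what I have in mind (the helper names are just placeholders):

using Serialization, SparseArrays

# one-time step: after assembling the sparse matrix once, write a binary snapshot to disk
save_snapshot(path, sa::SparseMatrixCSC, features, obs) =
    serialize(path, (sa=sa, features=features, obs=obs))

# every later session: read the snapshot back instead of re-reading all the CSVs
load_snapshot(path) = deserialize(path)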
Here’s a way to generate some toy data that has the form I’m concerned with:
using Random
using DataFrames
using CSV
using SparseArrays
"""
    nfeatures: total number of possible features
    nobservations: total number of observations
    nfeatperobs: Number of features in each observation
"""
function generate_testdata(nfeatures, nobservations, nfeatperobs, datadir=normpath(joinpath(@__DIR__, "..", "data")); force=false)
    if isdir(datadir)
        force ? rm(datadir, recursive=true) : error("$datadir exists, use `force=true` to overwrite")
    end
    mkdir(datadir)
    allfeatures = [randstring() for _ in 1:nfeatures]
    for o in 1:nobservations
        fpick = unique(rand(1:nfeatures, nfeatperobs))
        df = DataFrame(feature=allfeatures[fpick], value=rand(length(fpick)))
        CSV.write(joinpath(datadir, "obs$o.csv"), df)
    end
end
generate_testdata(10_000, 100, 500)
My first naive implementation basically reads all of the CSVs into dictionaries, then fills a sparse array:
function join_from_csv1(datadir=normpath(joinpath(@__DIR__, "..", "data")))
    obs = map(f-> replace(f, ".csv"=>""), readdir(datadir))
    obsdict = Dict{String, Dict{String,Float64}}()
    features = Set()
    # first pass: read each CSV into a feature => value Dict and collect all feature names
    for ob in obs
        df = CSV.File(joinpath(datadir, "$ob.csv")) |> DataFrame
        obsdict[ob] = Dict(row.feature=> row.value for row in eachrow(df))
        union!(features, df.feature)
    end
    features=sort(collect(features))
    # second pass: fill a (features × observations) sparse matrix entry by entry
    sa = spzeros(length(features),length(obs))
    featuredict = Dict(f=>i for (i,f) in enumerate(features))
    for (i, ob) in enumerate(obs)
        for f in keys(obsdict[ob])
            sa[featuredict[f], i] = obsdict[ob][f]
        end
    end
    return sa, features, obs
end
julia> @benchmark join_from_csv1()
BenchmarkTools.Trial:
  memory estimate:  20.98 MiB
  allocs estimate:  370977
  --------------
  minimum time:     134.594 ms (0.00% GC)
  median time:      150.562 ms (0.00% GC)
  mean time:        153.927 ms (4.30% GC)
  maximum time:     169.961 ms (9.90% GC)
  --------------
  samples:          33
  evals/sample:     1
This implementation takes ~35 min with my actual dataset. I’m sure there are a bunch of ways to improve it, but I have no experience with profiling and optimization, so I’m happy to take any suggestions for the obvious problems; mostly, though, this seems like a good opportunity to learn those skills.
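For reference, here’s the direction I was thinking of trying next. My (possibly wrong) understanding is that setting entries of a `SparseMatrixCSC` one at a time is slow, and that collecting (row, column, value) triplets and calling `sparse(I, J, V)` once is the preferred construction. Untested sketch against the same toy data:

function join_from_csv_coo(datadir=normpath(joinpath(@__DIR__, "..", "data")))
    obs = map(f -> replace(f, ".csv" => ""), readdir(datadir))
    featuredict = Dict{String, Int}()      # feature name => row index, grown as features appear
    I = Int[]; J = Int[]; V = Float64[]    # COO triplets for sparse()
    for (j, ob) in enumerate(obs)
        df = CSV.File(joinpath(datadir, "$ob.csv")) |> DataFrame
        for row in eachrow(df)
            i = get!(featuredict, row.feature, length(featuredict) + 1)
            push!(I, i); push!(J, j); push!(V, row.value)
        end
    end
    # features come out in first-seen order rather than sorted, which I think is fine for my use
    features = Vector{String}(undef, length(featuredict))
    for (f, i) in featuredict
        features[i] = f
    end
    return sparse(I, J, V, length(features), length(obs)), features, obs
end

I haven’t benchmarked that version properly yet, so I may well be missing something. With that context, a few questions: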
- Is the smaller sample data sufficient for profiling / benchmarking? I’m assuming that steps I take to optimize on this small dataset will scale up, but is there any reason to suspect that’s not true?
- Is using the built-in profiler / ProfileView the right way to go? I had to run the function something like 1000 times to get any substantial reading, so I’m probably doing something wrong there (the exact incantation I’ve been using is in the snippet after this list).
- Anything else I should be considering, or good resources to look at?
- Assuming no observation/feature pairs overlap, are there any additional considerations for trying to make this parallelizable? I’m doing this primarily on a system with 8 cores, though I’m not sure to what extent I’m I/O-bound reading off the HDDs (a rough sketch of the threaded read I have in mind is also after this list).
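Regarding the profiling question, this is how I’ve been collecting profiles so far, in case I’m misusing the tools:

using Profile, ProfileView
Profile.clear()
@profile for _ in 1:1000   # a single call barely registers any samples, hence the loop
    join_from_csv1()
end
ProfileView.view()

And here is roughly the threaded read I’m imagining for the parallelization question, under the assumption that per-file parsing rather than disk access dominates; each task writes to its own slot, so no locking should be needed:

# rough, untested sketch: parse the CSVs on multiple threads, one DataFrame per file
function read_all_threaded(datadir=normpath(joinpath(@__DIR__, "..", "data")))
    files = joinpath.(datadir, readdir(datadir))
    tables = Vector{DataFrame}(undef, length(files))
    Threads.@threads for k in eachindex(files)
        tables[k] = CSV.File(files[k]) |> DataFrame   # each iteration fills a distinct slot
    end
    return tables
end

If the HDDs turn out to be the real bottleneck, I assume threads won’t buy much, so pointers on how to tell which case I’m in would also be appreciated.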
