Reading a few rows from a BIG CSV file

Ah, millions of lines + hundreds of columns was the piece I was missing. That doesn’t sound outrageous then.

Sorry to revive this old topic, but I am also interested in how to read in just a subset of lines using CSV. My data file has 2m rows but far fewer columns than the OP's data file (it's the ONS Postcode Directory for the UK). It's not particularly slow to load, but I have tried to compare the method suggested here with a straightforward load-then-subset:

using DataFrames
using CSV
using BenchmarkTools

function updateGeogs1(onspdfile, postcodes) 
    df1 = CSV.read(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true, DataFrame)
    subset!(df1, :pcds => ByRow(x -> in(x, postcodes)))
    return df1
end

function updateGeogs2(onspdfile, postcodes) 
#    df1 = CSV.read(onspdfile, limit=1, DataFrame)
#    thetypes = [typeof(df1[1,i]) for i in 1:ncol(df1)]
    thetypes = [String15, String15, String15, Int64, Union{Missing,Int64}, String15, String15, String15, String15, String15, Int64, Int64, Int64, Int64, String15, String15, String15, String15, Int64, String15, String15, String15, String15, String15, String15, String7, String15, String7, String15, String15, String15, Int64, String3, String15, String15, String15, String15, String15, String15, String15, Int64, String3, Float64, Float64, String15, Missing, String15, Int64, String15, String15, Missing, Missing, Missing]
#    println(thetypes)
    df2 = Iterators.filter(x -> in(x[:pcds], postcodes), CSV.Rows(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], types=thetypes, stripwhitespace = true)) |> DataFrame
    return df2
end

function main()
    onspdfile = raw"ONSPD_NOV_2023_UK.csv"
    postcodes = ["RG24 4PA", "CB3 0AP", "CR9 1DG", "EN1 3XY", "NW1W 8JN", "N22 7SG", "L36 9YU", "OX1 9TY", "CV21 9NL", "SY1 1NA", "L1 5BL", "DY10 3TE", "NG5 1DD", "OX16 9AA", "E5 0LL", "OX1 2AQ", "WF1 2AP", "BN11 4HY", "S70 2TA", "MK45 2DD", "B91 1DD", "NR27 0LJ", "LN11 0BX", "BS5 6HN", "WC2R 1LA", "CB3 0HZ", "E1 1EJ", "WC1N 2LX", "BS40 8QB", "BS6 6SU", "SE3 7SE", "OX2 7TG"]
    @btime updateGeogs1($onspdfile, $postcodes)
    @btime updateGeogs2($onspdfile, $postcodes)
    return nothing
end

main()

The results were:

  Read and subset:   8.454 s (3142354 allocations: 317.61 MiB)
  Iterate CSV.Rows: 10.427 s (17095677 allocations: 1.74 GiB)

In other words, just reading the whole file and then taking a subset of the required rows was 20% faster than selecting using CSV.Rows.

The trick of inferring types from the first line of the CSV file didn't work. Most postcodes fit into a String7, including the one on the first row, but some are 8-character strings. Also, one column contained occasional missing values.
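(As an aside, CSV.jl's types keyword also accepts a Dict mapping column names to types, so something along the following lines might avoid hard-coding all 53 types. This is an untested sketch; the individual column types are guesses lifted from the hard-coded vector above rather than checked against the file.)

# Untested sketch: give types only for the selected columns via a Dict.
# The concrete types here are assumptions, not verified against the ONSPD file.
coltypes = Dict("pcds"   => String15,
                "dointr" => Int64,
                "doterm" => Union{Missing, Int64},
                "osward" => String15,
                "pcon"   => String15)
rows = CSV.Rows(onspdfile;
                select = ["pcds", "dointr", "doterm", "osward", "pcon"],
                types = coltypes,
                stripwhitespace = true)
df = Iterators.filter(r -> r.pcds in postcodes, rows) |> DataFrame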

Is there another, better way to do this or is CSV now as efficient as it gets?

1 Like

CSV.jl is really, really good for most problems. If it doesn't do a good job on something CSV-related, then I turn to DuckDB: https://duckdb.org/
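For example, here is a rough, untested sketch of what the DuckDB.jl route might look like for the postcode filter discussed above (file and column names are taken from the other posts; the two-element postcodes vector is just a stand-in for the full list):

using DuckDB, DataFrames

postcodes = ["RG24 4PA", "CB3 0AP"]   # stand-in for the full list used elsewhere

# In-memory DuckDB connection; read_csv_auto reads the CSV file directly.
con = DBInterface.connect(DuckDB.DB, ":memory:")

# Build an IN (...) list from the postcodes vector.
inlist = join("'" .* postcodes .* "'", ", ")

df = DBInterface.execute(con, """
    SELECT pcds, dointr, doterm, osward, pcon
    FROM read_csv_auto('ONSPD_NOV_2023_UK.csv')
    WHERE pcds IN ($inlist)
""") |> DataFrame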

2 Likes

using TableOperations
CSV.File(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true) |> TableOperations.filter(x -> x[1] in postcodes) |> DataFrame

# or a variant of your first function, valid provided the postcodes are unique
# and all present in the file (indexin returns nothing for postcodes it cannot find)

function updateGeogs1b(onspdfile, postcodes)
    df1 = CSV.read(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true, DataFrame)
    df1[indexin(postcodes, df1.pcds), :]
end

2 Likes

Thanks for these suggestions @rocco_sprmnt21. I’ve tried them:

using DataFrames
using CSV
using BenchmarkTools
using TableOperations

function ug0(onspdfile, postcodes)
    df0 = CSV.File(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true) |> TableOperations.filter(x->x[1] in postcodes) |>DataFrame 
    return df0
end
function ug1b(onspdfile, postcodes) 
    df1b = CSV.read(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true, DataFrame)
    df1b=df1b[indexin(postcodes,df1b.pcds),:]
    return df1b
end
function updateGeogs1(onspdfile, postcodes) 
    df1 = CSV.read(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true, DataFrame)
    subset!(df1, :pcds => ByRow(x -> in(x, postcodes)))
    return df1
end
function updateGeogs2(onspdfile, postcodes) 
    thetypes = [String15, String15, String15, Int64, Union{Missing,Int64}, String15, String15, String15, String15, String15, Int64, Int64, Int64, Int64, String15, String15, String15, String15, Int64, String15, String15, String15, String15, String15, String15, String7, String15, String7, String15, String15, String15, Int64, String3, String15, String15, String15, String15, String15, String15, String15, Int64, String3, Float64, Float64, String15, Missing, String15, Int64, String15, String15, Missing, Missing, Missing]
    df2 = Iterators.filter(x -> in(x[:pcds], postcodes), CSV.Rows(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], types=thetypes, stripwhitespace = true)) |> DataFrame
    return df2
end

function main()
    onspdfile = raw"ONSPD_NOV_2023_UK.csv"
    postcodes = ["RG24 4PA", "CB3 0AP", "CR9 1DG", "EN1 3XY", "NW1W 8JN", "N22 7SG", "L36 9YU", "OX1 9TY", "CV21 9NL", "SY1 1NA", "L1 5BL", "DY10 3TE", "NG5 1DD", "OX16 9AA", "E5 0LL", "OX1 2AQ", "WF1 2AP", "BN11 4HY", "S70 2TA", "MK45 2DD", "B91 1DD", "NR27 0LJ", "LN11 0BX", "BS5 6HN", "WC2R 1LA", "CB3 0HZ", "E1 1EJ", "WC1N 2LX", "BS40 8QB", "BS6 6SU", "SE3 7SE", "OX2 7TG"]
    @btime ug0($onspdfile, $postcodes)
    @btime ug1b($onspdfile, $postcodes)
    @btime updateGeogs1($onspdfile, $postcodes)
    @btime updateGeogs2($onspdfile, $postcodes)
    return nothing
end

main()

And I get the following timings:

  ug0:  8.478 s (8537080 allocations: 417.29 MiB)
  ug1b: 8.430 s (3142286 allocations: 489.64 MiB)
  updateGeogs1: 8.378 s (3142354 allocations: 317.61 MiB)
  updateGeogs2: 11.104 s (17095677 allocations: 1.74 GiB)

So read and subset! is the marginal winner.

Thanks for this suggestion, @tbeason. :smiley:
I’ll have a look at DuckDB to see what I can do with it.

Tried the OP’s file: 2,211,787 rows, 238 columns:

using DataFrames
using CSV
using BenchmarkTools
function psamTest(psamfile)
    dfp = CSV.read(psamfile, DataFrame)
    subset!(dfp, :NP => ByRow(x -> rand() < 0.2 && x >= 3))
    return dfp
end
function main()
    psamfile = raw"psam_husa.csv"
    @btime psamTest($psamfile)
    return nothing
end

main()
 26.985 s (23802 allocations: 4.36 GiB)

Looks like CSV has made a lot of improvements since 2021!

2 Likes
Here are my measurements:
julia> using DataFrames

julia> using CSV

julia> using BenchmarkTools

julia> using TableOperations

julia> function ug0(onspdfile, postcodes)
           df0 = CSV.File(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true) |> TableOperations.filter(x->x[1] in postcodes) |>DataFrame 
           return df0
       end
ug0 (generic function with 1 method)

julia> function ug1b(onspdfile, postcodes)
           df1b = CSV.read(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true, DataFrame)
           df1b=df1b[indexin(postcodes,df1b.pcds),:]
           return df1b
       end
ug1b (generic function with 1 method)

julia> function updateGeogs1(onspdfile, postcodes)
           df1 = CSV.read(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], stripwhitespace=true, DataFrame)
           subset!(df1, :pcds => ByRow(x -> in(x, postcodes)))
           return df1
       end
updateGeogs1 (generic function with 1 method)

julia> function updateGeogs2(onspdfile, postcodes)
           thetypes = [String15, String15, String15, Int64, Union{Missing,Int64}, String15, String15, String15, String15, String15, Int64, Int64, Int64, Int64, String15, String15, String15, String15, Int64, String15, String15, String15, String15, String15, String15, String7, String15, String7, String15, String15, String15, Int64, String3, String15, String15, String15, String15, String15, String15, String15, Int64, String3, Float64, Float64, String15, Missing, String15, Int64, String15, String15, Missing, Missing, Missing]
           df2 = Iterators.filter(x -> in(x[:pcds], postcodes), CSV.Rows(onspdfile, select=["pcds", "dointr", "doterm", "osward", "pcon"], types=thetypes, stripwhitespace = true)) |> DataFrame
           return df2
       end
updateGeogs2 (generic function with 1 method)

julia> function main()
           onspdfile = raw"ONSPD_NOV_2023_UK.csv"
           postcodes = ["RG24 4PA", "CB3 0AP", "CR9 1DG", "EN1 3XY", "NW1W 8JN", "N22 7SG", "L36 9YU", "OX1 9TY", "CV21 9NL", "SY1 1NA", "L1 5BL", "DY10 3TE", "NG5 1DD", "OX16 9AA", "E5 0LL", "OX1 2AQ", "WF1 2AP", "BN11 4HY", "S70 2TA", "MK45 2DD", "B91 1DD", "NR27 0LJ", "LN11 0BX", "BS5 6HN", "WC2R 1LA", "CB3 0HZ", "E1 1EJ", "WC1N 2LX", "BS40 8QB", "BS6 6SU", "SE3 7SE", "OX2 7TG"] 
           @btime ug0($onspdfile, $postcodes)
           @btime ug1b($onspdfile, $postcodes)
           @btime updateGeogs1($onspdfile, $postcodes)
           @btime updateGeogs2($onspdfile, $postcodes)
           return nothing
       end
main (generic function with 1 method)

julia> main()
  9.864 s (8239921 allocations: 391.03 MiB)
  5.595 s (12613 allocations: 398.71 MiB)
  5.948 s (12792 allocations: 265.81 MiB)
  11.770 s (17095781 allocations: 1.74 GiB)

julia> versioninfo()
Julia Version 1.9.0
Commit 8e63055292 (2023-05-07 11:25 UTC)
Platform Info:
  OS: Windows (x86_64-w64-mingw32)
  CPU: 8 × 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-14.0.6 (ORCJIT, tigerlake)
  Threads: 4 on 8 virtual cores
Environment:
  JULIA_EDITOR = code
  JULIA_NUM_THREADS = 4


(@v1.9.0) pkg> st CSV
Status `C:\Users\sprmn\.julia\environments\v1.9.0\Project.toml`
  [336ed68f] CSV v0.10.12

(@v1.9.0) pkg> st DataFrames
Status `C:\Users\sprmn\.julia\environments\v1.9.0\Project.toml`
  [a93c6f00] DataFrames v1.6.1

(@v1.9.0) pkg> st BenchmarkTools
Status `C:\Users\sprmn\.julia\environments\v1.9.0\Project.toml`
  [6e4b80f9] BenchmarkTools v1.4.0

Some slight improvement can be obtained by making the vector of postcodes a Set:

    postcodes = Set(["RG24 4PA", "CB3 0AP", "CR9 1DG", "EN1 3XY", "NW1W 8JN", "N22 7SG", "L36 9YU", "OX1 9TY", "CV21 9NL", "SY1 1NA", "L1 5BL", "DY10 3TE", "NG5 1DD", "OX16 9AA", "E5 0LL", "OX1 2AQ", "WF1 2AP", "BN11 4HY", "S70 2TA", "MK45 2DD", "B91 1DD", "NR27 0LJ", "LN11 0BX", "BS5 6HN", "WC2R 1LA", "CB3 0HZ", "E1 1EJ", "WC1N 2LX", "BS40 8QB", "BS6 6SU", "SE3 7SE", "OX2 7TG"])
1 Like

You have me there; I’m only using 1 thread.
Also, Julia Version 1.10.0, but otherwise the same package versions as you.

I didn’t see any noticeable improvement using Set().

Here is the origin of my belief:

Hi,

Whenever I see CSV threads where subsets are required, I am always confused why Arrow.jl is not brought up.

Since I didn't see a link to your ONSPD file, I downloaded the OP's file and recreated your approach, but first converted the file to Arrow (and replaced subset! with filterRows, since Arrow is immutable).

using Arrow, CSV, DataFrames, BenchmarkTools

file = raw"psam_husa.csv"
convertedFile = raw"psam_husa.arrow"

Arrow.write(convertedFile, CSV.File(file,header=true, normalizenames=true), ntasks=1)

function filterRows(arrowCol, f)
  
    rowIndex = 0
    rows = Vector{Int32}()
    for r in arrowCol
        rowIndex += 1
        if f(r)
            push!(rows, rowIndex)
        end
    end

    rows

end

function psamTest(psamfile)
    dfp = Arrow.Table(psamfile)
    filterCol = dfp[:NP]
    f = x -> rand() < 0.2 && x >= 3

    #subset(dfp, :NP => ByRow(x -> rand() < 0.2 && x >= 3))
    return filterRows(filterCol, f)
end
function main()
    psamfile = raw"psam_husa.arrow"
    @btime psamTest($psamfile)
    return nothing
end

main()

yielding 16.027 ms (13991 allocations: 1.64 MiB). Note that with these row indices you can then go back and aggregate on other columns, etc. This would also be much quicker if we just opened the file once, of course (but I didn't, since you didn't).
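A hedged illustration of that idea, reusing the index vector on a second column (:BDSP is just a plausible PUMS column name used for illustration, not checked against the file):

using Arrow, Statistics

tbl = Arrow.Table("psam_husa.arrow")
# Same filter as in psamTest above, but reusing the already-opened table.
idx = filterRows(tbl[:NP], x -> rand() < 0.2 && x >= 3)
# Pull another column only at the filtered rows and aggregate it.
bedrooms = collect(skipmissing(tbl[:BDSP][idx]))    # :BDSP is hypothetical
mean(bedrooms)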

To the original OP question: Arrow is fully index-accessible on columns, so taking the top 100 rows and writing them to a DataFrame is straightforward.

If you link the ONSPD file it would be possible (time permitting) to attempt your postcode filter question. Note that you can even dictionary-encode Arrow files to mitigate the cost of string matching (which takes a while, since the string data is laid out in vectors of UInts with corresponding offsets).
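For reference, Arrow.write has a dictencode keyword, so a hedged version of the one-off conversion with dictionary encoding might look like the following (file names are just those used elsewhere in the thread; its effect on the filter timings is not measured here):

using Arrow, CSV

# One-off conversion, dictionary-encoding the columns this time.
Arrow.write("ONSPD_NOV_2023_UK.arrow",
            CSV.File("ONSPD_NOV_2023_UK.csv"; normalizenames=true);
            ntasks=1, dictencode=true)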

Regards,

2 Likes

And for reference, how long does it take to convert the big CSV file to Arrow format?

2 Likes

Good question. I only specified one thread, and at a guess it took roughly 1 minute on my i7, but it's a once-only operation, of course.

1 Like

Is this it?

Here is a discussion about a related topic.
Here quinnj explains why it is not worth filtering rows while reading.

3 Likes

It is very useful to apply optimized code to the heavier processing bits of a task. So, using a regex to pre-filter the CSV lines and only then properly parse the ones that remain gets a 2x improvement:

function updateGeogs3(onspdfile, postcodes)
    # Pre-filter the raw lines with a regex built from the postcodes (keeping the
    # header line), then hand only the surviving lines to CSV for proper parsing.
    re = Regex('(' * join(postcodes, '|') * ')')
    f = open(onspdfile)
    goodlines = vcat(readline(f), filter(l -> !isnothing(match(re, l)), readlines(f)))
    close(f)
    iob = IOBuffer(join(goodlines, '\n'))
    thetypes = [String15, String15, String15, Int64, Union{Missing,Int64}, String15, String15, String15, String15, String15, Int64, Int64, Int64, Int64, String15, String15, String15, String15, Int64, String15, String15, String15, String15, String15, String15, String7, String15, String7, String15, String15, String15, Int64, String3, String15, String15, String15, String15, String15, String15, String15, Int64, String3, Float64, Float64, String15, Missing, String15, Int64, String15, String15, Missing, Missing, Missing]
    df2 = Iterators.filter(x -> in(x[:pcds], postcodes), CSV.Rows(iob, select=["pcds", "dointr", "doterm", "osward", "pcon"], types=thetypes, stripwhitespace = true)) |> DataFrame
    return df2
end

This applies the popular optimization cliché to parsing every line of the CSV: the most efficient optimization is not to do the computation at all.

1 Like

Thanks for this - it does appear to be the correct file.

Pulling it down and applying the Arrow conversion (1 thread), which took 51.905011 seconds (218.61 M allocations: 10.079 GiB, 4.80% gc time, 39.06% compilation time: 10% of which was recompilation):

using Arrow, CSV, DataFrames, BenchmarkTools

file = raw"ONSPD_NOV_2023_UK.csv"
convertedFile = raw"ONSPD_NOV_2023_UK.arrow"

@time Arrow.write(convertedFile, CSV.File(file, header=true, normalizenames=true), ntasks=1)

function filterRows(arrowCol, f)
  
    rowIndex = 0
    rows = Vector{Int32}()
    for r in arrowCol
        rowIndex += 1
        if f(r)
            push!(rows, rowIndex)
        end
    end

    rows

end

function getArrowSubset(filterRows, selectedCols, dfp)
    df = DataFrame()
    for col in selectedCols
        dfCol = dfp[col]
        df[:, col] = dfCol[filterRows]
    end

    df
end

function psamTest(psamfile)
    
    dfp = Arrow.Table(psamfile)
    filterCol = dfp[:pcds ]
    setValues = Set(["RG24 4PA", "CB3 0AP", "CR9 1DG", "EN1 3XY", "NW1W 8JN", "N22 7SG", "L36 9YU", "OX1 9TY", "CV21 9NL", "SY1 1NA", "L1 5BL", "DY10 3TE", "NG5 1DD", "OX16 9AA", "E5 0LL", "OX1 2AQ", "WF1 2AP", "BN11 4HY", "S70 2TA", "MK45 2DD", "B91 1DD", "NR27 0LJ", "LN11 0BX", "BS5 6HN", "WC2R 1LA", "CB3 0HZ", "E1 1EJ", "WC1N 2LX", "BS40 8QB", "BS6 6SU", "SE3 7SE", "OX2 7TG"])
    cols=[:pcds, :dointr, :doterm, :osward, :pcon]
    f = x -> in(x, setValues)

    #subset(dfp, :NP => ByRow(x -> rand() < 0.2 && x >= 3))    
    return getArrowSubset(filterRows(filterCol, f), cols , dfp)
end
function main()
    psamfile = raw"ONSPD_NOV_2023_UK.arrow"
    @btime psamTest($psamfile)
    return nothing
end

main()

yielding: 98.880 ms (2701325 allocations: 71.18 MiB)

Note that this is done quite naively, in a sense, as the Arrow file isn't dictionary-encoded, so filtering will be slow for longer strings (postcodes probably aren't problematic, of course). If you wanted to chain together other filter predicates, that would be relatively straightforward, as shown in the sketch below: you can call them in a loop and intersect the index vectors in sequence, then apply the column subset, etc.
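A hedged sketch of what that chaining might look like, reusing filterRows and getArrowSubset from the code above (the second predicate and the shortened postcode list are purely illustrative):

tbl = Arrow.Table("ONSPD_NOV_2023_UK.arrow")
setValues = Set(["RG24 4PA", "CB3 0AP"])             # stand-in for the full list above
idx1 = filterRows(tbl[:pcds], x -> x in setValues)
idx2 = filterRows(tbl[:osward], x -> !ismissing(x))  # a second, made-up predicate
keep = intersect(idx1, idx2)
subsetDF = getArrowSubset(keep, [:pcds, :dointr, :doterm, :osward, :pcon], tbl)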

Regards,

3 Likes

I tried this and got:

 updateGeogs1   :   3.655 s (12771 allocations: 205.79 MiB)
 updateGeogs3   :   5.510 s (5495277 allocations: 1.56 GiB)

I admit I’ve moved the goalposts a bit because I now have 4 threads running. CSV.read seems to make good use of multiple threads, but this method with CSV.Rows doesn’t seem to do so.
With only 1 thread, updateGeogs1 originally got:

updateGeogs1   :   8.378 s (3142354 allocations: 317.61 MiB)

so, yes, considerably slower than your suggestion.
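(For what it's worth, CSV.File/CSV.read expose an ntasks keyword for the number of concurrent parsing tasks, defaulting to the number of threads, while CSV.Rows always streams in a single pass. A sketch of pinning it explicitly when comparing runs, with the value 4 being my own choice:)

using CSV, DataFrames

onspdfile = raw"ONSPD_NOV_2023_UK.csv"
# Make the number of parsing tasks explicit when comparing thread counts;
# this keyword does not apply to CSV.Rows, which parses row by row.
df = CSV.read(onspdfile, DataFrame;
              select = ["pcds", "dointr", "doterm", "osward", "pcon"],
              stripwhitespace = true,
              ntasks = 4)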

I was not aware of Arrow.jl.

Yes, on my system I get a very similar 102.941 ms (2701318 allocations: 71.18 MiB)

The initial creation of the .arrow file takes 35.973939 seconds (216.50 M allocations: 9.796 GiB) (measured using @time not @btime, though).

I use the ONSPD file quite often, so creating an arrow file once and then using that rather than the .csv file might save quite a bit of time. I think I’ll have a go. Thanks!
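A hedged sketch of that convert-once, reuse-later workflow (file names as above; copycols=false keeps the memory-mapped Arrow columns rather than copying them, at the cost of the resulting DataFrame being read-only):

using Arrow, CSV, DataFrames

# One-off (slow) conversion from CSV to Arrow.
Arrow.write("ONSPD_NOV_2023_UK.arrow",
            CSV.File("ONSPD_NOV_2023_UK.csv"; normalizenames=true))

# Every later session: fast, memory-mapped load.
onspd = DataFrame(Arrow.Table("ONSPD_NOV_2023_UK.arrow"); copycols=false)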

1 Like

Note that the code above does yield a DataFrame subset as output, so your current workflow wouldn’t necessarily have to change too much (especially in the MWE scenarios provided above, where you’re filtering on one column only). If you were uncomfortable creating filtering strategies yourself, you could work with this subset DataFrame in the normal way, only at the upfront cost of the initial conversion.

Regards,