Reading CSV files is slow

Hi

I am loading a long list of text files into memory using DataFrames (joined on the Date column).

It is taking very long to read them in, so I tried multithreading, since Threads.nthreads() = 12 on my machine.

The multithreading only helps up to a point, though: 8000 files takes 10x longer than 4000 files, despite being only 40% larger in total size. The table below shows time taken versus number of files and total file size.

Any tips for improvement will be greatly appreciated!

Files    Size (MB)    Time (s)
1000     372          15
2000     644          28
4000     1000         127
8000     1400         1260
# single-threaded readfiles
function readfiles(files, filedirectory)
    # read the first file to seed the joined data frame
    df3 = DataFrame(CSV.File(filedirectory * files[1], threaded = true))
    rename!(df3, [Symbol("col$i") for i in 1:6])
    select!(df3, "col1", "col5")
    # name the value column after the file, dropping the ".txt" extension
    rename!(df3, "col5" => files[1][1:end-4])
    for file in files[2:end]
        df2 = DataFrame(CSV.File(filedirectory * file, threaded = true))
        rename!(df2, [Symbol("col$i") for i in 1:6])
        select!(df2, "col1", "col5")
        rename!(df2, "col5" => file[1:end-4])
        # each outerjoin widens df3 by one column
        df3 = outerjoin(df3, df2, on = "col1")
    end
    return df3
end
# multithreaded readfiles
function readfiles_par(filedirectory)
    files = readdir(filedirectory)
    n = Threads.nthreads()
    width = div(length(files), n)
    a = Array{Task}(undef, n)
    for j in 1:n
        start = (j - 1) * width + 1
        # the last chunk takes the remainder so no files are silently
        # dropped when length(files) is not divisible by n
        stop = j == n ? length(files) : start + width - 1
        a[j] = Threads.@spawn readfiles(files[start:stop], filedirectory)
    end
    results = fetch.(a)
    df = results[1]
    for j in 2:n
        df = outerjoin(df, results[j], on = "col1")
    end
    df.Date = Date.(df.col1, dateformat"mm/dd/yyyy")  # needs `using Dates`
    select!(df, Not("col1"))
    return df
end

Those superlinear timings indicate the problem I would expect from outerjoin. Have you verified that reading the CSV files is the bottleneck, and not the joining part?
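
One way to check (a rough, untested sketch that mirrors your preparation steps; filedirectory is whatever path you are using):

using CSV, DataFrames

files = readdir(filedirectory)

# 1. time the reads and per-file preparation alone
dfs = @time map(files) do f
    df = DataFrame(CSV.File(filedirectory * f))
    rename!(df, [Symbol("col$i") for i in 1:6])
    select!(df, "col1", "col5")
    rename!(df, "col5" => f[1:end-4])
    df
end

# 2. time the joins alone
joined = @time reduce((df1, df2) -> outerjoin(df1, df2, on = "col1"), dfs)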

Not sure. I'm trying to get a baseline with a one-off read of all the files, but hit an error:

ERROR: MethodError: no method matching joinpath(::Array{String,1})

fileDirectory = "d:/Data/Test/"
using Glob
files = glob("*.txt", fileDirectory)
df3 = DataFrame(CSV.File(files))

You need to broadcast over the vector of files. Try this (untested) code instead:

fileDirectory = "d:/Data/Test/"
using Glob
files = glob("*.txt", fileDirectory)
df3 = DataFrame.(CSV.File.(files))

Unfortunately the files are not all the same length:
DimensionMismatch("column :x1 has length 14735 and column :x2 has length 14734")

Can you import just one file using CSV.jl? It's best to break your problem into smaller steps to isolate the issue.
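
For instance, a minimal check (using the fileDirectory and glob pattern from your snippet above):

using CSV, DataFrames, Glob
fileDirectory = "d:/Data/Test/"
files = glob("*.txt", fileDirectory)   # glob returns full paths
df = DataFrame(CSV.File(files[1]))     # parse a single file first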


Like the other posters, I also doubt that the CSV reading itself is the slow part; more likely it is the manipulation and joining of the DataFrames.

Here is one other thing you could try, which may or may not be faster: use pmap to read ALL of the CSVs and do your small bit of preparation on each, but do not join them into one.

using Distributed
addprocs(12)
# packages and the worker function must be defined on every process
@everywhere using DataFrames, CSV
@everywhere function readandprep(file)
    df2 = DataFrame(CSV.File(file))
    rename!(df2, [Symbol("col$i") for i in 1:6])
    select!(df2, "col1", "col5")
    # name the value column after the file, dropping the extension
    rename!(df2, "col5" => file[1:end-4])
    return df2
end

all_df = pmap(readandprep, listofcsvfiles)

That should (I think?) net you a vector of DataFrames that are ready to be smooshed together however you like. I’d guess that the pmap should not take excessively long compared to the join, but who knows.
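
If you would rather stay with threads (as in your original code) instead of worker processes, an equivalent sketch is:

# threads-based alternative to pmap; assumes readandprep is defined as above
tasks = [Threads.@spawn(readandprep(f)) for f in listofcsvfiles]
all_df = fetch.(tasks)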

What is likely to be the bottleneck is the repeated outerjoin inside the loop:

df3 = outerjoin(df3, df2, on = "col1")

(note: you do not need the : before "col1")

Thanks, will give that a shot. It seems join only accepts two arguments, so I can't join 8000 files in one call. How would you outerjoin the list of files on a column like Date?

In a loop just like you are doing now, but in this case the data preparation is already done.
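
For example (a sketch; join_all is a hypothetical helper, and all_df is the vector from the pmap snippet above):

# assumes DataFrames is loaded and the files were prepared by readandprep
function join_all(dfs)
    df = dfs[1]
    for d in dfs[2:end]
        df = outerjoin(df, d, on = "col1")
    end
    return df
end

df = join_all(all_df)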

Are you sure you need a join, and not just a vcat of the data frames?
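
For example (untested), if the preparation kept a fixed value-column name and recorded the source file in its own column, a single vcat plus unstack would give a wide table without thousands of joins. Here readlong is a hypothetical variant of the readandprep function above, and filedirectory/files are the names from your code:

using CSV, DataFrames

function readlong(file, filedirectory)
    df = DataFrame(CSV.File(filedirectory * file))
    rename!(df, [Symbol("col$i") for i in 1:6])
    select!(df, "col1", "col5")
    df[!, :source] .= file[1:end-4]   # record which file each row came from
    return df
end

long = reduce(vcat, [readlong(f, filedirectory) for f in files])
# assumes one value per (col1, file) pair
wide = unstack(long, "col1", "source", "col5")   # one column per file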

I know I’m late to the party, but I believe CSV.jl uses threading out of the box when reading files above a certain size (at least I found a bug in error reporting that was related to that), so trying to thread on top of that might not work as expected.
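
If that interaction is the issue, it may be worth keeping each per-file read single-threaded when you are already spawning your own tasks, using the same threaded keyword as in the code above (filedirectory and file as in the original functions):

df2 = DataFrame(CSV.File(filedirectory * file, threaded = false))  # one thread per file read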

The output data frame is [Date, file1, file2, file3, …], joined on Date, so unfortunately I have to join at some stage.

If so, you can do something like this

julia> dfs = [DataFrame(a = rand(1:100, 2), b = rand(2)) for i in 1:100]

julia> function myjoin(df1, df2)
           outerjoin(df1, df2, on = "a", makeunique = true)
       end

julia> reduce(myjoin, dfs);