I am loading a long list of text files into memory as DataFrames (joined on the date column). Reading them in is taking very long, so I tried multithreading since my nthreads() = 12.
The multithreading only helps up to a point, though: as the table below shows, reading 8000 files takes about 10x longer than 4000 files, despite being only 40% larger in total storage. The table shows time taken vs. number of files and total file size.
Any tips for improvement would be greatly appreciated!
Files   Size (MB)   Time (s)
1000    372         15
2000    644         28
4000    1000        127
8000    1400        1260
using CSV, DataFrames, Dates

# single-threaded readfiles: read each file, keep col1 (the date string) and col5,
# name col5 after the file, and outer-join everything on col1
function readfiles(files, filedirectory)
    df3 = DataFrame!(CSV.File(filedirectory * files[1], threaded = true))
    DataFrames.rename!(df3, [Symbol("col$i") for i in 1:6])
    select!(df3, "col1", "col5")
    DataFrames.rename!(df3, "col5" => SubString(files[1], 1:length(files[1]) - 4))
    deleteat!(files, 1)
    for file in files
        df2 = DataFrame!(CSV.File(filedirectory * file, threaded = true))
        DataFrames.rename!(df2, [Symbol("col$i") for i in 1:6])
        select!(df2, "col1", "col5")
        DataFrames.rename!(df2, "col5" => SubString(file, 1:length(file) - 4))
        df3 = outerjoin(df3, df2, on = :col1)
    end
    return df3
end
# multithreaded readfiles
function readfiles_par(filedirectory)
    files = readdir(filedirectory)
    n = Threads.nthreads()
    width = div(length(files), n)
    a = Array{Task}(undef, n)
    # one task per chunk of files; the last chunk also takes the remainder
    # when length(files) is not divisible by n
    for j = 1:n
        start = (j - 1) * width + 1
        stop = j == n ? length(files) : start + width - 1
        a[j] = Threads.@spawn readfiles(files[start:stop], filedirectory)
    end
    results = Array{DataFrame}(undef, n)
    for j = 1:n
        results[j] = fetch(a[j])
    end
    # join the n partial results, then parse the dates
    df = results[1]
    for j = 2:n
        df = outerjoin(df, results[j], on = :col1)
    end
    df.Date = Date.(df.col1, "mm/dd/yyyy")
    select!(df, Not("col1"))
    return df
end
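For reference, I call it roughly like this (the path here is just a placeholder):

@time df = readfiles_par("D:/data/csvfiles/")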
The worse-than-linear (exponential?) timings point to exactly the problem I would expect from repeated outerjoin. Have you verified that reading the CSV files is the bottleneck and not the joining part?
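For example, something along these lines (a rough sketch reusing files and filedirectory from your functions) would let you time the two phases separately:

using CSV, DataFrames

# 1. parsing only, no DataFrame manipulation and no joins
@time parsed = [CSV.File(filedirectory * f) for f in files]

# 2. per-file preparation only (same steps as in readfiles)
@time dfs = map(zip(parsed, files)) do (p, f)
    df = DataFrame(p)
    rename!(df, [Symbol("col$i") for i in 1:6])
    select!(df, "col1", "col5")
    rename!(df, "col5" => f[1:end-4])   # column named after the file
    df
end

# 3. joining only
@time joined = reduce((d1, d2) -> outerjoin(d1, d2, on = :col1), dfs)

If step 1 dominates, threading the reads harder is worth it; if step 3 dominates, the joins are where the time goes.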
Like the other posters, I also doubt that the CSV reading is slow. Likely it is the manipulation and joining of the DataFrames that is slow.
Here is one other thing you could try, which may or may not be faster: use pmap to read ALL of the CSVs and do your small bit of preparation on each, but do not join them into one.
using Distributed
addprocs(12)

# the packages and the worker function have to exist on every process
@everywhere using DataFrames, CSV

@everywhere function readandprep(file)
    df2 = DataFrame!(CSV.File(file))
    DataFrames.rename!(df2, [Symbol("col$i") for i in 1:6])
    select!(df2, "col1", "col5")
    # name the column after the file, without directory or extension
    DataFrames.rename!(df2, "col5" => splitext(basename(file))[1])
    return df2
end

all_df = pmap(readandprep, listofcsvfiles)
That should (I think?) net you a vector of DataFrames that are ready to be smooshed together however you like. I’d guess that the pmap should not take excessively long compared to the join, but who knows.
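Here listofcsvfiles is just the vector of full paths, which you could build with something like (using filedirectory from your own code):

# full path to every file in the directory
listofcsvfiles = joinpath.(filedirectory, readdir(filedirectory))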
Thanks, will give that a shot. It seems join only accepts two arguments, so I can't join 8000 files in one call. How would you outerjoin the list of DataFrames on a column like the date?
I know I’m late to the party, but I believe CSV uses threading out of the box when reading files above a certain size (at least I found a bug in error reporting that was related to that), so trying to thread on top of that might not work as expected.
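If you do keep your own one-task-per-chunk approach, it may therefore be worth switching CSV's internal threading off for the individual files; the keyword depends on your CSV.jl version, e.g.:

# with the `threaded` keyword you are already passing, just set it to false
df = DataFrame(CSV.File(filedirectory * file, threaded = false))
# in newer CSV.jl releases the equivalent knob is `ntasks`
# df = DataFrame(CSV.File(filedirectory * file, ntasks = 1))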
julia> dfs = [DataFrame(a = rand(1:100, 2), b = rand(2)) for i in 1:100];

julia> function myjoin(df1, df2)
           outerjoin(df1, df2, on = "a", makeunique = true)
       end

julia> reduce(myjoin, dfs);
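Applied to the all_df vector from the pmap suggestion above, the same pattern would look something like this (joining on your shared col1/date column rather than "a"):

df = reduce((d1, d2) -> outerjoin(d1, d2, on = :col1), all_df)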