Read multiple CSV files from S3

Hi all,

I am trying to load multiple CSV files from an S3 bucket, but I'm hitting a roadblock when adapting my usual local approach (Glob, CSV, DataFrames) to work with AWSS3. Locally I'd do something like this (roughly, with a placeholder directory):
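using Glob, CSV, DataFrames

files = glob("PNR_*.csv", "data")                           # "data" is just a placeholder directory
df = reduce(vcat, [CSV.read(f, DataFrame) for f in files])  # read and stack every matching file

On the S3 side, listing the bucket works fine: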

p = S3Path("s3://stuff/", config=global_aws_config());
readdir(p)

which produces:

30-element Vector{String}:
 "PNR_20220301.csv"
 "PNR_20220302.csv"
 "PNR_20220303.csv"
 "PNR_20220304.csv"
 "PNR_20220305.csv"
 "PNR_20220306.csv"
 "PNR_20220307.csv"
 "PNR_20220308.csv"
 "PNR_20220309.csv"
 "PNR_20220310.csv"
 "PNR_20220311.csv"
 "PNR_20220312.csv"
 "PNR_20220313.csv"
 ⋮

It’s simple to load any one of these files:

file = AWSS3.read(S3Path("s3://stuff/PNR_20220310.csv"))
df = CSV.read(file, DataFrame)

Here I try to use glob:

files = glob("PNR_*.csv", p)
dfs = CSV.read.(files, DataFrame)

which fails with the following error, because glob doesn’t accept an S3Path:

LoadError: MethodError: no method matching glob(::String, ::S3Path{AWSConfig})
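(I assume I could sidestep glob by filtering the listing myself, along the lines of the snippet below, but that still leaves the question of how to actually read each file from S3.)

# hypothetical workaround: filter the bucket listing by name instead of using glob
files = filter(f -> startswith(f, "PNR_") && endswith(f, ".csv"), readdir(p))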

In R it would be solved by:

library(aws.s3)
library(dplyr)
library(stringr)
library(readr)
library(purrr)
Sys.setenv("AWS_ACCESS_KEY_ID" = "YOUR_ACCESS_KEY_ID",
           "AWS_SECRET_ACCESS_KEY" = "YOUR_ACCESS_KEY",
           "AWS_DEFAULT_REGION" = "YOUR_REGION_LIKE_us-west-2")

csvs <- get_bucket_df("your_bucket", max = Inf) %>%
  filter(str_detect(Key, "sample.*\\.csv$"))

map_df(csvs$Key, function(key){
  read_csv(get_object(key, bucket = "your_bucket", as = "text"))
})

I could run this with RCall … but it would be nice to stick with a Julia solution.

Doesn’t this work?

DataFrame.(CSV.File.(readdir(p)))

or perhaps with join = true for the readdir?
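i.e., something along the lines of (untested, just spelling it out):

dfs = DataFrame.(CSV.File.(readdir(p; join = true)))   # one DataFrame per file
df = reduce(vcat, dfs)                                 # then stack them into one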


Here is what I get:

DataFrame.(CSV.File.(readdir(p)))
LoadError: ArgumentError: "PNR_20220301.csv" is not a valid file or doesn't exist

So it finds the file but then says it doesn’t exist or isn’t valid… which is an odd error message, as the files work just fine individually.

Then with the added join argument I get:

dftest = DataFrame.(CSV.File.(readdir(p, join = true)))
LoadError: MethodError: no method matching readavailable(::FileBuffer)

Here is the second error message in full:

LoadError: MethodError: no method matching readavailable(::FileBuffer)
Closest candidates are:
  readavailable(::Base.AbstractPipe) at io.jl:380
  readavailable(::Base.GenericIOBuffer) at iobuffer.jl:470
  readavailable(::IOStream) at iostream.jl:379
  ...

Stacktrace:
  [1] write
    @ ./io.jl:716 [inlined]
  [2] buffer_to_tempfile
    @ ~/.julia/packages/CSV/nofYz/src/utils.jl:312 [inlined]
  [3] getbytebuffer(x::S3Path{AWSConfig}, buffer_in_memory::Bool)
    @ CSV ~/.julia/packages/CSV/nofYz/src/utils.jl:281
  [4] getsource(x::Any, buffer_in_memory::Bool)
    @ CSV ~/.julia/packages/CSV/nofYz/src/utils.jl:289
  [5] CSV.Context(source::CSV.Arg, header::CSV.Arg, normalizenames::CSV.Arg, datarow::CSV.Arg, skipto::CSV.Arg, footerskip::CSV.Arg, transpose::CSV.Arg, comment::CSV.Arg, ignoreemptyrows::CSV.Arg, ignoreemptylines::CSV.Arg, select::CSV.Arg, drop::CSV.Arg, limit::CSV.Arg, buffer_in_memory::CSV.Arg, threaded::CSV.Arg, ntasks::CSV.Arg, tasks::CSV.Arg, rows_to_check::CSV.Arg, lines_to_check::CSV.Arg, missingstrings::CSV.Arg, missingstring::CSV.Arg, delim::CSV.Arg, ignorerepeated::CSV.Arg, quoted::CSV.Arg, quotechar::CSV.Arg, openquotechar::CSV.Arg, closequotechar::CSV.Arg, escapechar::CSV.Arg, dateformat::CSV.Arg, dateformats::CSV.Arg, decimal::CSV.Arg, truestrings::CSV.Arg, falsestrings::CSV.Arg, type::CSV.Arg, types::CSV.Arg, typemap::CSV.Arg, pool::CSV.Arg, downcast::CSV.Arg, lazystrings::CSV.Arg, stringtype::CSV.Arg, strict::CSV.Arg, silencewarnings::CSV.Arg, maxwarnings::CSV.Arg, debug::CSV.Arg, parsingdebug::CSV.Arg, validate::CSV.Arg, streaming::CSV.Arg)
    @ CSV ~/.julia/packages/CSV/nofYz/src/context.jl:324
  [6] #File#25
    @ ~/.julia/packages/CSV/nofYz/src/file.jl:220 [inlined]
  [7] File
    @ ~/.julia/packages/CSV/nofYz/src/file.jl:220 [inlined]
  [8] _broadcast_getindex_evalf
    @ ./broadcast.jl:648 [inlined]
  [9] _broadcast_getindex
    @ ./broadcast.jl:621 [inlined]
 [10] _getindex
    @ ./broadcast.jl:645 [inlined]
 [11] _broadcast_getindex
    @ ./broadcast.jl:620 [inlined]
 [12] getindex
    @ ./broadcast.jl:575 [inlined]
 [13] macro expansion
    @ ./broadcast.jl:984 [inlined]
 [14] macro expansion
    @ ./simdloop.jl:77 [inlined]
 [15] copyto!
    @ ./broadcast.jl:983 [inlined]
 [16] copyto!
    @ ./broadcast.jl:936 [inlined]
 [17] copy
    @ ./broadcast.jl:908 [inlined]
 [18] materialize(bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Nothing, Type{DataFrame}, Tuple{Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Nothing, Type{CSV.File}, Tuple{Vector{S3Path{AWSConfig}}}}}})
    @ Base.Broadcast ./broadcast.jl:883
 [19] top-level scope
    @ In[286]:1
 [20] eval
    @ ./boot.jl:360 [inlined]
 [21] include_string(mapexpr::typeof(REPL.softscope), mod::Module, code::String, filename::String)
    @ Base ./loading.jl:1094

But when you say “it’s fine individually” you’re doing something different, right?

file = AWSS3.read(S3Path("s3://stuff/PNR_20220310.csv"))
df = CSV.read(file, DataFrame)

here you pass the result of AWSS3.read to CSV.read, rather than simply the file path.

I’ve never used AWSS3, but it seems to me you want

CSV.read(AWSS3.read.(S3Path.(readdir(p; join = true))), DataFrame)
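If I’m reading the CSV.jl docs right, CSV.read accepts a vector of sources and vertically concatenates them, so that should give you one combined DataFrame. If the one-liner misbehaves, the same thing spelled out file by file would be something like this (untested sketch):

using AWSS3, CSV, DataFrames

paths = readdir(p; join = true)                            # full path to each object in the bucket
dfs = [CSV.read(AWSS3.read(q), DataFrame) for q in paths]  # download and parse each file
df = reduce(vcat, dfs)                                     # stack into a single DataFrame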

Got it, after a slight edit to your code:

dftest = CSV.read(AWSS3.read.(readdir(p; join = true)), DataFrame)

as p was already an S3Path, so the extra S3Path.() wrapper wasn’t needed. Thanks!!
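For completeness, combining this with the file-name filtering I originally wanted from glob should just be a matter of filtering the listing first; something like this (untested) ought to work:

paths = filter(q -> occursin(r"PNR_.*\.csv$", string(q)), readdir(p; join = true))
dftest = CSV.read(AWSS3.read.(paths), DataFrame)   # all matching files, vertically concatenated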
