HTTP request body -> PostgreSQL

A CSV file is uploaded (multipart/form-data) to my HTTP service. My task is to read the content of that file and store it in a PostgreSQL table. The HTTP request body contains the raw data (the content of the CSV file or files). What is the most efficient way to parse it and store it in the database table?

julia> using HTTP
julia> req = HTTP.request("GET", "https://support.staffbase.com/hc/en-us/article_attachments/360009197031/username.csv"; verbose=3)
julia> body = req.body
176-element Array{UInt8,1}:
0x55
0x73
0x65
...
julia> println(String(body))
Username; Identifier;First name;Last name
booker12;9012;Rachel;Booker
grey07;2070;Laura;Grey
johnson81;4081;Craig;Johnson
jenkins46;9346;Mary;Jenkins
smith79;5079;Jamie;Smith

Welcome to the community! :smiley:

As far as parsing is concerned, have a look at CSV.jl. For writing to your database, take a look at LibPQ.jl.
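For the parsing half, here is a minimal sketch of reading CSV data straight from a byte vector with CSV.jl. The `body` below is a made-up stand-in for your request body, not your real data, and it assumes the bytes hold only the CSV content (a real multipart body also carries boundary and header lines that have to be skipped or stripped first).

```julia
using CSV

# Stand-in for the request body: raw bytes of a semicolon-delimited CSV file.
# (Hypothetical sample data, shortened from the file in the question.)
body = Vector{UInt8}("Username;Identifier\nbooker12;9012\ngrey07;2070\n")

# CSV.File accepts a byte vector directly, so no temporary file is needed.
f = CSV.File(body; delim=';')

# Rows can be iterated, and columns accessed by name.
for row in f
    println(row.Username, " => ", row.Identifier)
end
```

The resulting `CSV.File` is a Tables.jl-compatible table, so it can be handed on to other packages without first being converted to a DataFrame.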


You should use bulk insert. It was discussed in this thread: How to quickly bulk insert into postgres - #6 by lbilli


This is also a useful read: Upload of DataFrame data to databases - #5 by mmiller


Thank you for your recommendations. Now I can describe my issue in more detail.

My CSV file consists of:

1C;2C;3C;4C;5C
1D;2D;3D;4D;5D

It is sent as multipart/form-data and stored in the request body (variable: body):

f = CSV.File(body, header=0, delim=';', skipto = 4, footerskip = 1)
for row in 1:f.rows
    println(f[row])
end

local conn
conn = connect()

copyin = LibPQ.CopyIn("COPY datasets FROM STDIN (FORMAT CSV, HEADER FALSE);", f)
execute(conn, copyin)

The content of the CSV file is parsed correctly, as the values in the terminal output show. However, when I execute the “CopyIn” I get an error that I don’t understand.

Questions:
1. How should I pass the “CSV object” to the “CopyIn function” in this case?
2. Is it possible to insert/copy data from a CSV file into PostgreSQL with the sink operator “|>” using CSV.jl and LibPQ.jl, as in SQLite.jl?

Output from command line:

CSV.Row:
 :Column1  "1C"
 :Column2  "2C"
 :Column3  "3C"
 :Column4  "4C"
 :Column5  "5C"
CSV.Row:
 :Column1  "1D"
 :Column2  "2D"
 :Column3  "3D"
 :Column4  "4D"
 :Column5  "5D"
┌ Error: error handling request
│   exception =
│    MethodError: no method matching put_copy_data(::LibPQ.Connection, ::CSV.Row)
│    Closest candidates are:
│      put_copy_data(::LibPQ.Connection, !Matched::Union{AbstractString, Array{UInt8,N} where N}) at /root/.julia/packages/LibPQ/LSA5x/src/copy.jl:20
│    Stacktrace:
│     [1] (::LibPQ.var"#77#78"{LibPQ.Connection,LibPQ.CopyIn,Nothing,typeof(error)})() at /root/.julia/packages/LibPQ/LSA5x/src/copy.jl:69
│     [2] lock(::LibPQ.var"#77#78"{LibPQ.Connection,LibPQ.CopyIn,Nothing,typeof(error)}, ::LibPQ.Connection) at /root/.julia/packages/LibPQ/LSA5x/src/connections.jl:337
│     [3] execute(::LibPQ.Connection, ::LibPQ.CopyIn, ::Nothing; throw_error::Bool, kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /root/.julia/packages/LibPQ/LSA5x/src/copy.jl:51
│     [4] execute at /root/.julia/packages/LibPQ/LSA5x/src/copy.jl:45 [inlined] (repeats 2 times)
│     [5] uploadDataset(::Array{UInt8,1}) at /app/src/DbInterface.jl:645
│     [6] uploadDataset at /app/src/Service.jl:128 [inlined]
│     [7] uploadDatasetReq at /app/src/Resource.jl:18 [inlined]
│     [8] handle(::HTTP.Handlers.RequestHandlerFunction{typeof(msDataManagement.Resource.uploadDatasetReq)}, ::HTTP.Messages.Request) at /root/.julia/packages/HTTP/TVRTz/src/Handlers.jl:254
│     [9] handle(::HTTP.Handlers.Router{Symbol("##253")}, ::HTTP.Messages.Request) at /root/.julia/packages/HTTP/TVRTz/src/Handlers.jl:468
│     [10] requestHandler(::HTTP.Messages.Request) at /app/src/Resource.jl:61
│     [11] handle at /root/.julia/packages/HTTP/TVRTz/src/Handlers.jl:254 [inlined]
│     [12] handle(::HTTP.Handlers.RequestHandlerFunction{typeof(msDataManagement.Resource.requestHandler)}, ::HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}) at /root/.julia/packages/HTTP/TVRTz/src/Handlers.jl:277
│     [13] #4 at /root/.julia/packages/HTTP/TVRTz/src/Handlers.jl:346 [inlined]
│     [14] macro expansion at /root/.julia/packages/HTTP/TVRTz/src/Servers.jl:406 [inlined]
│     [15] (::HTTP.Servers.var"#13#14"{HTTP.Handlers.var"#4#5"{HTTP.Handlers.RequestHandlerFunction{typeof(msDataManagement.Resource.requestHandler)}},HTTP.ConnectionPool.Transaction{Sockets.TCPSocket},HTTP.Streams.Stream{HTTP.Messages.Request,HTTP.ConnectionPool.Transaction{Sockets.TCPSocket}}})() at ./task.jl:356
└ @ HTTP.Servers ~/.julia/packages/HTTP/TVRTz/src/Servers.jl:417

You can use the write_table! function from
https://github.com/lungben/TableIO.jl

It implements the COPY FROM STDIN functionality using CSV.jl and LibPQ.jl.
https://github.com/lungben/TableIO.jl/blob/ce1f205053cee0270ae2b53c0787d576f70e0c4e/src/postgresql.jl#L32
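To give an idea of the general approach, here is a rough sketch of a COPY FROM STDIN upload built from CSV.jl and LibPQ.jl. It is not the actual TableIO.jl code: the helper name `copy_table!` is made up for illustration, and it assumes `conn` is an open `LibPQ.Connection`, that the target table already exists with matching columns, and that `tablename` comes from a trusted source (it is interpolated into the SQL string).

```julia
using CSV, LibPQ

# Hypothetical helper: bulk-load any Tables.jl-compatible table into `tablename`.
function copy_table!(conn::LibPQ.Connection, tablename::AbstractString, table)
    # CSV.RowWriter yields one CSV-formatted string per row; the first string
    # it produces is the header row.
    rows = CSV.RowWriter(table)
    # HEADER TRUE tells PostgreSQL to skip that first (header) line.
    copyin = LibPQ.CopyIn("COPY $tablename FROM STDIN (FORMAT CSV, HEADER TRUE);", rows)
    return execute(conn, copyin)
end
```

The MethodError in your stack trace comes from `CopyIn` receiving `CSV.Row` objects: as the "closest candidates" line shows, `put_copy_data` only accepts strings or byte arrays, so the iterator handed to `CopyIn` has to produce already-formatted text lines.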


Finally, thank you guys. @lungben, I checked your repositories. It looks very interesting.

I have already found a solution that does what I wanted.

f = CSV.File(body, header=0, delim=';', skipto = 4, footerskip = 1)

df = f |> DataFrame

row_strings = map(eachrow(df)) do row
    s = ""
    for c in 1:length(row)
        s *= ismissing(row[c]) ? "," : "$(row[c]),"
    end
    s[1:end-1]*"\n"
end

copyin = LibPQ.CopyIn("COPY datasets FROM STDIN (FORMAT CSV, HEADER FALSE);", row_strings)

local conn
conn = connect()
execute(conn, copyin)

Your solution may have issues with some special characters because no escaping is done. In particular, , and " in the DataFrame contents could be problematic.
Using the CSV.jl row iterator CSV.RowWriter(table) has the advantage that escaping is done for you.
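As a small illustration of the difference, the sketch below feeds a table with awkward values through `CSV.RowWriter`. The table contents are invented for the example.

```julia
using CSV

# Made-up table whose values contain the CSV delimiter and the quote character.
table = (name = ["plain", "has,comma", "has \"quote\""], n = [1, 2, 3])

# RowWriter yields one CSV-formatted string per row, starting with the header.
lines = collect(CSV.RowWriter(table))

# Fields containing a delimiter or quote are written quoted, so each line
# still parses back into exactly two columns.
foreach(print, lines)
```

Because the iterator produces plain strings, it can be passed to `LibPQ.CopyIn` in place of the hand-built `row_strings`. Note that the first string is the header line, so the COPY statement would need `HEADER TRUE` (or the first element has to be dropped).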


Good to know that. I’ll check it and improve my solution. Thank you.