Batch insertion using MySQL.jl

I am trying to change some functions that insert into my MariaDB database so that they will work in batches instead of row-by-row. I am having somewhat of a hard time because I don’t really understand how to prepare statements correctly with MySQL.jl.

Here are my current functions that only do it row-by-row:

import MySQL
import DataFrames: AbstractDataFrame, eachrow, names
import Iterators: partition

function database_insertstatement(
    connection::MySQL.DBInterface.Connection,
    tablename::AbstractString,
    columns::AbstractVector{<:AbstractString};
    primarykeys::AbstractVector{<:AbstractString}=[""]
    )
    fields = "(" * join(columns, ",") * ")"
    values = "(" * join(map(x -> "?", columns), ",") * ")"
    value_update = join(map(x -> x * " = VALUE(" * x * ")", setdiff(columns, primarykeys)), ",")

    sql = (
        "INSERT INTO " * tablename * " " * fields * " VALUES " * values
        * " ON DUPLICATE KEY UPDATE " * value_update * ";"
    )

    return MySQL.DBInterface.prepare(connection, sql)
end


function database_insertintotable(
    connection::MySQL.DBInterface.Connection,
    tablename::AbstractString,
    datatoinsert::AbstractDataFrame;
    primarykeys::AbstractVector{<:AbstractString}=[""]
    )
    statement = database_insertstatement(
        connection, tablename, names(datatoinsert);
        primarykeys=primarykeys
    )

    MySQL.transaction(connection) do
        for row in eachrow(datatoinsert)
            MySQL.DBInterface.execute(statement, row)
        end
    end
    MySQL.DBInterface.close!(statement)
    return nothing
end

First, I tried to modify database_insertstatement to allow a batch::Bool keyword argument to allow the statement to receive more parameters for the ‘VALUES (?, ?, ?, …)’ segment of the statement to become ‘VALUES (?, ?, ?, …), (?, ?, ?, ?), …’ instead, but I failed to do this.

Then, I tried ignoring the first function and just resort to defining the statement to execute for every batch to be executed, but then I get string formatting errors. Initially I though it was because I was passing a string object, but I tried building a statement with MySQL.jl’s prepare function and I got the same error. Here is the code for the attempt:

function database_insertintotable(
    connection::MySQL.DBInterface.Connection,
    tablename::AbstractString,
    datatoinsert::AbstractDataFrame;
    primarykeys::AbstractVector{<:AbstractString}=[""],
    batch_size::Int=1000
    )    
    rows = collect(eachrow(datatoinsert))
    table_columns = names(datatoinsert)
    fields = "(" * join(table_columns, ",") * ")"
    value_update = join(map(x -> x * " = VALUE(" * x * ")", setdiff(table_columns, primarykeys)), ",")

    MySQL.transaction(connection) do
        for batch in partition(rows, batch_size)
            values_list = join(["(" * join(row, ", ") * ")" for row in batch], ", ")

            sql = (
                "INSERT INTO " * tablename * " " * fields * " VALUES " * values_list
                * " ON DUPLICATE KEY UPDATE " * value_update * ";"
            )

            statement = MySQL.DBInterface.prepare(connection, sql)
            MySQL.DBInterface.execute(statement)
            MySQL.DBInterface.close!(statement)
        end
    end
    return nothing
end

The error I’m getting is the following:

ERROR: (1064): You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'Gas, 1751.0, PT), (2021-11-04, 1, Fossil Hard coal, 282.0, PT), (2021-11-04, ...' at line 1

Is there a simpler way of achieving this?

I think I got it working using the first methodology of preparing the statement with all the ? parameters. I got some inspiration from the code at https://github.com/JuliaDatabases/MySQL.jl/issues/129#issue-490171561.

function database_insertstatement(
    connection::MySQL.DBInterface.Connection,
    tablename::AbstractString,
    columns::AbstractVector{<:AbstractString};
    primarykeys::AbstractVector{<:AbstractString}=[""],
    batch::Bool=false,
    batch_size::Int=1000
    )
    fields = "(" * join(columns, ",") * ")"
    value_row = "(" * join(map(x -> "?", columns), ",") * ")"
    value_update = join(map(x -> x * " = VALUE(" * x * ")", setdiff(columns, primarykeys)), ",")

    if batch
        values = join([value_row for x = 1:batch_size], ", ")
    else
        values = value_row
    end

    sql = (
        "INSERT INTO " * tablename * " " * fields * " VALUES " * values
        * " ON DUPLICATE KEY UPDATE " * value_update * ";"
    )
    
    return MySQL.DBInterface.prepare(connection, sql)
end


function database_insertintotable(
    connection::MySQL.DBInterface.Connection,
    tablename::AbstractString,
    datatoinsert::AbstractDataFrame;
    primarykeys::AbstractVector{<:AbstractString}=[""],
    batch::Bool=false,
    batch_size::Int=1000
    )
    if batch
        batch_statement = database_insertstatement(
            connection, tablename, names(datatoinsert);
            primarykeys=primarykeys, batch=batch, batch_size=batch_size
        )
        rows = collect(eachrow(datatoinsert))

        MySQL.transaction(connection) do
            MySQL.DBInterface.execute(connection, "SET autocommit=0")
            for iter_batch in partition(rows, batch_size)          
                iter_values = iter_batch |> x -> Vector.(x) |> x -> vcat(x...)
                if length(iter_batch) == batch_size
                    MySQL.DBInterface.execute(batch_statement, iter_values)
                else
                    special_batch_statement = database_insertstatement(
                        connection, tablename, names(finaldata);
                        primarykeys=primarykeys, batch=batch, batch_size=length(iter_batch)
                    )
                    MySQL.DBInterface.execute(special_batch_statement, iter_values)
                    MySQL.DBInterface.close!(special_batch_statement)
                end
                sleep(0.001)
            end
            MySQL.DBInterface.execute(connection, "COMMIT;")
        end

        MySQL.DBInterface.close!(batch_statement)
    else
        statement = database_insertstatement(
            connection, tablename, names(datatoinsert);
            primarykeys=primarykeys
        )

        MySQL.transaction(connection) do
            for row in eachrow(datatoinsert)
                MySQL.DBInterface.execute(statement, row)
            end
        end
        MySQL.DBInterface.close!(statement)
    end
    return nothing
end