I have added some additional functionality (append/overwrite) as well as a create-table function. To fix the error when the table already exists, I had to edit the existing list-tables function (the original didn't work for me for some reason). I am sure there is a smarter way to do this, but it should at least let us upload tables with 100 columns of differing types.
If we need more complex schema (primary keys etc.) this might need to be done elsewhere…
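One possibility for a richer schema (just a rough sketch; the `schema.json` path and the helper name are placeholders, not part of GBQ) would be to create the table up front from a hand-written JSON schema file with `bq mk`, then load into it with the functions further down:

# Sketch only: create a table from an explicit JSON schema file before loading.
# schema.json would hold entries like [{"name": "a", "type": "INT64"}, {"name": "b", "type": "STRING"}]
function gbq_create_table_from_schema(project::AbstractString, dataset::AbstractString,
                                      table::AbstractString, schema_file::AbstractString)
    cmd = `bq mk --table $project:$dataset.$table $schema_file`
    return read(cmd, String)
end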
using Pkg
using GBQ, DataFrames, CSV, Printf, JSON
# assume we have a DataFrame called df that we want to upload to the BigQuery table defined below
project_id = "xxxx"
dataset_id = "test_models"
table_id = "my_table_v1"
df = DataFrame(a = [1, 2, 3], b = ["x", "y", "z"])
# Generates a random alphanumeric string, used below to name temporary CSV files
function randstring(len::Int=8)
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    s = ""
    for _ in 1:len
        s *= alphabet[rand(1:end)]
    end
    return s
end
# Returns a DataFrame of the tables in a dataset (edited replacement for the original list-tables function)
function gbq_list_tables(dataset::AbstractString)
    response = JSON.parse(read(`bq ls --format=json "$dataset"`, String))
    tables = DataFrame(table_id=[table["tableReference"]["tableId"] for table in response],
                       creation_time=[table["creationTime"] for table in response],
                       id=[table["id"] for table in response])
    return tables
end
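# Usage sketch for the listing helper (assumes the bq CLI is authenticated): check whether the
# target table already exists, the same check gbq_create_table performs below
existing_tables = gbq_list_tables(dataset_id)
println(table_id in existing_tables.table_id ? "table already exists" : "table not found")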
function gbq_upload(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString, overwrite::Bool=false, append::Bool=false)
    # write to temporary CSV file
    temp_file = joinpath(tempdir(), randstring() * ".csv")
    CSV.write(temp_file, df)
    # build command to upload CSV to BigQuery
    if overwrite
        # --replace=true truncates the table before loading
        cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV --replace=true $dataset.$table $temp_file`
    elseif append
        # append: load without --replace so rows are added to the existing table
        cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV $dataset.$table $temp_file`
    else
        cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV $dataset.$table $temp_file`
    end
    # run command and capture output
    output = read(cmd, String)
    # delete temporary file
    rm(temp_file)
    return output
end
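# Usage sketch for the flags (not run here; the real call is at the bottom of the script):
#   gbq_upload(df, project_id, dataset_id, table_id, true, false)   # overwrite: truncate, then load
#   gbq_upload(df, project_id, dataset_id, table_id, false, true)   # append to the existing table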
function gbq_create_table(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString)
    # check if table already exists
    tables = gbq_list_tables(dataset)
    if table in tables.table_id
        @printf("Table %s.%s.%s already exists\n", project, dataset, table)
        return
    end
    # map Julia column types to BigQuery types (unknown types fall back to STRING)
    bq_type(T) = (T = nonmissingtype(T);
                  T <: Bool ? "BOOL" :
                  T <: Integer ? "INT64" :
                  T <: AbstractFloat ? "FLOAT64" : "STRING")
    # build the schema string ("col:TYPE,col:TYPE") from the DataFrame's column names and types
    col_names = names(df)
    col_types = [bq_type(t) for t in eltype.(eachcol(df))]
    schema = join(string.(col_names, ":", col_types), ",")
    # write DataFrame to CSV file
    temp_file = tempname() * ".csv"
    CSV.write(temp_file, df)
    # build command to upload CSV file to BigQuery with the explicit schema
    cmd = `bq load --quiet=true --project_id=$project --source_format=CSV --skip_leading_rows=1 $dataset.$table $temp_file $schema`
    # run command and capture output
    output = read(cmd, String)
    # check for errors
    if occursin("Error", output)
        @printf("Error creating BigQuery table: %s", output)
    else
        @printf("Created BigQuery table %s.%s.%s\n", project, dataset, table)
    end
    # delete temporary file
    rm(temp_file)
end
# upload the DataFrame
gbq_create_table(df, project_id, dataset_id, table_id)
gbq_upload(df, project_id, dataset_id, table_id, false, false)
Efficiency, stress testing, and edge cases still need to be examined.
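For the edge cases, one quick check (just a sketch reusing the functions above; the `my_table_edge_cases` table name is only a placeholder) would be a DataFrame with missing values and commas/quotes in the strings, to confirm the CSV round trip survives:

# Edge-case sketch: missing values plus commas and quotes in strings,
# which stress the CSV path the loader relies on
edge_df = DataFrame(id = [1, 2, 3],
                    label = ["plain", "has,comma", "has \"quotes\""],
                    value = [1.5, missing, -2.0])
gbq_create_table(edge_df, project_id, dataset_id, "my_table_edge_cases")
gbq_upload(edge_df, project_id, dataset_id, "my_table_edge_cases", false, false)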