Loading DataFrames into BigQuery

I’m currently exploring the best way to load a DataFrame object directly into Google’s BigQuery, similar to pandas’ to_gbq function.

As a hotfix, I’ve been looking at a command-line wrapper. This would line up with how GBQ.jl pulls queries, but ideally I’d like to move this closer to how GCP.jl operates when talking to GBQ.

One issue I’ve noticed with the command line is that it requires the schema to be fed in explicitly as an argument, which adds complexity. I’d be interested to know if anyone else has explored this or is interested in collaborating.


Reposting this from GitHub:

So here is a potential solution I have been working on. It still uses the bq CLI, but as you mentioned, the schema is an issue with this method:

# generate a random alphanumeric string, used below for temporary file names
function randstring(length::Int=8)
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    str = ""
    for i in 1:length
        str *= alphabet[rand(1:end)]
    end
    return str
end

function gbq_upload(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString)
    # write to temporary CSV file
    temp_file = joinpath(tempdir(), randstring() * ".csv")
    CSV.write(temp_file, df)
    
    # build command to upload CSV to BigQuery
    cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV $dataset.$table $temp_file`
    
    # run command and capture output
    output = read(cmd, String)
    
    # delete temporary file
    rm(temp_file)
    
    return output
end

I then tested this out with:

using Pkg

using GBQ, DataFrames, CSV

# assume we have a DataFrame called df that we want to upload to a BigQuery table called my_table in my_dataset
project_id = "xxxx"
dataset_id = "xxxx"
table_id = "xxxx"

df = DataFrame(a = [1, 2, 3], b = ["x", "y", "z"])

gbq_upload(df, project_id, dataset_id, table_id)

This uploads the data correctly to the table. Can you test this and see if it works on your end?


This is looking good, I’ll give it a try this week. With the first row being skipped, I’ll be curious to see how this looks in BQ when loaded. My initial understanding from the documentation is that the column names need to be provided via a text file or a string passed at the command line, but it looks like there is an autodetect flag that can be passed.

[S]chema is a valid schema. The schema can be a local JSON file, or it can be typed inline as part of the command. You can also use the --autodetect flag instead of supplying a schema definition

Quoted from here.

Yeah I had to specify the schema manually in BQ which is not ideal.

I’ve been looking at how to automate that part of the process (the schema map). I often have tables with 100+ variables, so manual updates aren’t an option most of the time. Here are the three things I’ve been experimenting with:

  1. First load the DataFrame to a Google Cloud Storage bucket, then run bq load from the command line. Schema autodetection is well documented for this process, and it would be the simplest option, but it requires pushing the data to Google Cloud Storage, which seems like an extra step to me (and not what the pandas method needs, to my knowledge). If we wanted to load the file to GCS with the gsutil command, the docs are here. Then we could run a version of bq load on the GCS file. This might be the fastest hotfix (see the first sketch after this list).

  2. Pass the column names of the DataFrame with names() to a list and then format that into a string which can be passed as a parameter. The data types need to be scraped and then formatted to BQ requirements, which is also a bit of work; cleaning and formatting them is going to be a little tricky. Here is how I have been scraping the data types for my DataFrame df:

# collect the element type of each column as a String, e.g. "Int64", "String"
col_type = []

for T in eltype.(eachcol(df))
    col_type = vcat(col_type, String(nameof(T)))
end
  3. Create a schema JSON file that can be pointed to when running bq load at the command line. Google documents this fairly well. It has similar issues to option 2 above, but there look to be some libraries that can export a JSON file of a DataFrame’s structure (TBD if that gives us what we’re looking for); see the second sketch after this list.
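
For option 1, here is a rough sketch of that hand-off, assuming gsutil and bq are already authenticated locally (the bucket, dataset, and table names are placeholders):

using DataFrames, CSV

df = DataFrame(a = [1, 2, 3], b = ["x", "y", "z"])

# write the DataFrame to a temporary CSV file
temp_file = joinpath(tempdir(), "upload.csv")
CSV.write(temp_file, df)

# copy the file to a GCS bucket, then load it into BigQuery with schema autodetection
run(`gsutil cp $temp_file gs://my_bucket/upload.csv`)
run(`bq load --autodetect --source_format=CSV --skip_leading_rows=1 my_dataset.my_table gs://my_bucket/upload.csv`)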
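
And for options 2 and 3, a minimal sketch of mapping Julia column types to BigQuery types and writing the schema JSON that bq load accepts via its --schema flag (the helper names are hypothetical, and only a few common types are covered):

using DataFrames, JSON

# map a Julia element type to a BigQuery type name (anything unrecognised falls back to STRING)
function bq_type(T::Type)
    T <: Union{Missing, Bool}          && return "BOOLEAN"
    T <: Union{Missing, Integer}       && return "INTEGER"
    T <: Union{Missing, AbstractFloat} && return "FLOAT"
    return "STRING"
end

# write a schema file in the format bq expects: an array of {"name", "type", "mode"} objects
function write_bq_schema(df::DataFrame, path::AbstractString)
    schema = [Dict("name" => n, "type" => bq_type(T), "mode" => "NULLABLE")
              for (n, T) in zip(names(df), eltype.(eachcol(df)))]
    open(path, "w") do io
        JSON.print(io, schema)
    end
    return path
end

# e.g. write_bq_schema(df, "schema.json"), then pass --schema=schema.json to bq load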

Let me know if anything else jumps out or if there is anything simpler.

I have added some additional functionality (append/overwrite) as well as a create-table function. To fix the error when the table already exists, I had to edit the existing get-list-of-tables function (the original didn’t work for me for some reason). I am sure there is a smarter way to do this - but this should at least allow us to have 100 columns in different formats and still upload the items.

If we need a more complex schema (primary keys, etc.), that might need to be done elsewhere…

using Pkg

using GBQ, DataFrames, CSV, Printf, JSON

# assume we have a DataFrame called df that we want to upload to a BigQuery table called my_table in my_dataset
project_id = "xxxx"
dataset_id = "test_models"
table_id = "my_table_v1"

df = DataFrame(a = [1, 2, 3], b = ["x", "y", "z"])

function randstring(length::Int=8)
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    randstring = ""
    for i in 1:length
        randstring *= alphabet[rand(1:end)]
    end
    return randstring
end

# Returns a dataframe
function gbq_list_tables(dataset::AbstractString)
    response = JSON.parse(read(`bq ls --format=json "$dataset"`, String))
    tables = DataFrame(table_id=[table["tableReference"]["tableId"] for table in response],
                       creation_time=[table["creationTime"] for table in response],
                       id=[table["id"] for table in response])
    return tables
end

function gbq_upload(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString, overwrite::Bool=false, append::Bool=false)
    # write to temporary CSV file
    temp_file = joinpath(tempdir(), randstring() * ".csv")
    CSV.write(temp_file, df)
    
    # build command to upload CSV to BigQuery
    if overwrite
        cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV --replace=true $dataset.$table $temp_file`
    elseif append
        cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV --replace=false $dataset.$table $temp_file`
    else
        cmd = `bq load --quiet=true --project_id=$project --skip_leading_rows=1 --source_format=CSV $dataset.$table $temp_file`
    end
    
    # run command and capture output
    output = read(cmd, String)
    
    # delete temporary file
    rm(temp_file)
    
    return output
end

function gbq_create_table(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString)
    # check if table already exists
    tables = gbq_list_tables(dataset)
    if table in tables.table_id
        @printf("Table %s.%s.%s already exists\n", project, dataset, table)
        return
    end
    
    # get column names and types from DataFrame
    col_names = names(df)
    col_types = [string(t) for t in eltype.(eachcol(df))]

    # build the inline schema bq expects: comma-separated name:type pairs
    # (this assumes simple column types such as Int64, Float64, String, or Bool,
    # whose lowercased names are valid BigQuery type names)
    schema = join([string(n, ":", lowercase(t)) for (n, t) in zip(col_names, col_types)], ",")

    # write DataFrame to CSV file
    temp_file = tempname() * ".csv"
    CSV.write(temp_file, df)

    # build command to upload CSV file to BigQuery, passing the inline schema
    cmd = `bq load --quiet=true --project_id=$project --source_format=CSV --skip_leading_rows=1 $dataset.$table $temp_file $schema`
    
    # run command and capture output
    output = read(cmd, String)
    
    # check for errors
    if occursin("Error", output)
        @printf("Error creating BigQuery table: %s", output)
    else
        @printf("Created BigQuery table %s.%s.%s\n", project, dataset, table)
    end

    # delete temporary file
    rm(temp_file)

end

# upload the DataFrame
gbq_create_table(df, project_id, dataset_id, table_id)
gbq_upload(df, project_id, dataset_id, table_id, false, false)

So efficiency, stress testing, and edge cases still need to be examined.

A possible but roundabout alternative would be to use DuckDB.jl and leverage DuckDB’s ability to import and export to S3 and Google Cloud Storage?


Could you provide a quick example of using DuckDB? All I see is a brief use case of making a local in-memory DB environment for Julia.

using DuckDB, DataFrames

# create a new in-memory database
con = DBInterface.connect(DuckDB.DB, ":memory:")

# create a DataFrame
df = DataFrame(a = [1, 2, 3], b = [42, 84, 42])

# register it as a view in the database
DuckDB.register_data_frame(con, df, "my_df")

DBInterface.execute(con, "CREATE TABLE my_table AS SELECT * FROM my_df")

At this point, you have a DuckDB instance running and your data is in a table named my_table. Now you want to try to send it to GCS. I don’t actually know how, but I would try this:

# (this likely needs DuckDB's httpfs extension first: INSTALL httpfs; LOAD httpfs;)
DBInterface.execute(con,"SET s3_endpoint='storage.googleapis.com';")
DBInterface.execute(con,"SET s3_access_key_id='key_id';")
DBInterface.execute(con,"SET s3_secret_access_key='access_key';")
DBInterface.execute(con,"COPY my_table TO 's3://gcs_bucket/file.parquet';")

Presumably setting the secrets and whatnot only needs to happen once per session.

@Billpete002 I’ve been able to successfully load the schema with the import by adding --autodetect to bq load; let me know if that works on your end. Originally, it wasn’t clear to me from the documentation that this would work from the command line with a local file, but I’ve been able to load both a JSON and a CSV.

I’m also thinking it might be better to switch from CSV to JSON, since GBQ already imports JSON, whereas adding CSV for this function would require importing (and precompiling) another module. This is mostly to keep the GBQ library lightweight, since this is a back-end function.
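
As a side note, a minimal sketch of that (the helper name is hypothetical) with only DataFrames and JSON, since NEWLINE_DELIMITED_JSON expects one JSON object per line rather than a single top-level object:

using DataFrames, JSON

# write one JSON object per row, the layout NEWLINE_DELIMITED_JSON expects
function write_ndjson(df::DataFrame, path::AbstractString)
    open(path, "w") do io
        for row in eachrow(df)
            JSON.print(io, Dict(name => row[name] for name in names(df)))
            print(io, '\n')
        end
    end
    return path
end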

Boilerplate of what I used at the command line:

bq load --autodetect --quiet=true --project_id=projectid --source_format=CSV --replace=true dataset.destination_table csvfile.csv

bq load --autodetect --quiet=true --project_id=projectid --source_format=NEWLINE_DELIMITED_JSON --replace=true dataset.destination_table jsonfile.json

How did you convert a dataframe to a .json easily without importing JSONTables?

function gbq_list_tables(dataset::AbstractString)
    response = JSON.parse(read(`bq ls --format=json "$dataset"`, String))
    tables = DataFrame(table_id=[table["tableReference"]["tableId"] for table in response],
                       creation_time=[table["creationTime"] for table in response],
                       id=[table["id"] for table in response])
    return tables
end


function gbq_create_table(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString)
    # check if table already exists
    tables = gbq_list_tables(dataset)
    if table in tables.table_id
        @printf("Table %s.%s.%s already exists\n", project, dataset, table)
        return
    end
    
    # get column names and types from DataFrame
    col_names = names(df)
    col_types = [string(t) for t in eltype.(eachcol(df))]
    
    # write DataFrame to JSON file
    temp_file = tempname() * ".json"
    JSON.json(temp_file, df)
    
    # build command to upload JSON file to BigQuery
    cmd = `bq load --quiet=true --project_id=$project --source_format=NEWLINE_DELIMITED_JSON --skip_leading_rows=1 $dataset.$table $temp_file $col_names`
    
    # run command and capture output
    output = read(cmd, String)
    
    # check for errors
    if occursin("Error", output)
        @printf("Error creating BigQuery table: %s", output)
    else
        @printf("Created BigQuery table %s.%s.%s\n", project, dataset, table)
    end

    # delete temporary file
    rm(temp_file)

end

This spits out the error

ERROR: MethodError: no method matching write(::IOStream, ::DataFrame)

Though I agree with keeping it lightweight :+1:

I just had the following in my test script to generate the JSON that I uploaded with the above command:

test_pull = gbq_query(test_query, convert_numeric=true)

open("/[filepath]/jsonfile.json", "w") do f
    JSON.print(f, test_pull)
end

Then I used this at the command line to load that file:

bq load --autodetect --quiet=true --project_id=projectid --source_format=NEWLINE_DELIMITED_JSON --replace=true dataset.destination_table jsonfile.json

I wonder if there is a way for DuckDB to connect to BigQuery, but this would make it easy to turn the data into a Parquet file for transfer :thinking:
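
For the Parquet hand-off, a rough sketch (dataset and table names are hypothetical) would be to dump the registered DataFrame to a local Parquet file via DuckDB and then load that file with the bq CLI, since Parquet files carry their own schema:

using DuckDB, DataFrames

con = DBInterface.connect(DuckDB.DB, ":memory:")
df = DataFrame(a = [1, 2, 3], b = ["x", "y", "z"])

# expose the DataFrame to DuckDB and write it out as Parquet
DuckDB.register_data_frame(con, df, "my_df")
DBInterface.execute(con, "COPY (SELECT * FROM my_df) TO 'upload.parquet' (FORMAT PARQUET)")

# Parquet is self-describing, so no schema string or --autodetect is needed
run(`bq load --source_format=PARQUET my_dataset.my_table upload.parquet`)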

Thanks - I’ll test this out and see how it works.

Works. I removed the schema function, since autodetect basically makes it moot. The user essentially has two options, overwrite OR append: false/false will append and true/true will overwrite. I removed CSV and any other packages to keep this to just DataFrames and JSON. I’ll upload these changes to the package. There is also a now-working gbq_list_tables(), as I noticed the existing one wasn’t working in its current form, so you should be able to see all the tables within a dataset:

using Pkg

using GBQ, DataFrames, JSON

# assume we have a DataFrame called df that we want to upload to a BigQuery table called my_table in my_dataset
project_id = "xxx"
dataset_id = "test_models"
table_id = "my_table_v2"

df = DataFrame(a = [1, 2, 3], b = ["x", "y", "z"])

# Returns a dataframe
function gbq_list_tables(dataset::AbstractString)
    response = JSON.parse(read(`bq ls --format=json "$dataset"`, String))
    tables = DataFrame(table_id=[table["tableReference"]["tableId"] for table in response],
                       creation_time=[table["creationTime"] for table in response],
                       id=[table["id"] for table in response])
    return tables
end

function gbq_upload(df::DataFrame, project::AbstractString, dataset::AbstractString, table::AbstractString, overwrite::Bool=false, append::Bool=false)
    # write to temporary JSON file
    temp_file = joinpath(tempdir(), "temp.json")
    open(temp_file,"w") do f
        JSON.print(f, df)
    end
    
    # build command to upload JSON to BigQuery
    if overwrite
        cmd = `bq load --autodetect --quiet=true --project_id=$project --source_format=NEWLINE_DELIMITED_JSON --replace=true $dataset.$table $temp_file`
    else
        cmd = `bq load --autodetect --quiet=true --project_id=$project --source_format=NEWLINE_DELIMITED_JSON --replace=false $dataset.$table $temp_file`
    end
    
    # run command and capture output
    output = read(cmd, String)
    
    # delete temporary file
    rm(temp_file)
    
    return output
end

# upload the DataFrame
gbq_upload(df, project_id, dataset_id, table_id, true, false)


This is excellent; I’ve been traveling, and this is great to come back to. I’m linking to your merged code here for others’ reference:

I may update the docs in the repo a bit to reflect some of our changes, since they’re sparse.

Sounds great - I put a post in “issues” following some text I saw about registering libraries for Julia - it would be great to just have Pkg.add("GBQ") rather than specifying the git location - but nothing has happened, so I am unsure how we go about registering the library.

I think there might be an automated process, with some predefined requirements needed on the main branch. I’m taking that from this thread: Problem registering package in general registry

But I haven’t tried to register before, and I wonder if we’d need to pull in Martin since he is the owner of the repo (I assume we could just open a PR, he’d merge any changes, and then we could submit to JuliaHub for auto-inclusion).