diff --git a/performance_testing/jmeter/README.md b/performance_testing/jmeter/README.md
new file mode 100644
index 000000000..a7f855fdf
--- /dev/null
+++ b/performance_testing/jmeter/README.md
@@ -0,0 +1,128 @@
+# Using JMeter for BigQuery Performance Testing
+
+## Before You Start
+
+Make sure you've completed the following prerequisite steps before running the
+provided JMeter test plans:
+
+* Install the
+  [Java 8+ Oracle JDK](https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html)
+* Download the
+ [Simba BigQuery JDBC Driver](https://cloud.google.com/bigquery/providers/simba-drivers)
+* Download the latest
+ [JMeter Binary](https://jmeter.apache.org/download_jmeter.cgi)
+
+## Which JMeter Test Plan Do I Use?
+
+### [bigquery_jdbc_sampler.jmx](bigquery_jdbc_sampler.jmx) (Runs queries using JDBC driver)
+
+#### Pros
+
+* **Long-running job polling** - The JDBC request sampler is necessary for
+ tests where queries run longer than 4 minutes and where a consistent
+ concurrency level must be maintained. The JDBC driver will poll the query
+ job until it is finished before submitting a new query, ensuring that JMeter
+ active threads exactly match active BigQuery query jobs.
+* **Simpler query format** - The JDBC request sampler does not require you to
+ form a JSON configuration object to submit the query to the API. This
+ eliminates JSON errors as a source of problems.
+ * Unescaped double quotes are allowed in SQL queries - You do not have to
+ escape double quotes in your SQL queries as is required in the HTTP
+ sampler.
+
+#### Cons
+
+* **JDBC overhead latency** - The JDBC driver adds some overhead latency
+  compared with calling the REST API directly. Use the BigQuery-provided
+  [INFORMATION_SCHEMA.JOBS_BY*](https://cloud.google.com/bigquery/docs/information-schema-jobs)
+  view to measure query runtime alone, excluding other latencies such as
+  network overhead.
+* **BigQuery job labels unsupported** - You cannot currently set labels for
+  jobs submitted by the JDBC driver. To get a similar effect to labeling,
+  include an identifier (such as a small JSON object) in a comment in each
+  query that can be parsed when querying the
+  [INFORMATION_SCHEMA.JOBS_BY*](https://cloud.google.com/bigquery/docs/information-schema-jobs)
+  view.
+* **Response rows must be returned** - The JDBC driver does not support an
+ option to return 0 results. The MaxResults JDBC config should therefore be
+ set to 1, since the default setting of 0 instructs the JDBC driver to return
+ all rows.
+
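The comment-based labeling workaround above can be sketched in Python. This is a minimal sketch, assuming the `/*run_id,query_id*/` prefix convention that bigquery_jdbc_sampler.jmx uses when it submits each query; the `parse_query_label` helper name is illustrative, not part of this repo:

```python
def parse_query_label(query: str) -> dict:
    """Extract run_id and query_id from a /*run_id,query_id*/ query prefix.

    Mirrors the string parsing that jdbc_sampler_results.sql performs on the
    `query` column of INFORMATION_SCHEMA.JOBS_BY_PROJECT.
    """
    if not query.startswith('/*'):
        return {}
    # Split off the leading comment, then strip the comment delimiters.
    comment, _, _ = query.partition('*/')
    run_id, _, query_id = comment.strip('/*').strip().partition(',')
    return {'run_id': run_id, 'query_id': query_id}


labeled = "/*jmeter_jdbc_test,simple_07*/ SELECT 'some simple query 07';"
print(parse_query_label(labeled))
# {'run_id': 'jmeter_jdbc_test', 'query_id': 'simple_07'}
```
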
+### [bigquery_http_sampler.jmx](bigquery_http_sampler.jmx) (Runs queries using REST API)
+
+#### Pros
+
+* **Fully configurable job options, including job labels** - The HTTP request
+ sampler allows you to specify the raw JSON request body which can include
+ any supported BigQuery options. In particular, it's very useful to include
+ query labels, since these will be present in the
+ [jobs metadata schema](https://cloud.google.com/bigquery/docs/information-schema-jobs#schema)
+ in the labels field.
+* **Faster performance** - Since JMeter makes REST calls directly to the
+  BigQuery API, it avoids the overhead of invoking the API through the Java
+  JDBC driver.
+
+#### Cons
+
+* **Default 1 hour maximum lifetime for access tokens** - The HTTP request
+ sampler uses an access token (which you provide as a command-line parameter
+ at startup) to authenticate with BigQuery. The default maximum lifetime of a
+ Google access token is 1 hour (3,600 seconds). However, you can extend the
+ maximum lifetime to 12 hours by
+ [modifying the organization policy](https://cloud.google.com/resource-manager/docs/organization-policy/restricting-service-accounts#extend_oauth_ttl).
+ JMeter calls to BigQuery APIs will start failing if your JMeter test runs
+ longer than your access token’s maximum lifetime.
+* **JSON body configuration** - You need to configure the API request payload
+ using JSON, and the JSON object configuration is easy to break. A stray
+ quote or a missing comma can make your query fail in ways that are hard to
+ troubleshoot.
+ * **Queries must have all double quotes escaped** - Since the SQL queries
+ you pass to JMeter are values inside the HTTP request JSON body, you
+ must escape all double quotes that appear in the SQL query with a
+      backslash (e.g. `SELECT \"Hello World\"`).
+* **4-minute maximum timeout** - The HTTP sampler's query request returns
+  after at most 4 minutes even if the underlying job is still running, so a
+  long-running query can appear to be finished when it is not. If you intend
+  to use JMeter's data to characterize the runtime of your queries, this is a
+  critical consideration: the results will be wrong for long-running queries.
+
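One way to sidestep the hand-escaping pitfalls above is to build the request body programmatically and paste the result into the sampler. This is a sketch using `json.dumps`, which escapes embedded double quotes automatically; the field names follow the jobs.query request body used in bigquery_http_sampler.jmx, while `build_query_request` itself is an illustrative helper:

```python
import json


def build_query_request(sql: str, jmeter_id: str, run_id: str) -> str:
    """Serialize a BigQuery jobs.query request body with proper escaping."""
    payload = {
        "kind": "bigquery#QueryRequest",
        "useQueryCache": False,
        "useLegacySql": False,
        "timeoutMs": 21600000,
        "query": sql,
        "labels": {"jmeter_id": jmeter_id, "run_id": run_id},
        "maxResults": 1,
    }
    # json.dumps escapes the double quotes inside the SQL string for you.
    return json.dumps(payload)


body = build_query_request('SELECT "Hello World"', "simple_00", "jmeter_http_test")
print(body)  # the embedded double quotes arrive escaped as \" in the JSON
```
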
+## Running the JMeter Test Plan
+
+The JMeter test plans provided in this repo are designed to run with very few
+modifications. Test-run them as-is first, before making further changes, so
+that any issues you encounter are easier to troubleshoot.
+
+### [run_jmeter_jdbc_sampler.sh](run_jmeter_jdbc_sampler.sh) (**Runs bigquery_jdbc_sampler.jmx**)
+
+1. Replace the bash script placeholders shown below with your own values:
+ * `-Jproject_id=`*YOUR_PROJECT*
+ * `-Juser.classpath=`*/path/to/your/SimbaJDBCDriverforGoogleBigQuery*
+1. Ensure proper authentication is set up for either service account or user
+ account authentication:
+ * Service account authentication: \
+ `export GOOGLE_APPLICATION_CREDENTIALS=`*/path/to/your/private_key.json*
+ * User account authentication: \
+ `gcloud auth application-default login`
+1. Run the bash helper script to begin the JMeter test
+ * `bash run_jmeter_jdbc_sampler.sh`
+
+### [run_jmeter_http_sampler.sh](run_jmeter_http_sampler.sh) (**Runs bigquery_http_sampler.jmx**)
+
+1. Replace the bash script placeholders shown below with your own values:
+ * `-Jproject_id=`*YOUR_PROJECT*
+1. Ensure proper authentication is set up:
+ * Service account authentication: \
+ `gcloud auth activate-service-account
+ --key-file=`*/path/to/your/private_key.json*
+ * User account authentication: \
+ `gcloud auth login`
+1. Run the bash helper script to begin the JMeter test
+ * `bash run_jmeter_http_sampler.sh`
+
+## Inspecting the JMeter Test Plans
+
+The best way to view and understand the JMeter test plans is to open them in
+JMeter's GUI mode as shown below:
+
+* `./apache-jmeter-5.3/bin/jmeter -t bigquery_jdbc_sampler.jmx`
+* `./apache-jmeter-5.3/bin/jmeter -t bigquery_http_sampler.jmx`
diff --git a/performance_testing/jmeter/bigquery_http_sampler.jmx b/performance_testing/jmeter/bigquery_http_sampler.jmx
new file mode 100644
index 000000000..31bfca048
--- /dev/null
+++ b/performance_testing/jmeter/bigquery_http_sampler.jmx
@@ -0,0 +1,338 @@
+
+
+
+
+
+ false
+ false
+
+
+
+
+
+
+
+
+
+ Authorization
+ Bearer ${__P(token)}
+
+
+ Content-Type
+ application/json
+
+
+
+
+
+ continue
+
+ false
+ ${__P(num_loops,0)}
+
+ ${__P(simple_num_users,1)}
+ ${__P(ramp_time)}
+ 1504864705000
+ 1504864705000
+ true
+ ${__P(thread_duration,3600)}
+
+ true
+
+
+
+ true
+
+
+
+ false
+ {
+ "kind": "bigquery#QueryRequest",
+ "useQueryCache": false,
+ "useLegacySql": false,
+ "timeoutMs":21600000,
+ "query": "${simple_query}",
+ "labels": {"jmeter_id": "${simple_id}", "run_id": "${__P(run_id)}"},
+ "maxResults": 1
+}
+
+
+ =
+
+
+
+ bigquery.googleapis.com
+
+ https
+
+ /bigquery/v2/projects/${__P(project_id)}/queries
+ POST
+ true
+ false
+ true
+ false
+
+
+ 21600000
+
+
+
+ true
+
+ saveConfig
+
+
+ true
+ true
+ true
+
+ true
+ true
+ true
+ true
+ false
+ true
+ true
+ false
+ false
+ false
+ true
+ false
+ false
+ false
+ true
+ 0
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ ${__P(error_csv_path)}
+
+
+
+ \t
+
+ ${__P(simple_csv_path)}
+ false
+ false
+ true
+ shareMode.all
+ false
+
+
+
+
+
+
+ continue
+
+ false
+ ${__P(num_loops,0)}
+
+ ${__P(medium_num_users,1)}
+ ${__P(ramp_time)}
+ 1504864705000
+ 1504864705000
+ true
+ ${__P(thread_duration,3600)}
+
+ true
+
+
+
+ true
+
+
+
+ false
+ {
+ "kind": "bigquery#QueryRequest",
+ "useQueryCache": false,
+ "useLegacySql": false,
+ "timeoutMs":21600000,
+ "query": "${medium_query}",
+ "labels": {"jmeter_id": "${medium_id}", "run_id": "${__P(run_id)}"},
+ "maxResults": 1
+}
+
+
+ =
+
+
+
+ bigquery.googleapis.com
+
+ https
+
+ /bigquery/v2/projects/${__P(project_id)}/queries
+ POST
+ true
+ false
+ true
+ false
+
+
+ 21600000
+
+
+
+ true
+
+ saveConfig
+
+
+ true
+ true
+ true
+
+ true
+ true
+ true
+ true
+ false
+ true
+ true
+ false
+ false
+ false
+ true
+ false
+ false
+ false
+ true
+ 0
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ ${__P(error_csv_path)}
+
+
+
+ \t
+
+ ${__P(medium_csv_path)}
+ false
+ false
+ true
+ shareMode.all
+ false
+
+
+
+
+
+
+ continue
+
+ false
+ ${__P(num_loops,0)}
+
+ ${__P(complex_num_users,1)}
+ ${__P(ramp_time)}
+ 1504864705000
+ 1504864705000
+ true
+ ${__P(thread_duration,3600)}
+
+ true
+
+
+
+ true
+
+
+
+ false
+ {
+ "kind": "bigquery#QueryRequest",
+ "useQueryCache": false,
+ "useLegacySql": false,
+ "timeoutMs":21600000,
+ "query": "${complex_query}",
+ "labels": {"jmeter_id": "${complex_id}", "run_id": "${__P(run_id)}"},
+ "maxResults": 1
+}
+
+
+ =
+
+
+
+ bigquery.googleapis.com
+
+ https
+
+ /bigquery/v2/projects/${__P(project_id)}/queries
+ POST
+ true
+ false
+ true
+ false
+
+
+ 21600000
+
+
+
+ true
+
+ saveConfig
+
+
+ true
+ true
+ true
+
+ true
+ true
+ true
+ true
+ false
+ true
+ true
+ false
+ false
+ false
+ true
+ false
+ false
+ false
+ true
+ 0
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ ${__P(error_csv_path)}
+
+
+
+ \t
+
+ ${__P(complex_csv_path)}
+ false
+ false
+ true
+ shareMode.all
+ false
+
+
+
+
+
+
+
+
diff --git a/performance_testing/jmeter/bigquery_jdbc_sampler.jmx b/performance_testing/jmeter/bigquery_jdbc_sampler.jmx
new file mode 100644
index 000000000..0b0e9887b
--- /dev/null
+++ b/performance_testing/jmeter/bigquery_jdbc_sampler.jmx
@@ -0,0 +1,269 @@
+
+
+
+
+
+ false
+ false
+
+
+
+
+
+
+
+ true
+
+ 5000
+
+ bq_pool
+ jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;OAuthType=3;ProjectId=${__P(project_id)};Timeout=3600;useQueryCache=0;MaxResults=1;
+ com.simba.googlebigquery.jdbc42.Driver
+
+ true
+
+ 0
+ false
+ 10000
+ DEFAULT
+ 60000
+
+
+
+
+ continue
+
+ false
+ ${__P(num_loops,0)}
+
+ ${__P(simple_num_users,1)}
+ ${__P(ramp_time)}
+ true
+ ${__P(thread_duration,3600)}
+
+ true
+
+
+
+ bq_pool
+ /*${__P(run_id)},${simple_id}*/ ${simple_query}
+
+
+ -1
+ Select Statement
+ Store as String
+ 0
+
+
+
+
+
+ ${__P(simple_csv_path)}
+
+
+ false
+ \t
+ false
+ true
+ false
+ shareMode.all
+
+
+
+ true
+
+ saveConfig
+
+
+ true
+ true
+ true
+
+ true
+ true
+ true
+ true
+ false
+ true
+ true
+ false
+ false
+ false
+ true
+ false
+ false
+ false
+ true
+ 0
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ ${__P(error_csv_path)}
+
+
+
+
+
+ continue
+
+ false
+ ${__P(num_loops,0)}
+
+ ${__P(medium_num_users,1)}
+ ${__P(ramp_time)}
+ true
+ ${__P(thread_duration,3600)}
+
+ true
+
+
+
+ bq_pool
+ /*${__P(run_id)},${medium_id}*/ ${medium_query}
+
+
+ -1
+ Select Statement
+ Store as String
+ 0
+
+
+
+
+
+ ${__P(medium_csv_path)}
+
+
+ false
+ \t
+ false
+ true
+ false
+ shareMode.group
+
+
+
+ true
+
+ saveConfig
+
+
+ true
+ true
+ true
+
+ true
+ true
+ true
+ true
+ false
+ true
+ true
+ false
+ false
+ false
+ true
+ false
+ false
+ false
+ true
+ 0
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ ${__P(error_csv_path)}
+
+
+
+
+
+ continue
+
+ false
+ ${__P(num_loops,0)}
+
+ ${__P(complex_num_users,1)}
+ ${__P(ramp_time)}
+ true
+ ${__P(thread_duration,3600)}
+
+ true
+
+
+
+ bq_pool
+ /*${__P(run_id)},${complex_id}*/ ${complex_query}
+
+
+ -1
+ Select Statement
+ Store as String
+ 0
+
+
+
+
+
+ ${__P(complex_csv_path)}
+
+
+ false
+ \t
+ false
+ true
+ false
+ shareMode.group
+
+
+
+ true
+
+ saveConfig
+
+
+ true
+ true
+ true
+
+ true
+ true
+ true
+ true
+ false
+ true
+ true
+ false
+ false
+ false
+ true
+ false
+ false
+ false
+ true
+ 0
+ true
+ true
+ true
+ true
+ true
+ true
+
+
+ ${__P(error_csv_path)}
+
+
+
+
+
+
+
diff --git a/performance_testing/jmeter/cancel_running_jobs.py b/performance_testing/jmeter/cancel_running_jobs.py
new file mode 100644
index 000000000..e645fd642
--- /dev/null
+++ b/performance_testing/jmeter/cancel_running_jobs.py
@@ -0,0 +1,38 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from argparse import ArgumentParser
+from google.cloud import bigquery
+
+
+def cancel_jobs(client):
+ for job in client.list_jobs(all_users=True, state_filter="RUNNING"):
+ client.cancel_job(job.job_id, location='us')
+
+
+def get_cmd_line_args():
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--project_id',
+ help='Project in which all running BigQuery jobs will be cancelled.')
+ return parser.parse_args()
+
+
+def main():
+ args = get_cmd_line_args()
+ cancel_jobs(bigquery.Client(project=args.project_id))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/performance_testing/jmeter/http_sampler_results.sql b/performance_testing/jmeter/http_sampler_results.sql
new file mode 100644
index 000000000..fd228272f
--- /dev/null
+++ b/performance_testing/jmeter/http_sampler_results.sql
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SELECT
+ SPLIT(labels[OFFSET(1)].value, '_')[OFFSET(0)] AS complexity,
+ COUNT(1)
+FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
+WHERE
+ DATE(creation_time) = CURRENT_DATE() -- Partitioning column
+ AND project_id = 'YOUR_PROJECT' -- Clustering column
+ AND ARRAY_LENGTH(labels) > 0
+ AND EXISTS (
+ SELECT *
+ FROM UNNEST(labels) AS labels
+ WHERE
+ labels.key = 'run_id'
+ AND labels.value = 'jmeter_http_test'
+ )
+GROUP BY 1
diff --git a/performance_testing/jmeter/jdbc_sampler_results.sql b/performance_testing/jmeter/jdbc_sampler_results.sql
new file mode 100644
index 000000000..5889f7b66
--- /dev/null
+++ b/performance_testing/jmeter/jdbc_sampler_results.sql
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SELECT
+-- SPLIT(TRIM(SPLIT(query, '*/')[OFFSET(0)],'/*'))[OFFSET(1)] AS query_id,
+ SPLIT(SPLIT(TRIM(SPLIT(query, '*/')[OFFSET(0)],'/*'))[OFFSET(1)], '_')[OFFSET(0)] AS complexity,
+ COUNT(1)
+FROM
+ `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
+WHERE
+ DATE(creation_time) = CURRENT_DATE() -- Partitioning column
+ AND project_id = 'YOUR_PROJECT' -- Clustering column
+ AND SPLIT(TRIM(SPLIT(query, '*/')[OFFSET(0)],'/*'))[OFFSET(0)] = 'jmeter_jdbc_test'
+GROUP BY 1
diff --git a/performance_testing/jmeter/run_jmeter_http_sampler.sh b/performance_testing/jmeter/run_jmeter_http_sampler.sh
new file mode 100755
index 000000000..30368255c
--- /dev/null
+++ b/performance_testing/jmeter/run_jmeter_http_sampler.sh
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+#########################################################################
+# Make sure you run the following gcloud auth command
+# if you're not using a service account to authenticate:
+#
+# gcloud auth login
+#
+# If you are using a service account, run the following gcloud auth command
+# after specifying the path to your service account private key.
+#
+# gcloud auth activate-service-account --key-file=/path/to/your/private_key.json
+#
+#########################################################################
+
+apache-jmeter-5.3/bin/jmeter -n \
+-t bigquery_http_sampler.jmx \
+-Jproject_id=YOUR_PROJECT \
+-Jtoken=$(gcloud auth print-access-token) \
+-Jsimple_csv_path=test_queries/simple_selects.csv \
+-Jmedium_csv_path=test_queries/medium_selects.csv \
+-Jcomplex_csv_path=test_queries/complex_selects.csv \
+-Jerror_csv_path=errors.csv \
+-Jsimple_num_users=6 \
+-Jmedium_num_users=3 \
+-Jcomplex_num_users=1 \
+-Jnum_loops=-1 \
+-Jrun_id=jmeter_http_test \
+-Jthread_duration=10 \
+-Jramp_time=0;
diff --git a/performance_testing/jmeter/run_jmeter_jdbc_sampler.sh b/performance_testing/jmeter/run_jmeter_jdbc_sampler.sh
new file mode 100755
index 000000000..95763442c
--- /dev/null
+++ b/performance_testing/jmeter/run_jmeter_jdbc_sampler.sh
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+#########################################################################
+# Make sure you run the following gcloud auth command
+# if you're not using a service account to authenticate:
+#
+# gcloud auth application-default login
+#
+# If you are using a service account, uncomment the export command below
+# and specify the path to your service account private key.
+#
+# export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/private_key.json
+#
+#########################################################################
+
+apache-jmeter-5.3/bin/jmeter -n \
+-t bigquery_jdbc_sampler.jmx \
+-Jproject_id=YOUR_PROJECT \
+-Juser.classpath=/path/to/your/SimbaJDBCDriverforGoogleBigQuery \
+-Jsimple_csv_path=test_queries/simple_selects.csv \
+-Jmedium_csv_path=test_queries/medium_selects.csv \
+-Jcomplex_csv_path=test_queries/complex_selects.csv \
+-Jerror_csv_path=errors.csv \
+-Jsimple_num_users=6 \
+-Jmedium_num_users=3 \
+-Jcomplex_num_users=1 \
+-Jnum_loops=-1 \
+-Jrun_id=jmeter_jdbc_test \
+-Jthread_duration=10 \
+-Jramp_time=0;
diff --git a/performance_testing/jmeter/test_queries/complex_selects.csv b/performance_testing/jmeter/test_queries/complex_selects.csv
new file mode 100644
index 000000000..0f5b3f042
--- /dev/null
+++ b/performance_testing/jmeter/test_queries/complex_selects.csv
@@ -0,0 +1,101 @@
+complex_id complex_query
+complex_00 SELECT 'some complex query 00';
+complex_01 SELECT 'some complex query 01';
+complex_02 SELECT 'some complex query 02';
+complex_03 SELECT 'some complex query 03';
+complex_04 SELECT 'some complex query 04';
+complex_05 SELECT 'some complex query 05';
+complex_06 SELECT 'some complex query 06';
+complex_07 SELECT 'some complex query 07';
+complex_08 SELECT 'some complex query 08';
+complex_09 SELECT 'some complex query 09';
+complex_10 SELECT 'some complex query 10';
+complex_11 SELECT 'some complex query 11';
+complex_12 SELECT 'some complex query 12';
+complex_13 SELECT 'some complex query 13';
+complex_14 SELECT 'some complex query 14';
+complex_15 SELECT 'some complex query 15';
+complex_16 SELECT 'some complex query 16';
+complex_17 SELECT 'some complex query 17';
+complex_18 SELECT 'some complex query 18';
+complex_19 SELECT 'some complex query 19';
+complex_20 SELECT 'some complex query 20';
+complex_21 SELECT 'some complex query 21';
+complex_22 SELECT 'some complex query 22';
+complex_23 SELECT 'some complex query 23';
+complex_24 SELECT 'some complex query 24';
+complex_25 SELECT 'some complex query 25';
+complex_26 SELECT 'some complex query 26';
+complex_27 SELECT 'some complex query 27';
+complex_28 SELECT 'some complex query 28';
+complex_29 SELECT 'some complex query 29';
+complex_30 SELECT 'some complex query 30';
+complex_31 SELECT 'some complex query 31';
+complex_32 SELECT 'some complex query 32';
+complex_33 SELECT 'some complex query 33';
+complex_34 SELECT 'some complex query 34';
+complex_35 SELECT 'some complex query 35';
+complex_36 SELECT 'some complex query 36';
+complex_37 SELECT 'some complex query 37';
+complex_38 SELECT 'some complex query 38';
+complex_39 SELECT 'some complex query 39';
+complex_40 SELECT 'some complex query 40';
+complex_41 SELECT 'some complex query 41';
+complex_42 SELECT 'some complex query 42';
+complex_43 SELECT 'some complex query 43';
+complex_44 SELECT 'some complex query 44';
+complex_45 SELECT 'some complex query 45';
+complex_46 SELECT 'some complex query 46';
+complex_47 SELECT 'some complex query 47';
+complex_48 SELECT 'some complex query 48';
+complex_49 SELECT 'some complex query 49';
+complex_50 SELECT 'some complex query 50';
+complex_51 SELECT 'some complex query 51';
+complex_52 SELECT 'some complex query 52';
+complex_53 SELECT 'some complex query 53';
+complex_54 SELECT 'some complex query 54';
+complex_55 SELECT 'some complex query 55';
+complex_56 SELECT 'some complex query 56';
+complex_57 SELECT 'some complex query 57';
+complex_58 SELECT 'some complex query 58';
+complex_59 SELECT 'some complex query 59';
+complex_60 SELECT 'some complex query 60';
+complex_61 SELECT 'some complex query 61';
+complex_62 SELECT 'some complex query 62';
+complex_63 SELECT 'some complex query 63';
+complex_64 SELECT 'some complex query 64';
+complex_65 SELECT 'some complex query 65';
+complex_66 SELECT 'some complex query 66';
+complex_67 SELECT 'some complex query 67';
+complex_68 SELECT 'some complex query 68';
+complex_69 SELECT 'some complex query 69';
+complex_70 SELECT 'some complex query 70';
+complex_71 SELECT 'some complex query 71';
+complex_72 SELECT 'some complex query 72';
+complex_73 SELECT 'some complex query 73';
+complex_74 SELECT 'some complex query 74';
+complex_75 SELECT 'some complex query 75';
+complex_76 SELECT 'some complex query 76';
+complex_77 SELECT 'some complex query 77';
+complex_78 SELECT 'some complex query 78';
+complex_79 SELECT 'some complex query 79';
+complex_80 SELECT 'some complex query 80';
+complex_81 SELECT 'some complex query 81';
+complex_82 SELECT 'some complex query 82';
+complex_83 SELECT 'some complex query 83';
+complex_84 SELECT 'some complex query 84';
+complex_85 SELECT 'some complex query 85';
+complex_86 SELECT 'some complex query 86';
+complex_87 SELECT 'some complex query 87';
+complex_88 SELECT 'some complex query 88';
+complex_89 SELECT 'some complex query 89';
+complex_90 SELECT 'some complex query 90';
+complex_91 SELECT 'some complex query 91';
+complex_92 SELECT 'some complex query 92';
+complex_93 SELECT 'some complex query 93';
+complex_94 SELECT 'some complex query 94';
+complex_95 SELECT 'some complex query 95';
+complex_96 SELECT 'some complex query 96';
+complex_97 SELECT 'some complex query 97';
+complex_98 SELECT 'some complex query 98';
+complex_99 SELECT 'some complex query 99';
diff --git a/performance_testing/jmeter/test_queries/medium_selects.csv b/performance_testing/jmeter/test_queries/medium_selects.csv
new file mode 100644
index 000000000..59cdabe62
--- /dev/null
+++ b/performance_testing/jmeter/test_queries/medium_selects.csv
@@ -0,0 +1,101 @@
+medium_id medium_query
+medium_00 SELECT 'some medium complexity query 00';
+medium_01 SELECT 'some medium complexity query 01';
+medium_02 SELECT 'some medium complexity query 02';
+medium_03 SELECT 'some medium complexity query 03';
+medium_04 SELECT 'some medium complexity query 04';
+medium_05 SELECT 'some medium complexity query 05';
+medium_06 SELECT 'some medium complexity query 06';
+medium_07 SELECT 'some medium complexity query 07';
+medium_08 SELECT 'some medium complexity query 08';
+medium_09 SELECT 'some medium complexity query 09';
+medium_10 SELECT 'some medium complexity query 10';
+medium_11 SELECT 'some medium complexity query 11';
+medium_12 SELECT 'some medium complexity query 12';
+medium_13 SELECT 'some medium complexity query 13';
+medium_14 SELECT 'some medium complexity query 14';
+medium_15 SELECT 'some medium complexity query 15';
+medium_16 SELECT 'some medium complexity query 16';
+medium_17 SELECT 'some medium complexity query 17';
+medium_18 SELECT 'some medium complexity query 18';
+medium_19 SELECT 'some medium complexity query 19';
+medium_20 SELECT 'some medium complexity query 20';
+medium_21 SELECT 'some medium complexity query 21';
+medium_22 SELECT 'some medium complexity query 22';
+medium_23 SELECT 'some medium complexity query 23';
+medium_24 SELECT 'some medium complexity query 24';
+medium_25 SELECT 'some medium complexity query 25';
+medium_26 SELECT 'some medium complexity query 26';
+medium_27 SELECT 'some medium complexity query 27';
+medium_28 SELECT 'some medium complexity query 28';
+medium_29 SELECT 'some medium complexity query 29';
+medium_30 SELECT 'some medium complexity query 30';
+medium_31 SELECT 'some medium complexity query 31';
+medium_32 SELECT 'some medium complexity query 32';
+medium_33 SELECT 'some medium complexity query 33';
+medium_34 SELECT 'some medium complexity query 34';
+medium_35 SELECT 'some medium complexity query 35';
+medium_36 SELECT 'some medium complexity query 36';
+medium_37 SELECT 'some medium complexity query 37';
+medium_38 SELECT 'some medium complexity query 38';
+medium_39 SELECT 'some medium complexity query 39';
+medium_40 SELECT 'some medium complexity query 40';
+medium_41 SELECT 'some medium complexity query 41';
+medium_42 SELECT 'some medium complexity query 42';
+medium_43 SELECT 'some medium complexity query 43';
+medium_44 SELECT 'some medium complexity query 44';
+medium_45 SELECT 'some medium complexity query 45';
+medium_46 SELECT 'some medium complexity query 46';
+medium_47 SELECT 'some medium complexity query 47';
+medium_48 SELECT 'some medium complexity query 48';
+medium_49 SELECT 'some medium complexity query 49';
+medium_50 SELECT 'some medium complexity query 50';
+medium_51 SELECT 'some medium complexity query 51';
+medium_52 SELECT 'some medium complexity query 52';
+medium_53 SELECT 'some medium complexity query 53';
+medium_54 SELECT 'some medium complexity query 54';
+medium_55 SELECT 'some medium complexity query 55';
+medium_56 SELECT 'some medium complexity query 56';
+medium_57 SELECT 'some medium complexity query 57';
+medium_58 SELECT 'some medium complexity query 58';
+medium_59 SELECT 'some medium complexity query 59';
+medium_60 SELECT 'some medium complexity query 60';
+medium_61 SELECT 'some medium complexity query 61';
+medium_62 SELECT 'some medium complexity query 62';
+medium_63 SELECT 'some medium complexity query 63';
+medium_64 SELECT 'some medium complexity query 64';
+medium_65 SELECT 'some medium complexity query 65';
+medium_66 SELECT 'some medium complexity query 66';
+medium_67 SELECT 'some medium complexity query 67';
+medium_68 SELECT 'some medium complexity query 68';
+medium_69 SELECT 'some medium complexity query 69';
+medium_70 SELECT 'some medium complexity query 70';
+medium_71 SELECT 'some medium complexity query 71';
+medium_72 SELECT 'some medium complexity query 72';
+medium_73 SELECT 'some medium complexity query 73';
+medium_74 SELECT 'some medium complexity query 74';
+medium_75 SELECT 'some medium complexity query 75';
+medium_76 SELECT 'some medium complexity query 76';
+medium_77 SELECT 'some medium complexity query 77';
+medium_78 SELECT 'some medium complexity query 78';
+medium_79 SELECT 'some medium complexity query 79';
+medium_80 SELECT 'some medium complexity query 80';
+medium_81 SELECT 'some medium complexity query 81';
+medium_82 SELECT 'some medium complexity query 82';
+medium_83 SELECT 'some medium complexity query 83';
+medium_84 SELECT 'some medium complexity query 84';
+medium_85 SELECT 'some medium complexity query 85';
+medium_86 SELECT 'some medium complexity query 86';
+medium_87 SELECT 'some medium complexity query 87';
+medium_88 SELECT 'some medium complexity query 88';
+medium_89 SELECT 'some medium complexity query 89';
+medium_90 SELECT 'some medium complexity query 90';
+medium_91 SELECT 'some medium complexity query 91';
+medium_92 SELECT 'some medium complexity query 92';
+medium_93 SELECT 'some medium complexity query 93';
+medium_94 SELECT 'some medium complexity query 94';
+medium_95 SELECT 'some medium complexity query 95';
+medium_96 SELECT 'some medium complexity query 96';
+medium_97 SELECT 'some medium complexity query 97';
+medium_98 SELECT 'some medium complexity query 98';
+medium_99 SELECT 'some medium complexity query 99';
diff --git a/performance_testing/jmeter/test_queries/simple_selects.csv b/performance_testing/jmeter/test_queries/simple_selects.csv
new file mode 100644
index 000000000..11eabe283
--- /dev/null
+++ b/performance_testing/jmeter/test_queries/simple_selects.csv
@@ -0,0 +1,101 @@
+simple_id simple_query
+simple_00 SELECT 'some simple query 00';
+simple_01 SELECT 'some simple query 01';
+simple_02 SELECT 'some simple query 02';
+simple_03 SELECT 'some simple query 03';
+simple_04 SELECT 'some simple query 04';
+simple_05 SELECT 'some simple query 05';
+simple_06 SELECT 'some simple query 06';
+simple_07 SELECT 'some simple query 07';
+simple_08 SELECT 'some simple query 08';
+simple_09 SELECT 'some simple query 09';
+simple_10 SELECT 'some simple query 10';
+simple_11 SELECT 'some simple query 11';
+simple_12 SELECT 'some simple query 12';
+simple_13 SELECT 'some simple query 13';
+simple_14 SELECT 'some simple query 14';
+simple_15 SELECT 'some simple query 15';
+simple_16 SELECT 'some simple query 16';
+simple_17 SELECT 'some simple query 17';
+simple_18 SELECT 'some simple query 18';
+simple_19 SELECT 'some simple query 19';
+simple_20 SELECT 'some simple query 20';
+simple_21 SELECT 'some simple query 21';
+simple_22 SELECT 'some simple query 22';
+simple_23 SELECT 'some simple query 23';
+simple_24 SELECT 'some simple query 24';
+simple_25 SELECT 'some simple query 25';
+simple_26 SELECT 'some simple query 26';
+simple_27 SELECT 'some simple query 27';
+simple_28 SELECT 'some simple query 28';
+simple_29 SELECT 'some simple query 29';
+simple_30 SELECT 'some simple query 30';
+simple_31 SELECT 'some simple query 31';
+simple_32 SELECT 'some simple query 32';
+simple_33 SELECT 'some simple query 33';
+simple_34 SELECT 'some simple query 34';
+simple_35 SELECT 'some simple query 35';
+simple_36 SELECT 'some simple query 36';
+simple_37 SELECT 'some simple query 37';
+simple_38 SELECT 'some simple query 38';
+simple_39 SELECT 'some simple query 39';
+simple_40 SELECT 'some simple query 40';
+simple_41 SELECT 'some simple query 41';
+simple_42 SELECT 'some simple query 42';
+simple_43 SELECT 'some simple query 43';
+simple_44 SELECT 'some simple query 44';
+simple_45 SELECT 'some simple query 45';
+simple_46 SELECT 'some simple query 46';
+simple_47 SELECT 'some simple query 47';
+simple_48 SELECT 'some simple query 48';
+simple_49 SELECT 'some simple query 49';
+simple_50 SELECT 'some simple query 50';
+simple_51 SELECT 'some simple query 51';
+simple_52 SELECT 'some simple query 52';
+simple_53 SELECT 'some simple query 53';
+simple_54 SELECT 'some simple query 54';
+simple_55 SELECT 'some simple query 55';
+simple_56 SELECT 'some simple query 56';
+simple_57 SELECT 'some simple query 57';
+simple_58 SELECT 'some simple query 58';
+simple_59 SELECT 'some simple query 59';
+simple_60 SELECT 'some simple query 60';
+simple_61 SELECT 'some simple query 61';
+simple_62 SELECT 'some simple query 62';
+simple_63 SELECT 'some simple query 63';
+simple_64 SELECT 'some simple query 64';
+simple_65 SELECT 'some simple query 65';
+simple_66 SELECT 'some simple query 66';
+simple_67 SELECT 'some simple query 67';
+simple_68 SELECT 'some simple query 68';
+simple_69 SELECT 'some simple query 69';
+simple_70 SELECT 'some simple query 70';
+simple_71 SELECT 'some simple query 71';
+simple_72 SELECT 'some simple query 72';
+simple_73 SELECT 'some simple query 73';
+simple_74 SELECT 'some simple query 74';
+simple_75 SELECT 'some simple query 75';
+simple_76 SELECT 'some simple query 76';
+simple_77 SELECT 'some simple query 77';
+simple_78 SELECT 'some simple query 78';
+simple_79 SELECT 'some simple query 79';
+simple_80 SELECT 'some simple query 80';
+simple_81 SELECT 'some simple query 81';
+simple_82 SELECT 'some simple query 82';
+simple_83 SELECT 'some simple query 83';
+simple_84 SELECT 'some simple query 84';
+simple_85 SELECT 'some simple query 85';
+simple_86 SELECT 'some simple query 86';
+simple_87 SELECT 'some simple query 87';
+simple_88 SELECT 'some simple query 88';
+simple_89 SELECT 'some simple query 89';
+simple_90 SELECT 'some simple query 90';
+simple_91 SELECT 'some simple query 91';
+simple_92 SELECT 'some simple query 92';
+simple_93 SELECT 'some simple query 93';
+simple_94 SELECT 'some simple query 94';
+simple_95 SELECT 'some simple query 95';
+simple_96 SELECT 'some simple query 96';
+simple_97 SELECT 'some simple query 97';
+simple_98 SELECT 'some simple query 98';
+simple_99 SELECT 'some simple query 99';
diff --git a/tools/.gitignore b/tools/.gitignore
new file mode 100644
index 000000000..c18dd8d83
--- /dev/null
+++ b/tools/.gitignore
@@ -0,0 +1 @@
+__pycache__/
diff --git a/tools/cloud_functions/gcs_event_based_ingest/.flake8 b/tools/cloud_functions/gcs_event_based_ingest/.flake8
index dafc87320..732e2a9fc 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/.flake8
+++ b/tools/cloud_functions/gcs_event_based_ingest/.flake8
@@ -1,6 +1,6 @@
[flake8]
max-line-length = 110
ignore = E731,W504,I001,W503,E402
-exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,.eggs,*.egg,node_modules,.venv
+exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,.eggs,*.egg,node_modules,.venv,.terraform
# format = ${cyan}%(path)s${reset}:${yellow_bold}%(row)d${reset}:${green_bold}%(col)d${reset}: ${red_bold}%(code)s${reset} %(text)s
diff --git a/tools/cloud_functions/gcs_event_based_ingest/.gitignore b/tools/cloud_functions/gcs_event_based_ingest/.gitignore
new file mode 100644
index 000000000..66d580175
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/.gitignore
@@ -0,0 +1,2 @@
+prof/
+test.log
diff --git a/tools/cloud_functions/gcs_event_based_ingest/.hadolint.yaml b/tools/cloud_functions/gcs_event_based_ingest/.hadolint.yaml
new file mode 100644
index 000000000..8f7e23e45
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/.hadolint.yaml
@@ -0,0 +1,2 @@
+ignored:
+ - DL3008
diff --git a/tools/cloud_functions/gcs_event_based_ingest/.isort.cfg b/tools/cloud_functions/gcs_event_based_ingest/.isort.cfg
index ed7944aca..6f72bca0f 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/.isort.cfg
+++ b/tools/cloud_functions/gcs_event_based_ingest/.isort.cfg
@@ -1,3 +1,5 @@
[settings]
src_paths=backfill.py,gcs_ocn_bq_ingest,test
skip=terraform_module
+force_single_line=True
+single_line_exclusions=typing
diff --git a/tools/cloud_functions/gcs_event_based_ingest/Dockerfile.ci b/tools/cloud_functions/gcs_event_based_ingest/Dockerfile.ci
index 5cd40aa1e..2c656ef94 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/Dockerfile.ci
+++ b/tools/cloud_functions/gcs_event_based_ingest/Dockerfile.ci
@@ -1,4 +1,17 @@
-FROM python:3.8-slim
+FROM python:3.8
+RUN apt-get update \
+ && apt-get install --no-install-recommends -y \
+ apt-transport-https \
+ ca-certificates \
+ curl \
+ sudo \
+ unzip \
+ && apt-get autoremove -yqq --purge \
+ && apt-get clean && rm -rf /var/lib/apt/lists/*
+WORKDIR /ci
COPY requirements.txt requirements-dev.txt ./
+COPY scripts/install_terraform.sh ./
+RUN ./install_terraform.sh
RUN pip3 install --no-cache-dir -r requirements-dev.txt
-ENTRYPOINT ["pytest"]
+ENTRYPOINT ["python3", "-m", "pytest"]
+
diff --git a/tools/cloud_functions/gcs_event_based_ingest/ORDERING.md b/tools/cloud_functions/gcs_event_based_ingest/ORDERING.md
new file mode 100644
index 000000000..4ae20dd0f
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/ORDERING.md
@@ -0,0 +1,202 @@
+# Ordering Batches
+There are use cases where it is important for incremental batches to be
+applied in order rather than as soon as they are uploaded to GCS (which is the
+default behavior of this solution):
+1. When using an external query that performs DML other than inserts
+(e.g. an `UPDATE` assumes that prior batches have already been committed).
+1. To ensure that there are no time gaps in the data (e.g. ensure that
+2020/01/02 data is not committed to BigQuery before 2020/01/01, or similarly
+that the 00 hour is ingested before the 01 hour, etc.).
+
+This Cloud Function supports serializing the submission of ingestion jobs to
+BigQuery by using Google Cloud Storage's consistency guarantees to provide a
+pessimistic lock on a table (preventing concurrent jobs) and
+[GCS Objects.list](https://cloud.google.com/storage/docs/json_api/v1/objects/list)
+lexicographic sorting of results to provide ordering guarantees.
+The solution involves a table level `_backlog/` directory to keep track
+of success files whose batches have not yet been committed to BigQuery and
+a table level `_bqlock` file to keep track of which job is currently ingesting
+into that table. Because all of this state is stored in GCS, the Cloud Function
+is idempotent and can safely retrigger itself to skirt the Cloud Functions
+timeout.
+
+## Assumptions
+This ordering solution assumes that you want to apply batches in lexicographic
+order. This is usually the case because path names typically contain some sort
+of date / hour information.
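Because date and hour path components are zero padded, plain lexicographic string sorting lines up with chronological order. A quick illustration (the paths here are hypothetical examples, not taken from the function's code):

```python
# Zero-padded date/hour path components make lexicographic order
# match chronological order (hypothetical example paths).
paths = [
    "dataset/table/incremental/2020/01/02/01/_SUCCESS",
    "dataset/table/incremental/2020/01/01/23/_SUCCESS",
    "dataset/table/incremental/2020/01/02/00/_SUCCESS",
]
ordered = sorted(paths)  # same ordering GCS Objects.list returns
print(ordered[0])  # the 2020/01/01 batch comes first
```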
+
+## Enabling Ordering
+### Environment Variable
+Ordering can be enabled at the function level by setting the `ORDER_PER_TABLE`
+environment variable to `"True"`.
+### Config File
+Ordering can be configured at any level of your naming convention (e.g. dataset,
+table, or some sub-path) by placing a `_config/ORDERME` file. This can be helpful
+in scenarios where your historical load can be processed safely in parallel but
+incrementals must be ordered.
+For example:
+```text
+gs://${BUCKET}/${DATASET}/${TABLE}/historical/_config/load.json
+gs://${BUCKET}/${DATASET}/${TABLE}/incremental/_config/external.json
+gs://${BUCKET}/${DATASET}/${TABLE}/incremental/_config/bq_transform.sql
+gs://${BUCKET}/${DATASET}/${TABLE}/incremental/_config/ORDERME
+```
+
+## Dealing With Out-of-Order Publishing to GCS During Historical Load
+In some use cases, there is a period where incrementals that must be applied in
+order are uploaded in parallel (meaning their `_SUCCESS` files are expected to
+be out of order). This typically happens during some historical backfill period.
+This can be solved by setting the `START_BACKFILL_FILENAME` environment
+variable to a file name that indicates that the parallel upload of historical
+incrementals is complete (e.g. `_HISTORYDONE`). This will cause all success
+files for a table to be added to the backlog until the `_HISTORYDONE` file is
+dropped at the table level. At that point the backlog subscriber will begin
+processing the batches in order.
+
+## Batch Failure Behavior
+When ordering is enabled, if the BigQuery job to apply a batch fails, it is not
+safe to continue ingesting the next batch. The Cloud Function will leave the
+`_bqlock` file and stop trying to process the backlog. The Cloud Function
+will report an exception like the following, which should be alerted on, as the
+ingestion process for the table will be deadlocked until a human intervenes to
+address the failed batch:
+```text
+ f"previous BigQuery job: {job_id} failed or could not "
+ "be found. This will kill the backfill subscriber for "
+ f"the table prefix {table_prefix}."
+ "Once the issue is dealt with by a human, the lock"
+ "file at: "
+ f"gs://{lock_blob.bucket.name}/{lock_blob.name} "
+ "should be manually removed and a new empty _BACKFILL"
+ "file uploaded to:"
+ f"gs://{lock_blob.bucket.name}/{table_prefix}/_BACKFILL"
+ f"to resume the backfill subscriber so it can "
+ "continue with the next item in the backlog.\n"
+ "Original Exception:\n"
+ f"{traceback.format_exc()}")
+```
+Note that once the `_bqlock` is removed and `_BACKFILL` is reposted, the Cloud
+Function will proceed by applying the next batch in the `_backlog`. This means
+that if you have applied the batch manually, you should remove this object from
+the `_backlog`. However, if you have patched the data on GCS for the failed
+batch and would like the Cloud Function to apply it, then leave this object in
+the `_backlog`.
+
+## Ordering Mechanics Explained
+We've treated ordering incremental commits to a table as a variation on the
+[Producer-Consumer Problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem),
+where we have multiple producers (each call of the Backlog Publisher) and a
+single consumer (the Backlog Subscriber, which is enforced to be a singleton per
+table with a claim file). Our solution uses the GCS `_backlog` directory as our
+queue and `_bqlock` as a mutex. There is still a rare corner case of a race
+condition, which we handle as well.
+
+### Backlog Publisher
+The Backlog Publisher has two responsibilities:
+1. Add incoming success files to a table's `_backlog` so they are not
+"forgotten" by the ingestion system.
+1. If there is a non-empty backlog, start the backfill subscriber (if one is
+not already running). This is accomplished by uploading a table level `_BACKFILL`
+file if it does not already exist.
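The publisher's two steps can be sketched as follows, using a plain dict as a stand-in for the GCS bucket (the function name and arguments here are illustrative, not the actual code):

```python
def publish_to_backlog(gcs: dict, table_prefix: str, success_path: str) -> None:
    """Sketch of the Backlog Publisher; a dict stands in for a GCS bucket."""
    # 1. Record the success file in the table's backlog.
    gcs[f"{table_prefix}/_backlog/{success_path}"] = b""
    # 2. Wake the subscriber: upload a _BACKFILL file only if it
    #    does not already exist (i.e. no subscriber is running).
    gcs.setdefault(f"{table_prefix}/_BACKFILL", b"")
```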
+
+### Backlog Subscriber
+The Backlog Subscriber is responsible for keeping track of BigQuery jobs running
+on a table and ensuring that batches are committed in order. When the backlog
+for a table is not empty, the backlog subscriber should be running for that
+table unless a job has failed.
+It will either be polling a `RUNNING` BigQuery job for completion, or submitting
+the next batch in the `_backlog`.
+
+The state of what BigQuery job is currently running on a table is kept in a
+`_bqlock` file at the table prefix.
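Acquiring the table lock relies on creating `_bqlock` only when it is absent; in GCS this atomicity comes from an object-creation precondition (uploading with `if_generation_match=0` fails if the object already exists). A minimal sketch of the idea, with an in-memory dict emulating the bucket:

```python
def acquire_table_lock(gcs: dict, table_prefix: str, job_id: str) -> bool:
    """Create _bqlock only if absent (emulates GCS if_generation_match=0)."""
    lock_path = f"{table_prefix}/_bqlock"
    if lock_path in gcs:  # real code: GCS raises PreconditionFailed
        return False
    gcs[lock_path] = job_id.encode("utf-8")  # record the running job ID
    return True
```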
+
+In order to escape the maximum nine-minute (540s) Cloud Function Timeout, the
+backfill subscriber will re-trigger itself by posting a new `_BACKFILL` file
+until the `_backlog` for the table prefix is empty. When a new success file
+arrives it is the responsibility of the publisher to restart the subscriber if
+one is not already running.
+
+### Example: Life of a Table
+The following process explains the triggers (GCS files) and actions of the
+Cloud Function for a single table prefix.
+
+1. Source data uploaded to GCS prefix for the destination dataset / table, etc.
+ - `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/foo-data-00.csv`
+ - `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/foo-data-01.csv`
+ - `gs://ingestion-bucket/dataset/table/historical/2020/01/02/04/foo-data-00.csv`
+ - `gs://ingestion-bucket/dataset/table/incremental/2020/01/02/05/foo-data-01.csv`
+1. Success file uploaded to GCS (to indicate this atomic batch is ready to be
+applied).
+ - `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/_SUCCESS`
+ - `gs://ingestion-bucket/dataset/table/historical/2020/01/02/04/_SUCCESS`
+ - `gs://ingestion-bucket/dataset/table/incremental/2020/01/02/05/_SUCCESS`
+1. Backlog Publisher adds a pointer to each success file in the backlog for the
+table.
+ - `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/03/_SUCCESS`
+ - `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/04/_SUCCESS`
+ - `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/02/05/_SUCCESS`
+1. After adding each item to the backlog, the Backlog Publisher will start the
+Backfill Subscriber if it is not already running (as indicated by a `_BACKFILL`
+file). If `START_BACKFILL_FILENAME` is set, the backlog subscriber will not be
+started until that file exists at the table prefix.
+ - `gs://ingestion-bucket/dataset/table/_BACKFILL`
+1. The Backlog Subscriber will look at the backlog and apply the batches in
+order (lexicographic). This process looks like this:
+ 1. Claim this backfill file:
+ - `gs://ingestion-bucket/dataset/table/_claimed__BACKFILL_created_at_...`
+ 1. Claim first batch in backlog (ensure no duplicate processing):
+ - `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/_claimed__SUCCESS_created_at_...`
+ 1. Submit the BigQuery Job for this batch (load job or external query based on the `_config/*` files)
+ - Ingest the data at the `gs://ingestion-bucket/dataset/table/historical/2020/01/02/03/*` prefix
+ - Store the job ID in `gs://ingestion-bucket/dataset/table/_bqlock`
+ 1. Wait for this Job to complete successfully and remove this item from the backlog.
+ - If job is `DONE` with errors:
+ - Raise exception (do not continue to process any more batches)
+ - If job is `DONE` without errors remove the pointer from the backlog:
+ - DELETE `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/03/_SUCCESS`
+ 1. Repeat from Backlog Subscriber step 2
+ - Where the first item in the backlog is now
+ - `gs://ingestion-bucket/dataset/table/_backlog/historical/2020/01/02/04/_SUCCESS`
+ - And on the next loop:
+ - `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/02/05/_SUCCESS`
+1. Backlog Subscriber sees the `_backlog/` is empty for the table; in other
+words, the BigQuery table is caught up with the data on GCS.
+ - DELETE `gs://ingestion-bucket/dataset/table/_BACKFILL` and exit
+1. The next day a new incremental arrives
+ - `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/02/05/_SUCCESS`
+1. The Backlog Publisher adds this item to the backlog and wakes up the
+Backfill Subscriber by posting a new `_BACKFILL` file.
+ - `gs://ingestion-bucket/dataset/table/_backlog/incremental/2020/01/02/05/_SUCCESS`
+ - `gs://ingestion-bucket/dataset/table/_BACKFILL`
+1. Backlog Subscriber will handle the backlog of just one item
+(See Backlog Subscriber step #5 and #6 above)
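The subscriber loop described above can be sketched end to end; this is a simplified stand-in (dict for the bucket, a callable for BigQuery job submission), not the function's actual code:

```python
def drain_backlog(gcs: dict, table_prefix: str, run_bq_job) -> None:
    """Sketch of the Backfill Subscriber loop (dict stands in for GCS)."""
    backlog_prefix = f"{table_prefix}/_backlog/"
    lock_path = f"{table_prefix}/_bqlock"
    while True:
        backlog = sorted(k for k in gcs if k.startswith(backlog_prefix))
        if not backlog:
            # Caught up with GCS: remove _BACKFILL and exit.
            gcs.pop(f"{table_prefix}/_BACKFILL", None)
            return
        head = backlog[0]  # lexicographically first batch
        batch = head[len(backlog_prefix):].rsplit("/_SUCCESS", 1)[0]
        gcs[lock_path] = batch.encode("utf-8")  # record the running job
        if not run_bq_job(batch):
            # Leave _bqlock in place for human intervention.
            raise RuntimeError(f"job for batch {batch} failed")
        del gcs[head]             # committed: pop it from the backlog
        gcs.pop(lock_path, None)  # release the lock
```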
+
+
+### Note on Handling Race Condition
+We use `subscribe_monitor` to handle a rare race condition where:
+
+1. subscriber reads an empty backlog (before it can delete the
+   `_BACKFILL` blob...)
+2. a new item is added to the backlog (causing a separate
+   function invocation)
+3. in this new invocation we reach this point in the code path
+   and `start_subscriber_if_not_running` sees the old `_BACKFILL`
+   and does not create a new one
+4. the subscriber deletes the `_BACKFILL` blob and exits without
+   processing the new item on the backlog from #2
+
+We handle this as follows:
+
+1. When a success file is added to the backlog, we start a monitor that waits
+10 seconds and then checks that the backfill file still exists, in order to
+catch the case where the backfill file disappears when it should not. This
+might trigger an extra loop of the backfill subscriber, but that loop will not
+take any action, and this wasted compute is far better than dropping a batch of
+data.
+1. On the subscriber side, we check whether more than 10 seconds elapsed
+between the list backlog items and delete backfill calls. If so, the subscriber
+double checks that the backlog is still empty. This way we always handle this
+race condition either in this monitor or in the subscriber itself.
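The subscriber-side guard reduces to a small freshness check; a sketch (the function name is illustrative, and the 10 second window matches the text above):

```python
RACE_WINDOW_SECONDS = 10.0

def backlog_listing_is_fresh(listed_at: float, now: float) -> bool:
    """True if an empty-backlog observation is recent enough to act on.

    If more than RACE_WINDOW_SECONDS elapsed between listing the backlog
    and attempting to delete _BACKFILL, the subscriber must re-list the
    backlog before deleting, to avoid dropping a freshly published batch.
    """
    return (now - listed_at) < RACE_WINDOW_SECONDS
```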
+
+
+### Visualization of Ordering Triggers in the Cloud Function
+
diff --git a/tools/cloud_functions/gcs_event_based_ingest/README.md b/tools/cloud_functions/gcs_event_based_ingest/README.md
index 9fda82d39..4c0cac057 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/README.md
+++ b/tools/cloud_functions/gcs_event_based_ingest/README.md
@@ -21,14 +21,18 @@ By Default we try to read dataset, table, partition (or yyyy/mm/dd/hh) and
batch id using the following python regex:
```python3
DEFAULT_DESTINATION_REGEX = (
-    r"^(?P<dataset>[\w\-\._0-9]+)/"  # dataset (required)
-    r"(?P<table>[\w\-_0-9]+)/?"  # table name (required)
+    # break up historical v.s. incremental to separate prefixes (optional)
+    r"(?:historical|incremental)?/?"
+    r"(?P<partition>\$[0-9]+)?/?"  # partition decorator (optional)
+    r"(?:"  # [begin] yyyy/mm/dd/hh/ group (optional)
+    r"(?P<yyyy>[0-9]{4})/?"  # partition year (yyyy) (optional)
+    r"(?P<mm>[0-9]{2})?/?"  # partition month (mm) (optional)
+    r"(?P<dd>[0-9]{2})?/?"  # partition day (dd) (optional)
+    r"(?P<hh>[0-9]{2})?/?"  # partition hour (hh) (optional)
+    r")?"  # [end] yyyy/mm/dd/hh/ group (optional)
+    r"(?P<batch>[\w\-_0-9]+)?/"  # batch id (optional)
)
```
you can see if this meets your needs in this [regex playground](https://regex101.com/r/5Y9TDh/2)
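As a quick local sanity check, the assembled pattern (group names reconstructed here from the comments above; the exact source may differ) can be exercised with `re`:

```python
import re

# Reconstructed from the documented pattern; group names are assumed.
DEST_REGEX = (
    r"^(?P<dataset>[\w\-\._0-9]+)/"
    r"(?P<table>[\w\-_0-9]+)/?"
    r"(?:historical|incremental)?/?"
    r"(?P<partition>\$[0-9]+)?/?"
    r"(?:"
    r"(?P<yyyy>[0-9]{4})/?"
    r"(?P<mm>[0-9]{2})?/?"
    r"(?P<dd>[0-9]{2})?/?"
    r"(?P<hh>[0-9]{2})?/?"
    r")?"
    r"(?P<batch>[\w\-_0-9]+)?/"
)

# A hypothetical object path following the documented naming convention.
m = re.match(DEST_REGEX,
             "mydataset/mytable/incremental/2020/01/02/03/batch-a/_SUCCESS")
```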
@@ -37,8 +41,10 @@ better fit your naming convention on GCS. Your regex must include
[Python Regex with named capturing groups](https://docs.python.org/3/howto/regex.html#non-capturing-and-named-groups)
for destination `dataset`, and `table`.
Note, that `dataset` can optionally, explicitly specify destination project
-(i.e. `gs://${BUCKET}/project_id.dataset_id/table/....`) otherwise the default
-project will be inferred from Application Default Credential (the project in
+(i.e. `gs://${BUCKET}/project_id.dataset_id/table/....`). Alternatively,
+one can set the `BQ_STORAGE_PROJECT` environment variable to override the
+default target project for datasets at the function level. The default behavior is to
+infer the project from Application Default Credentials (the project in
which the Cloud Function is running, or the ADC configured in Google Cloud SDK
if invoked locally). This is useful in scenarios where a single deployment of
the Cloud Function is responsible for ingesting data into BigQuery tables in
@@ -133,20 +139,50 @@ The result of merging these would be:
This configuration system gives us the ability to DRY up common defaults but
override them at whatever level is appropriate as new cases come up.
+### Note on Delimiters: Use Unicode
+For CSV loads, the `fieldDelimiter` in `load.json` or `external.json` should be
+specified as a Unicode escape, _not_ a hexadecimal character, as hexadecimal
+characters will confuse Python's `json.load` function.
+For example, ctrl-P should be specified as:
+```json
+{
+ "fieldDelimiter": "\u0010"
+}
+```
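To see why the escape form works: `json.loads` decodes `\u0010` into a single control character (a quick check):

```python
import json

# The JSON escape "\u0010" decodes to one DLE (ctrl-P) character.
config = json.loads('{"fieldDelimiter": "\\u0010"}')
delimiter = config["fieldDelimiter"]
```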
+
#### Transformation SQL
In some cases we may need to perform transformations on the files in GCS
before they can be loaded to BigQuery. This is handled by query on an
temporary external table over the GCS objects as a proxy for load job.
`gs://${INGESTION_BUCKET}/${BQ_DATASET}/${BQ_TABLE_NAME}/_config/bq_transform.sql`
-Note, external queries will consume query slots from this project's reservation
-or count towards your on-demand billing. They will _not_ use free tie load slots.
+By default, if a query job finishes with statement type
+`INSERT`, `UPDATE`, `DELETE`, or `MERGE` and `numDmlAffectedRows = 0`, this will
+be treated as a failure ([see Query Job Statistics API docs](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2)).
+This is usually due to a bad query / configuration with a bad DML predicate.
+For example running the following query on an empty table:
+
+```sql
+UPDATE foo.bar dest ... FROM temp_ext src WHERE src.id = dest.id
+```
+
+By failing on this condition we keep the backlog intact when we run a query job
+that unexpectedly did not affect any rows.
+This can be disabled by setting the environment variable
+`FAIL_ON_ZERO_DML_ROWS_AFFECTED=False`.
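The failure condition described above amounts to a small predicate; a sketch (function and parameter names are illustrative, not the Cloud Function's actual code):

```python
DML_STATEMENT_TYPES = {"INSERT", "UPDATE", "DELETE", "MERGE"}

def is_zero_dml_failure(statement_type: str,
                        num_dml_affected_rows: int,
                        fail_on_zero_dml: bool = True) -> bool:
    """True when a finished DML job affected no rows and should fail the batch."""
    return (fail_on_zero_dml
            and statement_type in DML_STATEMENT_TYPES
            and num_dml_affected_rows == 0)
```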
+
+A `CREATE OR REPLACE TABLE` statement is not DML and will not be subject to this
+behavior.
+
+##### Cost Note
+External queries will consume query slots from this project's reservation
+or count towards your on-demand billing.
+They will _not_ use free tier load slots.
+
+##### External Table Name: `temp_ext`
Note, that the query should select from a `temp_ext` which will be a temporary
external table configured on the fly by the Cloud Function.
The query must handle the logic for inserting into the destination table.
-This means it should use BigQuery DML to either `INSERT` or `MERGE` into the
-destination table.
+This means it should use BigQuery DML to mutate the destination table.
For example:
```sql
INSERT {dest_dataset}.{dest_table}
@@ -198,6 +234,11 @@ at any parent folders `_config` prefix. This allows you dictate
"for this table any new batch should `WRITE_TRUNCATE` it's parent partition/table"
or "for that table any new batch should `WRITE_APPEND` to it's parent partition/table".
+## Controlling BigQuery Compute Project
+By default BigQuery jobs will be submitted in the project where the Cloud Function
+is deployed. To submit jobs in another BigQuery project set the `BQ_PROJECT`
+environment variable.
+
## Monitoring
Monitoring what data has been loaded by this solution should be done with the
BigQuery [`INFORMATION_SCHEMA` jobs metadata](https://cloud.google.com/bigquery/docs/information-schema-jobs)
@@ -235,14 +276,20 @@ SELECT
total_slot_ms,
destination_table
state,
+ error_result,
(SELECT value FROM UNNEST(labels) WHERE key = "component") as component,
(SELECT value FROM UNNEST(labels) WHERE key = "cloud-function-name") as cloud_function_name,
(SELECT value FROM UNNEST(labels) WHERE key = "batch-id") as batch_id,
FROM
`region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
- (SELECT value FROM UNNEST(labels) WHERE key = "component") = "gcf-ingest-"
+ (SELECT value FROM UNNEST(labels) WHERE key = "component") = "event-based-gcs-ingest"
```
+If your external queries have multiple SQL statements, only the parent job will
+follow the `gcf-ingest-*` naming convention. Child jobs (for each statement)
+begin with the prefix `_script_job`. These jobs will still be labelled with
+`component` and `cloud-function-name`.
+For more information see [Scripting in Standard SQL](https://cloud.google.com/bigquery/docs/reference/standard-sql/scripting).
## Triggers
GCS Object Finalize triggers can communicate with Cloud Functions directly or
@@ -282,6 +329,10 @@ It's better for us to make a conscious decision to adopt new features or adjust
CI configs or pin older version depending on the type for failure.
This CI should be run on all new PRs and nightly.
+Note, all functionality of the cloud function (including ordering) is
+integration tested against buckets with object versioning enabled to ensure this
+solution works for buckets using this feature.
+
### Just Running the Tests
#### Running in Docker
```bash
@@ -299,8 +350,10 @@ docker run --rm -it gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci
#### Running on your local machine
Alternatively to the local cloudbuild or using the docker container to run your
tests, you can `pip3 install -r requirements-dev.txt` and select certain tests
-to run with [`pytest`](https://docs.pytest.org/en/stable/usage.html). This is
-mostly useful if you'd like to integrate with your IDE debugger.
+to run with [`python3 -m pytest`](https://docs.pytest.org/en/stable/usage.html).
+Note, this is not quite the same as calling `pytest` without the `python3 -m` prefix
+([pytest invocation docs](https://docs.pytest.org/en/stable/usage.html#calling-pytest-through-python-m-pytest)).
+This is mostly useful if you'd like to integrate with your IDE debugger.
Note that integration tests will spin up / tear down cloud resources that can
incur a small cost. These resources will be spun up based on your Google Cloud SDK
@@ -314,16 +367,24 @@ See more info on sharing pytest fixtures in the [pytest docs](https://docs.pytes
#### Running All Tests
```bash
-pytest
+python3 -m pytest
```
#### Running Unit Tests Only
```bash
-pytest -m "not IT"
+python3 -m pytest -m "not IT"
```
#### Running Integration Tests Only
```bash
-pytest -m IT
+python3 -m pytest -m IT
+```
+
+#### Running System Tests Only
+The system tests assume that you have deployed the cloud function.
+```bash
+export TF_VAR_short_sha=$(git rev-parse --short=10 HEAD)
+export TF_VAR_project_id=${PROJECT_ID}
+python3 -m pytest -vvv e2e
```
## Deployment
@@ -351,7 +412,7 @@ gcloud functions deploy test-gcs-bq-ingest \
--trigger-topic=${PUBSUB_TOPIC} \
--service-account=${SERVICE_ACCOUNT_EMAIL} \
--timeout=540 \
-  --set-env-vars='DESTINATION_REGEX=^(?:[\w\-0-9]+)/(?P<table>[\w\-_0-9]+)/(?P<dd>[0-9]{2})?/?(?P<hh>[0-9]{2})?/?(?P<batch>[0-9]+)?/?,FUNCTION_TIMEOUT_SEC=540'
```
In theory, one could set up Pub/Sub notifications from multiple GCS Buckets
@@ -376,6 +437,12 @@ In theory, one could set up Pub/Sub notifications from multiple GCS Buckets
Pub/Sub topic so that data uploaded to any of these buckets could get
automatically loaded to BigQuery by a single deployment of the Cloud Function.
+## Ordering Guarantees
+It is possible to configure the Cloud Function to apply incrementals in order if
+this is crucial to your data integrity. This naturally comes with a performance
+penalty as for a given table we cannot parallelize ingestion of batches.
+The ordering behavior and options are described in detail in [ORDERING.md](ORDERING.md)
+
## Backfill
There are some cases where you may have data already copied to GCS according to
the naming convention / with success files before the Object Change
@@ -385,6 +452,21 @@ files. The utility supports either invoking the Cloud Function main method
locally (in concurrent threads) or publishing notifications for the success
files (for a deployed Cloud Function to pick up).
+### Backfill and Ordering
+If you use the ordering feature on a table (or function wide) you should use the
+`NOTIFICATIONS` mode to repost notifications to a pub/sub topic that your
+deployed Cloud Function is listening to. The `LOCAL` mode does not support
+ordering because this feature relies on (re)posting files like `_bqlock`,
+`_BACKFILL` and various claim files and getting re-triggered by object
+notifications for these.
+The script will publish the notifications for success files and the Cloud
+Function will add these to the appropriate table's backlog.
+Once the script completes you can drop the `START_BACKFILL_FILENAME`
+(e.g. `_HISTORYDONE`) for each table you want to trigger the backfill for.
+In general, it would not be safe for this utility to drop a `_HISTORYDONE` for
+every table because the parallel historical loads might still be in progress.
+
+
### Usage
```
python3 -m backfill -h
diff --git a/tools/cloud_functions/gcs_event_based_ingest/__init__.py b/tools/cloud_functions/gcs_event_based_ingest/__init__.py
index 7a3efb203..42ed0a407 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/__init__.py
+++ b/tools/cloud_functions/gcs_event_based_ingest/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC.
+# Copyright 2021 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.
diff --git a/tools/cloud_functions/gcs_event_based_ingest/backfill.py b/tools/cloud_functions/gcs_event_based_ingest/backfill.py
index f0a2ce415..105397553 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/backfill.py
+++ b/tools/cloud_functions/gcs_event_based_ingest/backfill.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC
+# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/tools/cloud_functions/gcs_event_based_ingest/cloudbuild.yaml b/tools/cloud_functions/gcs_event_based_ingest/cloudbuild.yaml
index d1367b925..0ae2de0ae 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/cloudbuild.yaml
+++ b/tools/cloud_functions/gcs_event_based_ingest/cloudbuild.yaml
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC.
+# Copyright 2021 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.
@@ -20,6 +20,8 @@ steps:
dir: '${_BUILD_DIR}'
entrypoint: '/bin/hadolint'
args:
+ - '--config'
+ - '.hadolint.yaml'
- 'Dockerfile.ci'
id: 'lint-ci-docker-image'
- name: 'gcr.io/kaniko-project/executor:latest'
@@ -113,21 +115,43 @@ steps:
- 'mypy-main'
- 'mypy-tests'
- 'terraform-fmt'
+ entrypoint: /bin/sh
args:
- - '-m'
- - 'not IT'
+ - '-c'
+ # pip installing again to get GCB to recognize mocker from pytest-mock
+ - 'pip install -r requirements-dev.txt && python3 -m pytest tests -m "not IT"'
+ # GCB sometimes get stuck on this step and is doomed to not recover.
+ # This is usually remedied by just re-running the build.
+ # adding this unit-test step level timeout so we can fail sooner and retry.
+ timeout: 15s
id: 'unit-test'
- name: 'gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci'
dir: '${_BUILD_DIR}'
waitFor:
- 'build-ci-image'
- 'unit-test'
+ entrypoint: /bin/sh
args:
- - '--maxfail=1'
- - '-m'
- - 'IT'
+ - '-c'
+ - 'pip install -r requirements-dev.txt && python3 -m pytest tests -m IT'
id: 'integration-test'
+- name: 'gcr.io/$PROJECT_ID/gcs_event_based_ingest_ci'
+ dir: '${_BUILD_DIR}'
+ env:
+ - 'TF_VAR_project_id=$PROJECT_ID'
+ - 'TF_VAR_region=$_REGION'
+ - 'TF_VAR_short_sha=$SHORT_SHA'
+ waitFor:
+ - 'integration-test'
+ - 'build-ci-image'
+ entrypoint: /bin/sh
+ args:
+ - '-c'
+ - 'python3 -m pytest -vvv e2e'
+ id: 'e2e-test'
+timeout: '3600s'
options:
- machineType: 'N1_HIGHCPU_8'
+ machineType: 'N1_HIGHCPU_32'
substitutions:
'_BUILD_DIR': 'tools/cloud_functions/gcs_event_based_ingest'
+ '_REGION': 'us-central1'
diff --git a/tools/cloud_functions/gcs_event_based_ingest/e2e/.gitignore b/tools/cloud_functions/gcs_event_based_ingest/e2e/.gitignore
new file mode 100644
index 000000000..9e399369c
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/e2e/.gitignore
@@ -0,0 +1,35 @@
+# Local .terraform directories
+**/.terraform/*
+
+# .tfstate files
+*.tfstate
+*.tfstate.*
+
+# Crash log files
+crash.log
+
+# Exclude all .tfvars files, which are likely to contain sensitive data such as
+# passwords, private keys, and other secrets. These should not be part of version
+# control as they are data points which are potentially sensitive and subject
+# to change depending on the environment.
+#
+*.tfvars
+
+# Ignore override files as they are usually used to override resources locally and so
+# are not checked in
+override.tf
+override.tf.json
+*_override.tf
+*_override.tf.json
+
+# Include override files you do wish to add to version control using negated pattern
+#
+# !example_override.tf
+
+# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan
+# example: *tfplan*
+
+# Ignore CLI configuration files
+.terraformrc
+terraform.rc
+
diff --git a/tools/cloud_functions/gcs_event_based_ingest/tests/cli/__init__.py b/tools/cloud_functions/gcs_event_based_ingest/e2e/__init__.py
similarity index 96%
rename from tools/cloud_functions/gcs_event_based_ingest/tests/cli/__init__.py
rename to tools/cloud_functions/gcs_event_based_ingest/e2e/__init__.py
index 7a3efb203..42ed0a407 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/tests/cli/__init__.py
+++ b/tools/cloud_functions/gcs_event_based_ingest/e2e/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC.
+# Copyright 2021 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.
diff --git a/tools/cloud_functions/gcs_event_based_ingest/e2e/conftest.py b/tools/cloud_functions/gcs_event_based_ingest/e2e/conftest.py
new file mode 100644
index 000000000..7f3c73205
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/e2e/conftest.py
@@ -0,0 +1,121 @@
+# Copyright 2021 Google LLC.
+# This software is provided as-is, without warranty or representation
+# for any use or purpose.
+# Your use of it is subject to your agreement with Google.
+
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""End-to-end tests for event based BigQuery ingest Cloud Function."""
+import json
+import os
+import re
+import shlex
+import subprocess
+import uuid
+
+import pytest
+from google.cloud import bigquery
+from google.cloud import storage
+
+TEST_DIR = os.path.realpath(os.path.dirname(__file__))
+
+ANSI_ESCAPE_PATTERN = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
+
+
+@pytest.fixture(scope="module")
+def bq() -> bigquery.Client:
+ """BigQuery Client"""
+ return bigquery.Client(location="US")
+
+
+@pytest.fixture(scope="module")
+def gcs() -> storage.Client:
+ """GCS Client"""
+ return storage.Client()
+
+
+@pytest.fixture(scope='module')
+def terraform_infra(request):
+
+ def _escape(in_str):
+ if in_str is not None:
+ return ANSI_ESCAPE_PATTERN.sub('', in_str.decode('UTF-8'))
+ return None
+
+ def _run(cmd):
+ result = subprocess.run(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ cwd=TEST_DIR)
+ print(_escape(result.stdout))
+ if result.returncode == 0:
+ return
+ raise subprocess.CalledProcessError(returncode=result.returncode,
+ cmd=result.args,
+ output=_escape(result.stdout),
+ stderr=_escape(result.stderr))
+
+ init = shlex.split("terraform init")
+ apply = shlex.split("terraform apply -auto-approve")
+ destroy = shlex.split("terraform destroy -auto-approve")
+
+ _run(init)
+ _run(apply)
+
+ def teardown():
+ _run(destroy)
+
+ request.addfinalizer(teardown)
+ with open(os.path.join(TEST_DIR, "terraform.tfstate")) as tf_state_file:
+ return json.load(tf_state_file)
+
+
+@pytest.fixture
+def dest_dataset(request, bq, monkeypatch):
+ random_dataset = (f"test_bq_ingest_gcf_"
+ f"{str(uuid.uuid4())[:8].replace('-','_')}")
+ dataset = bigquery.Dataset(f"{os.getenv('TF_VAR_project_id', 'bqutil')}"
+ f".{random_dataset}")
+ dataset.location = "US"
+ bq.create_dataset(dataset)
+ monkeypatch.setenv("BQ_LOAD_STATE_TABLE",
+ f"{dataset.dataset_id}.serverless_bq_loads")
+ print(f"created dataset {dataset.dataset_id}")
+
+ def teardown():
+ bq.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
+
+ request.addfinalizer(teardown)
+ return dataset
+
+
+@pytest.fixture(scope="function")
+def dest_table(request, bq: bigquery.Client, dest_dataset) -> bigquery.Table:
+ public_table: bigquery.Table = bq.get_table(
+ bigquery.TableReference.from_string(
+ "bigquery-public-data.new_york_311.311_service_requests"))
+ schema = public_table.schema
+
+ table: bigquery.Table = bigquery.Table(
+ f"{os.environ.get('TF_VAR_project_id', 'bqutil')}"
+ f".{dest_dataset.dataset_id}.cf_e2e_test_nyc_311_"
+ f"{os.getenv('SHORT_SHA', 'manual')}",
+ schema=schema,
+ )
+
+ table = bq.create_table(table)
+
+ def teardown():
+ bq.delete_table(table, not_found_ok=True)
+
+ request.addfinalizer(teardown)
+ return table
diff --git a/tools/cloud_functions/gcs_event_based_ingest/e2e/e2e_test.py b/tools/cloud_functions/gcs_event_based_ingest/e2e/e2e_test.py
new file mode 100644
index 000000000..8ffa44c2f
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/e2e/e2e_test.py
@@ -0,0 +1,133 @@
+# Copyright 2021 Google LLC.
+# This software is provided as-is, without warranty or representation
+# for any use or purpose.
+# Your use of it is subject to your agreement with Google.
+
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""End-to-end test for GCS event based ingest to BigQuery Cloud Function"""
+import concurrent.futures
+import json
+import time
+from typing import Dict
+
+import pytest
+from google.cloud import bigquery
+from google.cloud import storage
+
+WAIT_FOR_ROWS_TIMEOUT = 180 # seconds
+
+
+@pytest.mark.SYS
+def test_cloud_function_long_running_bq_jobs_with_orderme(
+ gcs: storage.Client, bq: bigquery.Client, dest_table: bigquery.Table,
+ terraform_infra: Dict):
+    """This test assumes the cloud function has been deployed with the
+    accompanying terraform module, which configures a 1 minute timeout.
+    It exports a larger amount of data from a public BigQuery table and then
+    reloads it into the test table to exercise the cloud function behavior
+    with longer-running BigQuery jobs, which are likely to require the backlog
+    subscriber to restart itself by reposting a _BACKFILL file. The ordering
+    behavior is controlled with the ORDERME blob.
+    """
+ input_bucket_id = terraform_infra['outputs']['bucket']['value']
+ table_prefix = f"{dest_table.dataset_id}/" \
+ f"{dest_table.table_id}"
+ extract_config = bigquery.ExtractJobConfig()
+ extract_config.destination_format = bigquery.DestinationFormat.AVRO
+ public_table: bigquery.Table = bq.get_table(
+ bigquery.TableReference.from_string(
+ "bigquery-public-data.new_york_311.311_service_requests"))
+
+ def _extract(batch: str):
+ extract_job: bigquery.ExtractJob = bq.extract_table(
+ public_table, f"gs://{input_bucket_id}/{table_prefix}/{batch}/"
+ f"data-*.avro",
+ job_config=extract_config)
+ return extract_job.result()
+
+ batches = [
+ "historical/00", "historical/01", "historical/02", "incremental/03"
+ ]
+ history_batch_nums = ["00", "01", "02"]
+ with concurrent.futures.ThreadPoolExecutor() as pool:
+        # export some data from the public BQ table into historical partitions
+ extract_results = pool.map(_extract, batches)
+
+ for res in extract_results:
+ assert res.errors is None, f"extract job {res.job_id} failed"
+
+ bkt: storage.Bucket = gcs.lookup_bucket(input_bucket_id)
+ # configure load jobs for this table
+ load_config_blob = bkt.blob(f"{table_prefix}/_config/load.json")
+ load_config_blob.upload_from_string(
+ json.dumps({
+ "writeDisposition": "WRITE_APPEND",
+ "sourceFormat": "AVRO",
+ "useAvroLogicalTypes": "True",
+ }))
+ orderme_blob = bkt.blob(f"{table_prefix}/_config/ORDERME")
+ orderme_blob.upload_from_string("")
+ # add historical success files
+ for batch in history_batch_nums:
+ historical_success_blob: storage.Blob = bkt.blob(
+ f"{table_prefix}/historical/{batch}/_SUCCESS")
+ historical_success_blob.upload_from_string("")
+
+ # assert 0 bq rows (because _HISTORYDONE not dropped yet)
+ dest_table = bq.get_table(dest_table)
+ assert dest_table.num_rows == 0, \
+ "history was ingested before _HISTORYDONE was uploaded"
+
+ # add _HISTORYDONE
+ history_done_blob: storage.Blob = bkt.blob(f"{table_prefix}/_HISTORYDONE")
+ history_done_blob.upload_from_string("")
+
+ # wait for bq rows to reach expected num rows
+ bq_wait_for_rows(bq, dest_table,
+ public_table.num_rows * len(history_batch_nums))
+
+ # add the incremental success file
+ incremental_success_blob: storage.Blob = bkt.blob(
+ f"{table_prefix}/{batches[-1]}/_SUCCESS")
+ incremental_success_blob.upload_from_string("")
+
+ # wait on new expected bq rows
+ bq_wait_for_rows(bq, dest_table, public_table.num_rows * len(batches))
+
+
+def bq_wait_for_rows(bq_client: bigquery.Client, table: bigquery.Table,
+ expected_num_rows: int):
+ """
+ polls tables.get API for number of rows until reaches expected value or
+ times out.
+
+ This is mostly an optimization to speed up the test suite without making it
+ flaky.
+ """
+
+ start_poll = time.monotonic()
+ actual_num_rows = 0
+ while time.monotonic() - start_poll < WAIT_FOR_ROWS_TIMEOUT:
+ bq_table: bigquery.Table = bq_client.get_table(table)
+ actual_num_rows = bq_table.num_rows
+ if actual_num_rows == expected_num_rows:
+ return
+ if actual_num_rows > expected_num_rows:
+ raise AssertionError(
+                f"{table.project}.{table.dataset_id}.{table.table_id} has "
+                f"{actual_num_rows} rows. expected {expected_num_rows} rows.")
+ raise AssertionError(
+ f"Timed out after {WAIT_FOR_ROWS_TIMEOUT} seconds waiting for "
+ f"{table.project}.{table.dataset_id}.{table.table_id} to "
+        f"reach {expected_num_rows} rows. "
+        f"Last poll returned {actual_num_rows} rows.")
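
The `bq_wait_for_rows` helper above follows a common monotonic-clock polling pattern. A minimal generic sketch of that pattern (illustrative only; the `wait_until` helper is hypothetical and not part of this change):

```python
import time


def wait_until(predicate, timeout_seconds, poll_interval=0.01):
    """Poll predicate until it returns True or the timeout elapses."""
    start = time.monotonic()
    while time.monotonic() - start < timeout_seconds:
        if predicate():
            return True
        time.sleep(poll_interval)
    return False


# a predicate that becomes true on its third call
counter = {"n": 0}


def bump():
    counter["n"] += 1
    return counter["n"] >= 3


print(wait_until(bump, timeout_seconds=1))  # True
```

Using `time.monotonic()` rather than `time.time()` keeps the timeout immune to wall-clock adjustments, which is the same choice `bq_wait_for_rows` makes.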
diff --git a/tools/cloud_functions/gcs_event_based_ingest/e2e/main.tf b/tools/cloud_functions/gcs_event_based_ingest/e2e/main.tf
new file mode 100644
index 000000000..64e3973d3
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/e2e/main.tf
@@ -0,0 +1,50 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+variable "short_sha" {}
+variable "project_id" { default = "bqutil" }
+variable "region" { default = "us-central1" }
+output "bucket" {
+ value = module.gcs_ocn_bq_ingest.input-bucket
+}
+
+resource "google_storage_bucket" "cloud_functions_source" {
+ name = "gcf-source-archives${var.short_sha}"
+ project = var.project_id
+ storage_class = "REGIONAL"
+ location = var.region
+ force_destroy = "true"
+}
+
+module "gcs_ocn_bq_ingest" {
+ source = "../terraform_module/gcs_ocn_bq_ingest_function"
+ function_source_folder = "../gcs_ocn_bq_ingest"
+ app_id = "gcs-ocn-bq-ingest-e2e-test${var.short_sha}"
+ cloudfunctions_source_bucket = google_storage_bucket.cloud_functions_source.name
+ data_ingester_sa = "data-ingester-sa${var.short_sha}"
+ input_bucket = "gcs-ocn-bq-ingest-e2e-tests${var.short_sha}"
+ project_id = var.project_id
+ environment_variables = {
+ START_BACKFILL_FILENAME = "_HISTORYDONE"
+ }
+  # Use a shorter timeout in e2e to stress subscriber re-triggering
+ timeout = 60
+ force_destroy = "true"
+}
+
+terraform {
+ backend "local" {
+ path = "terraform.tfstate"
+ }
+}
+
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/README.md b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/README.md
index c86dceea4..20c023825 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/README.md
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/README.md
@@ -9,12 +9,13 @@ BigQuery Table.
1. [Pub/Sub Notification](https://cloud.google.com/storage/docs/pubsub-notifications)
object finalize.
1. Cloud Function subscribes to notifications and ingests all the data into
-BigQuery a directory once a `_SUCCESS` file arrives.
+BigQuery from a GCS prefix once a `_SUCCESS` file arrives. The success file name
+is configurable with environment variable.
## Deployment
The source for this Cloud Function can easily be reused to repeat this pattern
-for many tables by using the accompanying terraform module (TODO).
+for many tables by using the accompanying terraform module.
This way we can reuse the tested source code for the Cloud Function.
@@ -28,14 +29,43 @@ following default behavior.
|-----------------------|---------------------------------------|----------------------------------------------|
| `WAIT_FOR_JOB_SECONDS`| How long to wait before deciding BQ job did not fail quickly| `5` |
| `SUCCESS_FILENAME` | Filename to trigger a load of a prefix| `_SUCCESS` |
-| `DESTINATION_REGEX` | A [Python Regex with named capturing groups](https://docs.python.org/3/howto/regex.html#non-capturing-and-named-groups) for `dataset`, `table`, (optional: `partition` or `yyyy`, `mm`, `dd`, `hh`, `batch`)
+| `DESTINATION_REGEX`   | A [Python Regex with named capturing groups](https://docs.python.org/3/howto/regex.html#non-capturing-and-named-groups) for `dataset`, `table` (optional: `partition` or `yyyy`, `mm`, `dd`, `hh`, `batch`) | (see below)|
+| `MAX_BATCH_BYTES`     | Max bytes for BigQuery Load job       | `15000000000000` ([15 TB](https://cloud.google.com/bigquery/quotas#load_jobs)) |
| `JOB_PREFIX` | Prefix for BigQuery Job IDs | `gcf-ingest-` |
+| `BQ_PROJECT` | Default BQ project to use to submit load / query jobs | Project where Cloud Function is deployed |
+| `BQ_STORAGE_PROJECT` | Default BQ project to use for target table references if not specified in dataset capturing group | Project where Cloud Function is deployed |
+| `FUNCTION_TIMEOUT_SEC`| Number of seconds set as the timeout for this deployment of the Cloud Function (no longer exposed by the python38 runtime) | 60 |
+| `FAIL_ON_ZERO_DML_ROWS_AFFECTED` | Treat External Queries that result in `numDmlAffectedRows = 0` as failures | True |
+| `ORDER_PER_TABLE`\* | Force jobs to be executed sequentially (rather than in parallel) based on the backlog. This is the same as having an `ORDERME` file in every config directory | `False` |
+| `START_BACKFILL_FILENAME`\*| Block submitting BigQuery Jobs for a table until this file is present at the table prefix. By default this will not happen. | `None` |
+| `RESTART_BUFFER_SECONDS`\* | Buffer before Cloud Function timeout to leave before re-triggering the backfill subscriber | 30 |
+| `USE_ERROR_REPORTING_API` | Should errors be reported using the Error Reporting API to avoid a cold restart (optimization) | True |
+
+\* Only affects behavior when ordering is enabled for a table.
+See [ORDERING.md](../ORDERING.md)
+
+## Default Destination Regex
+```python3
+DEFAULT_DESTINATION_REGEX = (
+    r"^(?P<dataset>[\w\-\._0-9]+)/"  # dataset (required)
+    r"(?P<table>[\w\-_0-9]+)/?"      # table name (required)
+    # break up historical v.s. incremental to separate prefixes (optional)
+    r"(?:historical|incremental)?/?"
+    r"(?P<partition>\$[0-9]+)?/?"    # partition decorator (optional)
+    r"(?:"                           # [begin] yyyy/mm/dd/hh/ group (optional)
+    r"(?P<yyyy>[0-9]{4})/?"          # partition year (yyyy) (optional)
+    r"(?P<mm>[0-9]{2})?/?"           # partition month (mm) (optional)
+    r"(?P<dd>[0-9]{2})?/?"           # partition day (dd) (optional)
+    r"(?P<hh>[0-9]{2})?/?"           # partition hour (hh) (optional)
+    r")?"                            # [end] yyyy/mm/dd/hh/ group (optional)
+    r"(?P<batch>[\w\-_0-9]+)?/"      # batch id (optional)
+)
+```
+
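
As a sanity check, the default regex can be exercised against a sample object path (the path below is hypothetical, for illustration only):

```python
import re

# copy of DEFAULT_DESTINATION_REGEX from gcs_ocn_bq_ingest/common/constants.py
DEFAULT_DESTINATION_REGEX = (
    r"^(?P<dataset>[\w\-\._0-9]+)/"
    r"(?P<table>[\w\-_0-9]+)/?"
    r"(?:historical|incremental)?/?"
    r"(?P<partition>\$[0-9]+)?/?"
    r"(?:"
    r"(?P<yyyy>[0-9]{4})/?"
    r"(?P<mm>[0-9]{2})?/?"
    r"(?P<dd>[0-9]{2})?/?"
    r"(?P<hh>[0-9]{2})?/?"
    r")?"
    r"(?P<batch>[\w\-_0-9]+)?/"
)

match = re.match(DEFAULT_DESTINATION_REGEX,
                 "my_dataset/my_table/historical/00/_SUCCESS")
print(match.group("dataset"), match.group("table"), match.group("batch"))
# my_dataset my_table 00
```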
## Implementation notes
1. To support notifications based on a GCS prefix
(rather than every object in the bucket), we chose to manually
configure Pub/Sub Notifications and use a Pub/Sub triggered
Cloud Function.
+
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/__init__.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/__init__.py
index 7a3efb203..42ed0a407 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/__init__.py
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC.
+# Copyright 2021 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.
diff --git a/tools/cloud_functions/gcs_event_based_ingest/tests/gcs_ocn_bq_ingest/__init__.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/__init__.py
similarity index 96%
rename from tools/cloud_functions/gcs_event_based_ingest/tests/gcs_ocn_bq_ingest/__init__.py
rename to tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/__init__.py
index 7a3efb203..42ed0a407 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/tests/gcs_ocn_bq_ingest/__init__.py
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC.
+# Copyright 2021 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/constants.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/constants.py
new file mode 100644
index 000000000..00cabfda1
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/constants.py
@@ -0,0 +1,143 @@
+# Copyright 2021 Google LLC.
+# This software is provided as-is, without warranty or representation
+# for any use or purpose.
+# Your use of it is subject to your agreement with Google.
+
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Configurations for Cloud Function for loading data from GCS to BigQuery.
+"""
+import distutils.util
+import os
+import re
+
+import google.api_core.client_info
+import google.cloud.exceptions
+
+# Will wait up to this long polling for errors in a bq job before exiting
+# This is to check whether the job fails quickly, not to assert it succeeds.
+# This may not be honored if longer than cloud function timeout.
+# https://cloud.google.com/functions/docs/concepts/exec#timeout
+# One might consider lowering this to 1-2 seconds to lower the
+# upper bound of expected execution time to stay within the free tier.
+# https://cloud.google.com/functions/pricing#free_tier
+WAIT_FOR_JOB_SECONDS = int(os.getenv("WAIT_FOR_JOB_SECONDS", "5"))
+
+DEFAULT_EXTERNAL_TABLE_DEFINITION = {
+ # The default must be a self describing data format
+ # because autodetecting CSV /JSON schemas is likely to not match
+ # expectations / assumptions of the transformation query.
+ "sourceFormat": "PARQUET",
+}
+
+# Use caution when lowering the job polling rate.
+# Keep in mind that many concurrent executions of this cloud function should
+# not violate the 300 concurrent requests or 100 requests per second quotas.
+# https://cloud.google.com/bigquery/quotas#all_api_requests
+JOB_POLL_INTERVAL_SECONDS = 1
+
+DEFAULT_JOB_LABELS = {
+ "component": "event-based-gcs-ingest",
+ "cloud-function-name": os.getenv("K_SERVICE"),
+}
+
+DEFAULT_LOAD_JOB_CONFIG = {
+ "sourceFormat": "CSV",
+ "fieldDelimiter": ",",
+ "writeDisposition": "WRITE_APPEND",
+ "labels": DEFAULT_JOB_LABELS,
+}
+
+BASE_LOAD_JOB_CONFIG = {
+ "writeDisposition": "WRITE_APPEND",
+ "labels": DEFAULT_JOB_LABELS,
+}
+
+# https://cloud.google.com/bigquery/quotas#load_jobs
+# 15TB per BQ load job (soft limit).
+DEFAULT_MAX_BATCH_BYTES = str(15 * 10**12)
+
+# 10,000 GCS URIs per BQ load job.
+MAX_SOURCE_URIS_PER_LOAD = 10**4
+
+SUCCESS_FILENAME = os.getenv("SUCCESS_FILENAME", "_SUCCESS")
+
+DEFAULT_JOB_PREFIX = "gcf-ingest-"
+
+# yapf: disable
+DEFAULT_DESTINATION_REGEX = (
+    r"^(?P<dataset>[\w\-\._0-9]+)/"  # dataset (required)
+    r"(?P<table>[\w\-_0-9]+)/?"      # table name (required)
+    # break up historical v.s. incremental to separate prefixes (optional)
+    r"(?:historical|incremental)?/?"
+    r"(?P<partition>\$[0-9]+)?/?"    # partition decorator (optional)
+    r"(?:"                           # [begin] yyyy/mm/dd/hh/ group (optional)
+    r"(?P<yyyy>[0-9]{4})/?"          # partition year (yyyy) (optional)
+    r"(?P<mm>[0-9]{2})?/?"           # partition month (mm) (optional)
+    r"(?P<dd>[0-9]{2})?/?"           # partition day (dd) (optional)
+    r"(?P<hh>[0-9]{2})?/?"           # partition hour (hh) (optional)
+    r")?"                            # [end] yyyy/mm/dd/hh/ group (optional)
+    r"(?P<batch>[\w\-_0-9]+)?/"      # batch id (optional)
+)
+# yapf: enable
+
+DESTINATION_REGEX = re.compile(
+ os.getenv("DESTINATION_REGEX", DEFAULT_DESTINATION_REGEX))
+
+CLIENT_INFO = google.api_core.client_info.ClientInfo(
+    user_agent="google-pso-tool/bq-serverless-loader")
+
+# Filename used to (re)start the backfill subscriber loop.
+BACKFILL_FILENAME = "_BACKFILL"
+
+# When this file is uploaded the subscriber will start applying items in order
+# off the backlog. This is meant to help scenarios where historical loads to GCS
+# are parallelized but must be applied in order. One can drop a _HISTORYDONE
+# file to indicate the entire history has been uploaded and it is safe to start
+# applying items in the backlog in order. By default this is unset and the
+# backlog subscriber will not wait for any file before starting to apply the
+# first items in the backlog.
+START_BACKFILL_FILENAME = os.getenv("START_BACKFILL_FILENAME")
+
+# Filenames that cause cloud function to take action.
+ACTION_FILENAMES = {
+ SUCCESS_FILENAME,
+ BACKFILL_FILENAME,
+ START_BACKFILL_FILENAME,
+}
+
+RESTART_BUFFER_SECONDS = int(os.getenv("RESTART_BUFFER_SECONDS", "30"))
+
+ORDER_PER_TABLE = bool(
+ distutils.util.strtobool(os.getenv("ORDER_PER_TABLE", "False")))
+
+BQ_TRANSFORM_SQL = "*.sql"
+
+ENSURE_SUBSCRIBER_SECONDS = 5
+
+FAIL_ON_ZERO_DML_ROWS_AFFECTED = bool(
+ distutils.util.strtobool(os.getenv("FAIL_ON_ZERO_DML_ROWS_AFFECTED",
+ "True")))
+
+BQ_DML_STATEMENT_TYPES = {
+ "INSERT",
+ "UPDATE",
+ "DELETE",
+ "MERGE",
+}
+
+# https://cloud.google.com/bigquery/docs/running-jobs#generate-jobid
+NON_BQ_JOB_ID_REGEX = re.compile(r'[^0-9a-zA-Z_\-]+')
+
+ENABLE_SNAPSHOTTING = bool(
+ distutils.util.strtobool(os.getenv("ENABLE_SNAPSHOTTING", "FALSE")))
+SNAPSHOT_DATASET = str(os.getenv("SNAPSHOT_DATASET", "snapshots"))
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/exceptions.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/exceptions.py
new file mode 100644
index 000000000..a9eb9bab5
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/exceptions.py
@@ -0,0 +1,61 @@
+# Copyright 2021 Google LLC.
+# This software is provided as-is, without warranty or representation
+# for any use or purpose.
+# Your use of it is subject to your agreement with Google.
+
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Custom Exceptions of GCS event based ingest to BigQuery"""
+
+
+class OneLineException(Exception):
+ """base class for exceptions whose messages will be displayed on a single
+ line for better readability in Cloud Function Logs"""
+
+ def __init__(self, msg):
+ super().__init__(msg.replace('\n', ' ').replace('\r', ''))
+
+
+class DuplicateNotificationException(OneLineException):
+ """Exception to indicate that the function was triggered twice for the same
+ event."""
+
+
+class BigQueryJobFailure(OneLineException):
+ """Exception to indicate that there was an issue with a BigQuery job. This
+    might include client errors (e.g. bad request, which can happen if a
+    _SUCCESS file is dropped but there are no data files at the GCS prefix) or
+    server side errors like a job that fails to execute successfully."""
+
+
+class DestinationRegexMatchException(OneLineException):
+ """Exception to indicate that a success file did not match the destination
+ regex specified in the DESTINATION_REGEX environment variable (or the
+ default)"""
+
+
+class UnexpectedTriggerException(OneLineException):
+ """Exception to indicate the cloud function was triggered with an unexpected
+ payload."""
+
+
+class BacklogException(OneLineException):
+ """Exception to indicate an issue with the backlog mechanics of this
+ function."""
+
+
+EXCEPTIONS_TO_REPORT = (
+ BigQueryJobFailure,
+ UnexpectedTriggerException,
+ DestinationRegexMatchException,
+ BacklogException,
+)
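
The newline-flattening behavior of `OneLineException` can be demonstrated standalone (the class body below mirrors the definition in this file; the error message is a made-up example):

```python
class OneLineException(Exception):
    """base class for exceptions whose messages will be displayed on a single
    line for better readability in Cloud Function Logs"""

    def __init__(self, msg):
        # collapse newlines and carriage returns so the message stays on one
        # log line
        super().__init__(msg.replace('\n', ' ').replace('\r', ''))


err = OneLineException("job failed:\nreason: invalid\r\nlocation: query")
print(err)  # job failed: reason: invalid location: query
```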
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py
new file mode 100644
index 000000000..28477e201
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/ordering.py
@@ -0,0 +1,369 @@
+# Copyright 2021 Google LLC.
+# This software is provided as-is, without warranty or representation
+# for any use or purpose.
+# Your use of it is subject to your agreement with Google.
+
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Implement function to ensure loading data from GCS to BigQuery in order.
+"""
+import datetime
+import os
+import time
+import traceback
+from typing import Optional, Tuple
+
+import google.api_core
+import google.api_core.exceptions
+import pytz
+# pylint in cloud build is being flaky about this import discovery.
+# pylint: disable=no-name-in-module
+from google.cloud import bigquery
+from google.cloud import storage
+
+from . import constants # pylint: disable=no-name-in-module,import-error
+from . import exceptions # pylint: disable=no-name-in-module,import-error
+from . import utils # pylint: disable=no-name-in-module,import-error
+
+
+def backlog_publisher(
+ gcs_client: storage.Client,
+ event_blob: storage.Blob,
+) -> Optional[storage.Blob]:
+    """add success files to the backlog and trigger backfill if necessary"""
+ bkt = event_blob.bucket
+
+ # Create an entry in _backlog for this table for this batch / success file
+ backlog_blob = success_blob_to_backlog_blob(event_blob)
+ backlog_blob.upload_from_string("", client=gcs_client)
+ print(f"added gs://{backlog_blob.bucket.name}/{backlog_blob.name} "
+ "to the backlog.")
+
+ table_prefix = utils.get_table_prefix(event_blob.name)
+ return start_backfill_subscriber_if_not_running(gcs_client, bkt,
+ table_prefix)
+
+
+def backlog_subscriber(gcs_client: Optional[storage.Client],
+ bq_client: Optional[bigquery.Client],
+ backfill_blob: storage.Blob, function_start_time: float):
+ """Pick up the table lock, poll BQ job id until completion and process next
+ item in the backlog.
+ """
+ print(f"started backfill subscriber for gs://{backfill_blob.bucket.name}/"
+ f"{backfill_blob.name}")
+ gcs_client, bq_client = _get_clients_if_none(gcs_client, bq_client)
+ # We need to retrigger the backfill loop before the Cloud Functions Timeout.
+ restart_time = function_start_time + (
+ float(os.getenv("FUNCTION_TIMEOUT_SEC", "60")) -
+ constants.RESTART_BUFFER_SECONDS)
+ print(f"restart time is {restart_time}")
+ bkt = backfill_blob.bucket
+ utils.handle_duplicate_notification(gcs_client, backfill_blob)
+ table_prefix = utils.get_table_prefix(backfill_blob.name)
+ last_job_done = False
+ # we will poll for job completion this long in an individual iteration of
+ # the while loop (before checking if we are too close to cloud function
+ # timeout and should retrigger).
+ polling_timeout = 5 # seconds
+ lock_blob: storage.Blob = bkt.blob(f"{table_prefix}/_bqlock")
+ if restart_time - polling_timeout < time.monotonic():
+ raise EnvironmentError(
+ "The Cloud Function timeout is too short for "
+            "backlog subscriber to do its job. We recommend "
+ "setting the timeout to 540 seconds or at least "
+ "1 minute (Cloud Functions default).")
+ while time.monotonic() < restart_time - polling_timeout - 1:
+ first_bq_lock_claim = False
+ lock_contents = utils.read_gcs_file_if_exists(
+ gcs_client, f"gs://{bkt.name}/{lock_blob.name}")
+ if lock_contents:
+ # is this a lock placed by this cloud function.
+ # the else will handle a manual _bqlock
+ if lock_contents.startswith(
+ os.getenv('JOB_PREFIX', constants.DEFAULT_JOB_PREFIX)):
+ last_job_done = wait_on_last_job(bq_client, lock_blob,
+ backfill_blob, lock_contents,
+ polling_timeout)
+                # if last_job_done is True, a job just completed and
+                # we need to check if SNAPSHOTTING is enabled
+ if last_job_done and constants.ENABLE_SNAPSHOTTING:
+ print("Snapshotting is enabled. Taking a snapshot")
+ utils.take_table_snapshot(bq_client, job_id=lock_contents,
+ lock_blob_name=lock_blob.name)
+ else:
+            print(f"sleeping for {polling_timeout} seconds because we "
+                  f"found a manual lock gs://{bkt.name}/{lock_blob.name}. "
+                  "This will be an infinite loop until the manual lock is "
+                  "released. "
+                  f"manual lock contents: {lock_contents}.")
+ time.sleep(polling_timeout)
+ continue
+ else: # this condition handles absence of _bqlock file
+ first_bq_lock_claim = True
+ last_job_done = True # there's no running job to poll.
+
+ if not last_job_done:
+            # keep polling the running job.
+ continue
+
+ # if reached here, last job is done.
+ if not first_bq_lock_claim:
+ # If the BQ lock was missing we do not want to delete a backlog
+ # item for a job we have not yet submitted.
+ utils.remove_oldest_backlog_item(gcs_client, bkt, table_prefix)
+ should_subscriber_exit = handle_backlog(gcs_client, bq_client, bkt,
+ lock_blob, backfill_blob)
+ if should_subscriber_exit:
+ return
+ # retrigger the subscriber loop by reposting the _BACKFILL file
+    print("ran out of time, restarting backfill subscriber loop for: "
+ f"gs://{bkt.name}/{table_prefix}")
+ backfill_blob = bkt.blob(f"{table_prefix}/{constants.BACKFILL_FILENAME}")
+ backfill_blob.upload_from_string("")
+
+
+def wait_on_last_job(bq_client: bigquery.Client, lock_blob: storage.Blob,
+ backfill_blob: storage.blob, job_id: str,
+ polling_timeout: int):
+ """wait on a bigquery job or raise informative exception.
+
+ Args:
+ bq_client: bigquery.Client
+ lock_blob: storage.Blob _bqlock blob
+ backfill_blob: storage.blob _BACKFILL blob
+ job_id: str BigQuery job ID to wait on (read from _bqlock file)
+ polling_timeout: int seconds to poll before returning.
+ """
+ try:
+ return utils.wait_on_bq_job_id(bq_client, job_id, polling_timeout)
+ except (exceptions.BigQueryJobFailure,
+ google.api_core.exceptions.NotFound) as err:
+ table_prefix = utils.get_table_prefix(backfill_blob.name)
+ raise exceptions.BigQueryJobFailure(
+ f"previous BigQuery job: {job_id} failed or could not "
+ "be found. This will kill the backfill subscriber for "
+ f"the table prefix: {table_prefix}."
+ "Once the issue is dealt with by a human, the lock "
+ "file at: "
+ f"gs://{lock_blob.bucket.name}/{lock_blob.name} "
+ "should be manually removed and a new empty "
+ f"{constants.BACKFILL_FILENAME} "
+ "file uploaded to: "
+ f"gs://{backfill_blob.bucket.name}/{table_prefix}"
+ "/_BACKFILL "
+ f"to resume the backfill subscriber so it can "
+ "continue with the next item in the backlog."
+ "Original Exception: "
+ f"{traceback.format_exc()}") from err
+
+
+def handle_backlog(
+ gcs_client: storage.Client,
+ bq_client: bigquery.Client,
+ bkt: storage.Bucket,
+ lock_blob: storage.Blob,
+ backfill_blob: storage.Blob,
+):
+ """submit the next item in the _backlog if it is non-empty or clean up the
+ _BACKFILL and _bqlock files.
+ Args:
+ gcs_client: storage.Client
+ bq_client: bigquery.Client
+ bkt: storage.Bucket
+ lock_blob: storage.Blob _bqlock blob
+ backfill_blob: storage.Blob _BACKFILL blob
+ Returns:
+ bool: should this backlog subscriber exit
+ """
+ table_prefix = utils.get_table_prefix(backfill_blob.name)
+ check_backlog_time = time.monotonic()
+ next_backlog_file = utils.get_next_backlog_item(gcs_client, bkt,
+ table_prefix)
+ if next_backlog_file:
+ next_success_file: storage.Blob = bkt.blob(
+ next_backlog_file.name.replace("/_backlog/", "/"))
+ if not next_success_file.exists(client=gcs_client):
+ raise exceptions.BacklogException(
+ "backlog contains "
+ f"gs://{next_backlog_file.bucket}/{next_backlog_file.name} "
+ "but the corresponding success file does not exist at: "
+ f"gs://{next_success_file.bucket}/{next_success_file.name}")
+ print("applying next batch for:"
+ f"gs://{next_success_file.bucket}/{next_success_file.name}")
+ next_job_id = utils.create_job_id(next_success_file.name)
+ utils.apply(gcs_client, bq_client, next_success_file, lock_blob,
+ next_job_id)
+ return False # BQ job running
+ print("no more files found in the backlog deleteing backfill blob")
+ backfill_blob.delete(if_generation_match=backfill_blob.generation,
+ client=gcs_client)
+ if (check_backlog_time + constants.ENSURE_SUBSCRIBER_SECONDS <
+ time.monotonic()):
+ print("checking if the backlog is still empty for "
+ f"gs://${bkt.name}/{table_prefix}/_backlog/"
+ f"There was more than {constants.ENSURE_SUBSCRIBER_SECONDS}"
+ " seconds between listing items on the backlog and "
+ f"deleting the {constants.BACKFILL_FILENAME}. "
+ "This should not happen often but is meant to alleviate a "
+ "race condition in the event that something caused the "
+ "delete operation was delayed or had to be retried for a "
+ "long time.")
+ next_backlog_file = utils.get_next_backlog_item(gcs_client, bkt,
+ table_prefix)
+ if next_backlog_file:
+ # The backfill file was deleted but the backlog is
+ # not empty. Re-trigger the backfill subscriber loop by
+ # dropping a new backfill file.
+ start_backfill_subscriber_if_not_running(gcs_client, bkt,
+ table_prefix)
+ return True # we are re-triggering a new backlog subscriber
+ utils.handle_bq_lock(gcs_client, lock_blob, None)
+ print(f"backlog is empty for gs://{bkt.name}/{table_prefix}. "
+ "backlog subscriber exiting.")
+ return True # the backlog is empty
+
+
+def start_backfill_subscriber_if_not_running(
+ gcs_client: Optional[storage.Client], bkt: storage.Bucket,
+ table_prefix: str) -> Optional[storage.Blob]:
+ """start the backfill subscriber if it is not already runnning for this
+ table prefix.
+
+ created a backfill file for the table prefix if not exists.
+ """
+ if not gcs_client:
+ gcs_client = storage.Client(client_info=constants.CLIENT_INFO)
+ start_backfill = True
+ # Do not start subscriber until START_BACKFILL_FILENAME has been dropped
+ # at the table prefix.
+ if constants.START_BACKFILL_FILENAME:
+ start_backfill_blob = bkt.blob(
+ f"{table_prefix}/{constants.START_BACKFILL_FILENAME}")
+ start_backfill = start_backfill_blob.exists(client=gcs_client)
+ if not start_backfill:
+ print("note triggering backfill because"
+ f"gs://{start_backfill_blob.bucket.name}/"
+ f"{start_backfill_blob.name} was not found.")
+
+ if start_backfill:
+ # Create a _BACKFILL file for this table if not exists
+ backfill_blob = bkt.blob(
+ f"{table_prefix}/{constants.BACKFILL_FILENAME}")
+ try:
+ backfill_blob.upload_from_string("",
+ if_generation_match=0,
+ client=gcs_client)
+ print("triggered backfill with "
+ f"gs://{backfill_blob.bucket.name}/{backfill_blob.name} "
+ f"created at {backfill_blob.time_created}.")
+ return backfill_blob
+ except google.api_core.exceptions.PreconditionFailed:
+ backfill_blob.reload(client=gcs_client)
+ print("backfill already in progress due to: "
+ f"gs://{backfill_blob.bucket.name}/{backfill_blob.name} "
+ f"created at {backfill_blob.time_created}. exiting.")
+ return backfill_blob
+ else:
+ return None
+
+
+def success_blob_to_backlog_blob(success_blob: storage.Blob) -> storage.Blob:
+ """create a blob object that is a pointer to the input success blob in the
+ backlog
+ """
+ bkt = success_blob.bucket
+ table_prefix = utils.get_table_prefix(success_blob.name)
+ success_file_suffix = utils.removeprefix(success_blob.name,
+ f"{table_prefix}/")
+ return bkt.blob(f"{table_prefix}/_backlog/{success_file_suffix}")
+
+
+def subscriber_monitor(gcs_client: Optional[storage.Client],
+ bkt: storage.Bucket, object_id: str) -> bool:
+ """
+ Monitor to handle a rare race condition where:
+
+ 1. subscriber reads an empty backlog (before it can delete the
+ _BACKFILL blob...)
+ 2. a new item is added to the backlog (causing a separate
+ function invocation)
+ 3. In this new invocation we reach this point in the code path
+ and start_backfill_subscriber_if_not_running sees the old _BACKFILL
+ and does not create a new one.
+ 4. The subscriber deletes the _BACKFILL blob and exits without
+ processing the new item on the backlog from #2.
+
+ We handle this by having each success file added to the backlog start
+ this monitor, which waits constants.ENSURE_SUBSCRIBER_SECONDS before
+ checking that the backfill file still exists. On the subscriber side we
+ check whether more than this much time passed between listing backlog
+ items and deleting the backfill file. This way the race condition is
+ always handled either by this monitor or by the subscriber itself.
+ """
+ if not gcs_client:
+ gcs_client = storage.Client(client_info=constants.CLIENT_INFO)
+ backfill_blob = start_backfill_subscriber_if_not_running(
+ gcs_client, bkt, utils.get_table_prefix(object_id))
+
+ # backfill blob may be none if the START_BACKFILL_FILENAME has not been
+ # dropped
+ if backfill_blob:
+ # Handle case where a subscriber loop was not able to repost the
+ # backfill file before the cloud function timeout.
+ time_created_utc = backfill_blob.time_created.replace(tzinfo=pytz.UTC)
+ now_utc = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
+ if (now_utc - time_created_utc > datetime.timedelta(
+ seconds=int(os.getenv("FUNCTION_TIMEOUT_SEC", "60")))):
+ print(
+ f"backfill blob gs://{backfill_blob.bucket.name}/"
+ f"{backfill_blob.name} appears to be abandoned as it is older "
+ "than the cloud function timeout of "
+ f"{os.getenv('FUNCTION_TIMEOUT_SEC', '60')} seconds."
+ "reposting this backfill blob to restart the backfill"
+ "subscriber for this table.")
+ backfill_blob.delete(client=gcs_client)
+ start_backfill_subscriber_if_not_running(
+ gcs_client, bkt, utils.get_table_prefix(object_id))
+ return True
+
+ time.sleep(constants.ENSURE_SUBSCRIBER_SECONDS)
+ if not utils.wait_on_gcs_blob(gcs_client, backfill_blob,
+ constants.ENSURE_SUBSCRIBER_SECONDS):
+ start_backfill_subscriber_if_not_running(
+ gcs_client, bkt, utils.get_table_prefix(object_id))
+ return True
+ return False
+
+
+def _get_clients_if_none(
+ gcs_client: Optional[storage.Client], bq_client: Optional[bigquery.Client]
+) -> Tuple[storage.Client, bigquery.Client]:
+ """method to handle case where clients are None.
+
+ This is a workaround to be able to run the backlog subscriber in a separate
+ process to facilitate some of our integration tests. Though it should be
+ harmless if these clients are recreated in the Cloud Function.
+ """
+ print("instantiating missing clients in backlog subscriber this should only"
+ " happen during integration tests.")
+ if not gcs_client:
+ gcs_client = storage.Client(client_info=constants.CLIENT_INFO)
+ if not bq_client:
+ default_query_config = bigquery.QueryJobConfig()
+ default_query_config.use_legacy_sql = False
+ default_query_config.labels = constants.DEFAULT_JOB_LABELS
+ bq_client = bigquery.Client(
+ client_info=constants.CLIENT_INFO,
+ default_query_job_config=default_query_config,
+ project=os.getenv("BQ_PROJECT", os.getenv("GCP_PROJECT")))
+ return gcs_client, bq_client
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py
new file mode 100644
index 000000000..32e40cec2
--- /dev/null
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py
@@ -0,0 +1,810 @@
+# Copyright 2021 Google LLC.
+# This software is provided as-is, without warranty or representation
+# for any use or purpose.
+# Your use of it is subject to your agreement with Google.
+
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains utility methods used by the BQIngest process
+"""
+import collections
+import collections.abc
+import copy
+import fnmatch
+import json
+import os
+import pathlib
+import pprint
+import sys
+import time
+import uuid
+from typing import Any, Deque, Dict, List, Optional, Tuple, Union
+
+import cachetools
+import google.api_core
+import google.api_core.client_info
+import google.api_core.exceptions
+import google.cloud.exceptions
+# pylint in cloud build is being flaky about this import discovery.
+from google.cloud import bigquery
+from google.cloud import storage
+
+from . import constants # pylint: disable=no-name-in-module,import-error
+from . import exceptions # pylint: disable=no-name-in-module,import-error
+
+
+def external_query( # pylint: disable=too-many-arguments
+ gcs_client: storage.Client, bq_client: bigquery.Client, gsurl: str,
+ query: str, dest_table_ref: bigquery.TableReference, job_id: str):
+ """Load from query over external table from GCS.
+
+ This hinges on a SQL query defined in GCS at _config/*.sql and
+ an external table definition _config/external.json (otherwise will assume
+ CSV external table)
+ """
+ external_table_config = read_gcs_file_if_exists(
+ gcs_client, f"{gsurl}_config/external.json")
+ if not external_table_config:
+ external_table_config = look_for_config_in_parents(
+ gcs_client, gsurl, "external.json")
+ if external_table_config:
+ external_table_def = json.loads(external_table_config)
+ else:
+ print(f" {gsurl}_config/external.json not found in parents of {gsurl}. "
+ "Falling back to default PARQUET external table: "
+ f"{json.dumps(constants.DEFAULT_EXTERNAL_TABLE_DEFINITION)}")
+ external_table_def = constants.DEFAULT_EXTERNAL_TABLE_DEFINITION
+
+ # This may cause an issue if >10,000 files.
+ external_table_def["sourceUris"] = flatten2dlist(
+ get_batches_for_prefix(gcs_client, gsurl))
+ print(f"external table def = {json.dumps(external_table_config, indent=2)}")
+ external_config = bigquery.ExternalConfig.from_api_repr(external_table_def)
+ job_config = bigquery.QueryJobConfig(
+ table_definitions={"temp_ext": external_config}, use_legacy_sql=False)
+
+ # drop partition decorator if present.
+ table_id = dest_table_ref.table_id.split("$")[0]
+
+ # similar syntax to str.format but doesn't require escaping braces
+ # elsewhere in query (e.g. in a regex)
+ rendered_query = query\
+ .replace(
+ "{dest_dataset}",
+ f"`{dest_table_ref.project}`.{dest_table_ref.dataset_id}")\
+ .replace("{dest_table}", table_id)
+
+ job: bigquery.QueryJob = bq_client.query(rendered_query,
+ job_config=job_config,
+ job_id=job_id)
+
+ print(f"started asynchronous query job: {job.job_id}")
+
+ start_poll_for_errors = time.monotonic()
+ # Check if job failed quickly
+ while time.monotonic(
+ ) - start_poll_for_errors < constants.WAIT_FOR_JOB_SECONDS:
+ job.reload(client=bq_client)
+ if job.state == "DONE":
+ check_for_bq_job_and_children_errors(bq_client, job)
+ return
+ time.sleep(constants.JOB_POLL_INTERVAL_SECONDS)
+
+
+def flatten2dlist(arr: List[List[Any]]) -> List[Any]:
+ """Flatten list of lists to flat list of elements"""
+ return [j for i in arr for j in i]
+
+
+def load_batches(gcs_client, bq_client, gsurl, dest_table_ref, job_id):
+ """orchestrate 1 or more load jobs based on number of URIs and total byte
+ size of objects at gsurl"""
+ batches = get_batches_for_prefix(gcs_client, gsurl)
+ load_config = construct_load_job_config(gcs_client, gsurl)
+ load_config.labels = constants.DEFAULT_JOB_LABELS
+
+ jobs: List[bigquery.LoadJob] = []
+ for batch in batches:
+ print(load_config.to_api_repr())
+ job: bigquery.LoadJob = bq_client.load_table_from_uri(
+ batch, dest_table_ref, job_config=load_config, job_id=job_id)
+
+ print(f"started asyncronous bigquery load job with id: {job.job_id} for"
+ f" {gsurl}")
+ jobs.append(job)
+
+ start_poll_for_errors = time.monotonic()
+ # Check if job failed quickly
+ while time.monotonic(
+ ) - start_poll_for_errors < constants.WAIT_FOR_JOB_SECONDS:
+ for job in jobs:
+ job.reload(client=bq_client)
+ check_for_bq_job_and_children_errors(bq_client, job)
+ time.sleep(constants.JOB_POLL_INTERVAL_SECONDS)
+
+
+def _get_parent_config_file(storage_client, config_filename, bucket, path):
+ bkt = storage_client.lookup_bucket(bucket)
+ config_dir_name = "_config"
+ parent_path = pathlib.Path(path).parent
+ config_path = parent_path / config_dir_name
+ config_file_path = config_path / config_filename
+ # Handle wild card (to support bq transform sql with different names).
+ if "*" in config_filename:
+ matches: List[storage.Blob] = list(
+ filter(lambda blob: fnmatch.fnmatch(blob.name, config_filename),
+ bkt.list_blobs(prefix=config_path)))
+ if matches:
+ if len(matches) > 1:
+ raise RuntimeError(
+ f"Multiple matches for gs://{bucket}/{config_file_path}")
+ return read_gcs_file_if_exists(storage_client,
+ f"gs://{bucket}/{matches[0].name}")
+ return None
+ return read_gcs_file_if_exists(storage_client,
+ f"gs://{bucket}/{config_file_path}")
+
+
+def look_for_config_in_parents(storage_client: storage.Client, gsurl: str,
+ config_filename: str) -> Optional[str]:
+ """look in parent directories for _config/config_filename"""
+ blob: storage.Blob = storage.Blob.from_string(gsurl)
+ bucket_name = blob.bucket.name
+ obj_path = blob.name
+ parts = removesuffix(obj_path, "/").split("/")
+
+ def _get_parent_config(path):
+ return _get_parent_config_file(storage_client, config_filename,
+ bucket_name, path)
+
+ config = None
+ while parts:
+ if config is not None:
+ return config
+ config = _get_parent_config("/".join(parts))
+ parts.pop()
+ return config
+
+
+def construct_load_job_config(storage_client: storage.Client,
+ gsurl: str) -> bigquery.LoadJobConfig:
+ """
+ merge dictionaries for load.json configs in parent directories.
+ The configs closest to gsurl should take precedence.
+ """
+ config_filename = "load.json"
+ blob: storage.Blob = storage.Blob.from_string(gsurl)
+ bucket_name = blob.bucket.name
+ obj_path = blob.name
+ parts = removesuffix(obj_path, "/").split("/")
+
+ def _get_parent_config(path):
+ return _get_parent_config_file(storage_client, config_filename,
+ bucket_name, path)
+
+ config_q: Deque[Dict[str, Any]] = collections.deque()
+ config_q.append(constants.BASE_LOAD_JOB_CONFIG)
+ while parts:
+ config = _get_parent_config("/".join(parts))
+ if config:
+ print(f"found config: {'/'.join(parts)}")
+ config_q.append(json.loads(config))
+ parts.pop()
+
+ merged_config: Dict = {}
+ while config_q:
+ recursive_update(merged_config, config_q.popleft(), in_place=True)
+ if merged_config == constants.BASE_LOAD_JOB_CONFIG:
+ print("falling back to default CSV load job config. "
+ "Did you forget load.json?")
+ return bigquery.LoadJobConfig.from_api_repr(
+ constants.DEFAULT_LOAD_JOB_CONFIG)
+ print(f"merged_config: {merged_config}")
+ return bigquery.LoadJobConfig.from_api_repr({"load": merged_config})
+
+
+def get_batches_for_prefix(
+ gcs_client: storage.Client,
+ prefix_path: str,
+ ignore_subprefix="_config/",
+ ignore_file=constants.SUCCESS_FILENAME) -> List[List[str]]:
+ """
+ This function creates batches of GCS uris for a given prefix.
+ This prefix could be a table prefix or a partition prefix inside a
+ table prefix.
+ Returns a list of batches
+ (each batch is a list of GCS uris).
+ """
+ batches = []
+ blob: storage.Blob = storage.Blob.from_string(prefix_path)
+ bucket_name = blob.bucket.name
+ prefix_name = blob.name
+
+ bucket = cached_get_bucket(gcs_client, bucket_name)
+ blobs = list(bucket.list_blobs(prefix=prefix_name, delimiter="/"))
+
+ cumulative_bytes = 0
+ max_batch_size = int(
+ os.getenv("MAX_BATCH_BYTES", constants.DEFAULT_MAX_BATCH_BYTES))
+ batch: List[str] = []
+ for blob in blobs:
+ # API returns root prefix also. Which should be ignored.
+ # Similarly, the _SUCCESS file should be ignored.
+ # Finally, anything in the _config/ prefix should be ignored.
+ if (blob.name
+ not in {f"{prefix_name}/", f"{prefix_name}/{ignore_file}"}
+ and not blob.name.startswith(f"{prefix_name}/{ignore_subprefix}")):
+ if blob.size == 0: # ignore empty files
+ print(f"ignoring empty file: gs://{bucket}/{blob.name}")
+ continue
+ cumulative_bytes += blob.size
+
+ # keep adding until we reach threshold
+ if (cumulative_bytes <= max_batch_size
+ and len(batch) < constants.MAX_SOURCE_URIS_PER_LOAD):
+ batch.append(f"gs://{bucket_name}/{blob.name}")
+ else:
+ batches.append(batch.copy())
+ batch.clear()
+ batch.append(f"gs://{bucket_name}/{blob.name}")
+ cumulative_bytes = blob.size
+
+ # pick up remaining files in the final batch
+ if len(batch) > 0:
+ batches.append(batch.copy())
+ batch.clear()
+
+ if len(batches) > 1:
+ print(f"split into {len(batches)} batches.")
+ elif len(batches) < 1:
+ raise google.api_core.exceptions.NotFound(
+ f"No files to load at {prefix_path}!")
+ return batches
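The batching rule in the loop above (accumulate URIs until a cumulative byte threshold is crossed, then start a new batch seeded with the current file) can be sketched independently of GCS. The file names, sizes, and the 100-byte limit below are illustrative stand-ins, not the real constants:

```python
from typing import List, Tuple


def batch_by_size(files: List[Tuple[str, int]],
                  max_batch_bytes: int) -> List[List[str]]:
    """Group (name, size) pairs into batches of at most max_batch_bytes."""
    batches: List[List[str]] = []
    batch: List[str] = []
    cumulative = 0
    for name, size in files:
        if size == 0:  # ignore empty files, as the loader does
            continue
        cumulative += size
        if cumulative <= max_batch_bytes:
            batch.append(name)
        else:
            if batch:  # close out the current batch
                batches.append(batch)
            batch = [name]  # start a new batch with the current file
            cumulative = size
    if batch:  # pick up remaining files in the final batch
        batches.append(batch)
    return batches


print(batch_by_size([("a", 40), ("b", 40), ("c", 40), ("d", 10)], 100))
# [['a', 'b'], ['c', 'd']]
```

Note that a single file larger than the threshold still forms its own batch, which mirrors the behavior of resetting `cumulative_bytes` to the current blob's size.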
+
+
+def parse_notification(notification: dict) -> Tuple[str, str]:
+ """valdiates notification payload
+ Args:
+ notification(dict): Pub/Sub Storage Notification
+ https://cloud.google.com/storage/docs/pubsub-notifications
+ Or Cloud Functions direct trigger
+ https://cloud.google.com/functions/docs/tutorials/storage
+ with notification schema
+ https://cloud.google.com/storage/docs/json_api/v1/objects#resource
+ Returns:
+ tuple of bucketId and objectId attributes
+ Raises:
+ KeyError if the input notification does not contain the expected
+ attributes.
+ """
+ if notification.get("kind") == "storage#object":
+ # notification is a GCS Object resource from Cloud Functions trigger
+ # https://cloud.google.com/storage/docs/json_api/v1/objects#resource
+ return notification["bucket"], notification["name"]
+ if notification.get("attributes"):
+ # notification is Pub/Sub message.
+ try:
+ attributes = notification["attributes"]
+ return attributes["bucketId"], attributes["objectId"]
+ except KeyError as err:
+ raise exceptions.UnexpectedTriggerException(
+ "Issue with Pub/Sub message, did not contain expected "
+ f"attributes: 'bucketId' and 'objectId': {notification}"
+ ) from err
+ raise exceptions.UnexpectedTriggerException(
+ "Cloud Function received unexpected trigger: "
+ f"{notification} "
+ "This function only supports direct Cloud Functions "
+ "Background Triggers or Pub/Sub storage notificaitons "
+ "as described in the following links: "
+ "https://cloud.google.com/storage/docs/pubsub-notifications "
+ "https://cloud.google.com/functions/docs/tutorials/storage")
+
+
+def read_gcs_file(gcs_client: storage.Client, gsurl: str) -> str:
+ """
+ Read a GCS object as a string
+
+ Args:
+ gcs_client: GCS client
+ gsurl: GCS URI for object to read in gs://bucket/path/to/object format
+ Returns:
+ str
+ """
+ blob = storage.Blob.from_string(gsurl)
+ return blob.download_as_bytes(client=gcs_client).decode('UTF-8')
+
+
+def read_gcs_file_if_exists(gcs_client: storage.Client,
+ gsurl: str) -> Optional[str]:
+ """return string of gcs object contents or None if the object does not exist
+ """
+ try:
+ return read_gcs_file(gcs_client, gsurl)
+ except google.cloud.exceptions.NotFound:
+ return None
+
+
+# cache lookups against GCS API for 1 second as buckets have update
+# limit of once per second and we might do several of the same lookup during
+# the function's lifetime. This should improve performance by eliminating
+# unnecessary API calls.
+# https://cloud.google.com/storage/quotas
+@cachetools.cached(cachetools.TTLCache(maxsize=1024, ttl=1))
+def cached_get_bucket(
+ gcs_client: storage.Client,
+ bucket_id: str,
+) -> storage.Bucket:
+ """get storage.Bucket object by bucket_id string if exists or raise
+ google.cloud.exceptions.NotFound."""
+ return gcs_client.get_bucket(bucket_id)
+
+
+def dict_to_bq_schema(schema: List[Dict]) -> List[bigquery.SchemaField]:
+ """Converts a list of dicts to list of bigquery.SchemaField for use with
+ bigquery client library. Dicts must contain name and type keys.
+ The dict may optionally contain a mode key."""
+ default_mode = "NULLABLE"
+ return [
+ bigquery.SchemaField(
+ x["name"],
+ x["type"],
+ mode=x.get("mode") if x.get("mode") else default_mode)
+ for x in schema
+ ]
+
+
+# To be added to the built-in str in Python 3.9
+# https://www.python.org/dev/peps/pep-0616/
+def removeprefix(in_str: str, prefix: str) -> str:
+ """remove string prefix"""
+ if in_str.startswith(prefix):
+ return in_str[len(prefix):]
+ return in_str[:]
+
+
+def removesuffix(in_str: str, suffix: str) -> str:
+ """removes suffix from a string."""
+ # suffix='' should not call self[:-0].
+ if suffix and in_str.endswith(suffix):
+ return in_str[:-len(suffix)]
+ return in_str[:]
+
+
+def recursive_update(original: Dict, update: Dict, in_place: bool = False):
+ """
+ return a recursively updated dictionary.
+
+ Note, lists will be completely overwritten by value in update if there is a
+ conflict.
+
+ original: (dict) the base dictionary
+ update: (dict) the dictionary of updates to apply on original
+ in_place: (bool) if true then original will be mutated in place else a new
+ dictionary as a result of the update will be returned.
+ """
+ out = original if in_place else copy.deepcopy(original)
+
+ for key, value in update.items():
+ if isinstance(value, dict):
+ out[key] = recursive_update(out.get(key, {}), value)
+ else:
+ out[key] = value
+ return out
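A minimal standalone sketch of the merge semantics described above (nested dicts merge key by key; lists and scalars are overwritten by the update), with a made-up load-config shape for illustration:

```python
import copy


def recursive_update(original: dict, update: dict) -> dict:
    """Recursively merge `update` into a deep copy of `original`."""
    out = copy.deepcopy(original)
    for key, value in update.items():
        if isinstance(value, dict):
            # merge nested dicts rather than replacing them wholesale
            out[key] = recursive_update(out.get(key, {}), value)
        else:
            # scalars and lists are overwritten by the update's value
            out[key] = value
    return out


base = {"load": {"sourceFormat": "CSV",
                 "schema": {"fields": [{"name": "a", "type": "STRING"}]}}}
override = {"load": {"sourceFormat": "PARQUET"}}
merged = recursive_update(base, override)
print(merged["load"]["sourceFormat"])  # PARQUET
print("schema" in merged["load"])      # True (untouched keys survive)
```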
+
+
+def handle_duplicate_notification(
+ gcs_client: storage.Client,
+ blob_to_claim: storage.Blob,
+):
+ """
+ Need to handle potential duplicate Pub/Sub notifications.
+ To achieve this we will drop an empty "claimed" file that indicates
+ an invocation of this cloud function has picked up the success file
+ with a certain creation timestamp. This will support republishing the
+ success file as a mechanism of re-running the ingestion while avoiding
+ duplicate ingestion due to multiple Pub/Sub messages for a success file
+ with the same creation time.
+ """
+ blob_to_claim.reload(client=gcs_client)
+ created_unix_timestamp = blob_to_claim.time_created.timestamp()
+
+ basename = os.path.basename(blob_to_claim.name)
+ claim_blob: storage.Blob = blob_to_claim.bucket.blob(
+ blob_to_claim.name.replace(
+ basename, f"_claimed_{basename}_created_at_"
+ f"{created_unix_timestamp}"))
+ try:
+ claim_blob.upload_from_string("",
+ if_generation_match=0,
+ client=gcs_client)
+ except google.api_core.exceptions.PreconditionFailed as err:
+ blob_to_claim.reload(client=gcs_client)
+ raise exceptions.DuplicateNotificationException(
+ f"gs://{blob_to_claim.bucket.name}/{blob_to_claim.name} appears "
+ "to already have been claimed for created timestamp: "
+ f"{created_unix_timestamp}."
+ "This means that another invocation of this cloud function has "
+ "claimed the work to be one for this file. "
+ "This may be due to a rare duplicate delivery of the Pub/Sub "
+ "storage notification.") from err
+
+
+@cachetools.cached(cachetools.LRUCache(maxsize=1024))
+def get_table_prefix(object_id: str) -> str:
+ """Find the table prefix for a object_id based on the destination regex.
+ Args:
+ object_id: str object ID to parse
+ Returns:
+ str: table prefix
+ """
+ basename = os.path.basename(object_id)
+ if basename in {
+ constants.BACKFILL_FILENAME,
+ constants.START_BACKFILL_FILENAME,
+ "_bqlock",
+ }:
+ # These files will not match the regex and always should appear at the
+ # table level.
+ return removesuffix(object_id, f"/{basename}")
+ match = constants.DESTINATION_REGEX.match(
+ object_id.replace("/_backlog/", "/"))
+ if not match:
+ raise exceptions.DestinationRegexMatchException(
+ f"could not determine table prefix for object id: {object_id}"
+ "because it did not contain a match for destination_regex: "
+ f"{constants.DESTINATION_REGEX.pattern}")
+ table_group_index = match.re.groupindex.get("table")
+ if table_group_index:
+ table_level_index = match.regs[table_group_index][1]
+ return object_id[:table_level_index]
+ raise exceptions.DestinationRegexMatchException(
+ f"could not determine table prefix for object id: {object_id}"
+ "because it did not contain a match for the table capturing group "
+ f"in destination regex: {constants.DESTINATION_REGEX.pattern}")
+
+
+def get_next_backlog_item(
+ gcs_client: storage.Client,
+ bkt: storage.Bucket,
+ table_prefix: str,
+) -> Optional[storage.Blob]:
+ """
+ Get next blob in the backlog if the backlog is not empty.
+
+ Args:
+ gcs_client: storage.Client
+ bkt: storage.Bucket that this cloud function is ingesting data for.
+ table_prefix: the prefix for the table whose backlog should be checked.
+
+ Returns:
+ storage.Blob: pointer to a SUCCESS file in the backlog
+ """
+ backlog_blobs = gcs_client.list_blobs(bkt,
+ prefix=f"{table_prefix}/_backlog/")
+ # Backlog items will be lexicographically sorted
+ # https://cloud.google.com/storage/docs/json_api/v1/objects/list
+ for blob in backlog_blobs:
+ return blob # Return first item in iterator
+ return None
+
+
+def remove_oldest_backlog_item(
+ gcs_client: storage.Client,
+ bkt: storage.Bucket,
+ table_prefix: str,
+) -> bool:
+ """
+ Remove the oldest pointer in the backlog if the backlog is not empty.
+
+ Args:
+ gcs_client: storage.Client
+ bkt: storage.Bucket that this cloud function is ingesting data for.
+ table_prefix: the prefix for the table whose backlog should be checked.
+
+ Returns:
+ bool: True if we removed the oldest blob. False if the backlog was
+ empty.
+ """
+ backlog_blobs = gcs_client.list_blobs(bkt,
+ prefix=f"{table_prefix}/_backlog/")
+ # Backlog items will be lexicographically sorted
+ # https://cloud.google.com/storage/docs/json_api/v1/objects/list
+ blob: storage.Blob
+ for blob in backlog_blobs:
+ blob.delete(client=gcs_client)
+ return True # Return after deleting first blob in the iterator
+ return False
+
+
+def check_for_bq_job_and_children_errors(bq_client: bigquery.Client,
+ job: Union[bigquery.LoadJob,
+ bigquery.QueryJob]):
+ """checks if BigQuery job (or children jobs in case of multi-statement sql)
+ should be considered failed because there were errors or the query affected
+ no rows while FAIL_ON_ZERO_DML_ROWS_AFFECTED env var is set to True
+ (this is the default).
+
+ Args:
+ bq_client: bigquery.Client
+ job: Union[bigquery.LoadJob, bigquery.QueryJob] job to check for errors.
+ Raises:
+ exceptions.BigQueryJobFailure
+ """
+ if job.state != "DONE":
+ wait_on_bq_job_id(bq_client, job.job_id, 5)
+ if job.errors:
+ raise exceptions.BigQueryJobFailure(
+ f"BigQuery Job {job.job_id} failed during backfill with the "
+ f"following errors: {job.errors} "
+ f"{pprint.pformat(job.to_api_repr())}")
+ if isinstance(job, bigquery.QueryJob):
+ if (constants.FAIL_ON_ZERO_DML_ROWS_AFFECTED
+ and job.statement_type in constants.BQ_DML_STATEMENT_TYPES
+ and job.num_dml_affected_rows < 1):
+ raise exceptions.BigQueryJobFailure(
+ f"query job {job.job_id} ran successfully but did not "
+ f"affect any rows. {pprint.pformat(job.to_api_repr())}")
+ for child_job in bq_client.list_jobs(parent_job=job):
+ check_for_bq_job_and_children_errors(bq_client, child_job)
+
+
+def wait_on_bq_job_id(bq_client: bigquery.Client,
+ job_id: str,
+ polling_timeout: int,
+ polling_interval: int = 1) -> bool:
+ """"
+ Wait for a BigQuery Job ID to complete.
+
+ Args:
+ bq_client: bigquery.Client
+ job_id: str the BQ job ID to wait on
+ polling_timeout: int number of seconds to poll this job ID
+ polling_interval: frequency to query the job state during polling
+ Returns:
+ bool: if the job ID has finished successfully. True if DONE without
+ errors, False if RUNNING or PENDING
+ Raises:
+ exceptions.BigQueryJobFailure if the job failed.
+ google.api_core.exceptions.NotFound if the job id cannot be found.
+ """
+ start_poll = time.monotonic()
+ while time.monotonic() - start_poll < (polling_timeout - polling_interval):
+ job: Union[bigquery.LoadJob,
+ bigquery.QueryJob] = bq_client.get_job(job_id)
+ if job.state == "DONE":
+ check_for_bq_job_and_children_errors(bq_client, job)
+ return True
+ if job.state in {"RUNNING", "PENDING"}:
+ print(f"waiting on BigQuery Job {job.job_id}")
+ time.sleep(polling_interval)
+ print(f"reached polling timeout waiting for bigquery job {job_id}")
+ return False
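The poll-until-done pattern used here, reduced to its core: loop on a check function until it succeeds or a timeout budget runs out. The check function and intervals below are stand-ins, not the real BigQuery job polling:

```python
import time


def poll_until(check, polling_timeout: float,
               polling_interval: float = 0.05) -> bool:
    """Return True as soon as check() is truthy, False on timeout."""
    start = time.monotonic()
    while time.monotonic() - start < (polling_timeout - polling_interval):
        if check():
            return True
        time.sleep(polling_interval)
    return False


# succeeds on the third call, well within the budget
state = {"calls": 0}
def done_on_third_call():
    state["calls"] += 1
    return state["calls"] >= 3

print(poll_until(done_on_third_call, polling_timeout=2))  # True
```

Subtracting `polling_interval` from the budget, as the real function does, avoids sleeping past the deadline on the final iteration.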
+
+
+def wait_on_gcs_blob(gcs_client: storage.Client,
+ wait_blob: storage.Blob,
+ polling_timeout: int,
+ polling_interval: int = 1) -> bool:
+ """"
+ Wait for a GCS Object to exists.
+
+ Args:
+ gcs_client: storage.Client
+ wait_blob: storage.Blob the GCS blob to wait on
+ polling_timeout: int number of seconds to poll for the blob
+ polling_interval: frequency to check for the blob during polling
+ Returns:
+ bool: True if the blob exists, False if polling timed out before
+ it appeared.
+ """
+ start_poll = time.monotonic()
+ while time.monotonic() - start_poll < (polling_timeout - polling_interval):
+ if wait_blob.exists(client=gcs_client):
+ return True
+ print(
+ f"waiting on GCS file gs://{wait_blob.bucket.name}/{wait_blob.name}"
+ )
+ time.sleep(polling_interval)
+ return False
+
+
+def gcs_path_to_table_ref_and_batch(
+ object_id: str, default_project: Optional[str]
+) -> Tuple[bigquery.TableReference, Optional[str]]:
+ """extract bigquery table reference and batch id from gcs object id"""
+
+ destination_match = constants.DESTINATION_REGEX.match(object_id)
+ if not destination_match:
+ raise RuntimeError(f"Object ID {object_id} did not match regex:"
+ f" {constants.DESTINATION_REGEX.pattern}")
+ destination_details = destination_match.groupdict()
+ try:
+ dataset = destination_details['dataset']
+ table = destination_details['table']
+ except KeyError as err:
+ raise exceptions.DestinationRegexMatchException(
+ f"Object ID {object_id} did not match dataset and table in regex:"
+ f" {constants.DESTINATION_REGEX.pattern}") from err
+ partition = destination_details.get('partition')
+ year, month, day, hour = (
+ destination_details.get(key, "") for key in ('yyyy', 'mm', 'dd', 'hh'))
+ part_list = (year, month, day, hour)
+ if not partition and any(part_list):
+ partition = '$' + ''.join(part_list)
+ batch_id = destination_details.get('batch')
+ labels = constants.DEFAULT_JOB_LABELS
+
+ if batch_id:
+ labels["batch-id"] = batch_id
+
+ table_id = f"{dataset}.{table}{partition or ''}"
+ dest_table_ref = bigquery.TableReference.from_string(
+ table_id,
+ default_project=os.getenv("BQ_STORAGE_PROJECT", default_project))
+ return dest_table_ref, batch_id
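The partition handling above joins the time-based regex capture groups into a BigQuery partition decorator (`$YYYYMMDDHH`) appended to the table id. A minimal sketch with hypothetical capture values standing in for a `constants.DESTINATION_REGEX` match:

```python
# Hypothetical capture-group values; in the real function these come from
# constants.DESTINATION_REGEX matching the GCS object path.
parts = {"yyyy": "2021", "mm": "03", "dd": "15", "hh": "09"}

year, month, day, hour = (parts.get(key, "") for key in ("yyyy", "mm", "dd", "hh"))
part_list = (year, month, day, hour)
# Only build a decorator when at least one time component was captured.
partition = "$" + "".join(part_list) if any(part_list) else None

# The decorator rides along in the table id string, e.g. mytable$2021031509.
table_id = f"mydataset.mytable{partition or ''}"
```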
+
+
+def create_job_id(success_file_path):
+ """Create job id prefix with a consistent naming convention based on the
+ success file path to give context of what caused this job to be submitted.
+ the rules for success file name -> job id are:
+ 1. slashes to dashes
+ 2. all non-alphanumeric dash or underscore will be replaced with underscore
+ Note, gcf-ingest- can be overridden with environment variable JOB_PREFIX
+ 3. uuid for uniqueness
+ """
+ clean_job_id = os.getenv('JOB_PREFIX', constants.DEFAULT_JOB_PREFIX)
+ clean_job_id += constants.NON_BQ_JOB_ID_REGEX.sub(
+ '_', success_file_path.replace('/', '-'))
+ # add uniqueness in case we have to "re-process" a success file that is
+ # republished (e.g. to fix a bad batch of data) or handle multiple load jobs
+ # for a single success file.
+ clean_job_id += str(uuid.uuid4())
+ return clean_job_id[:1024] # make sure job id isn't too long
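The sanitization rules above can be sketched as a standalone function. The regex here is a local stand-in for `constants.NON_BQ_JOB_ID_REGEX` (assumed to match any character that is not alphanumeric, dash, or underscore), and the function name is hypothetical:

```python
import re
import uuid

# Stand-in for constants.NON_BQ_JOB_ID_REGEX (an assumption for this sketch).
NON_BQ_JOB_ID_REGEX = re.compile(r"[^a-zA-Z0-9_-]")


def sketch_create_job_id(success_file_path: str,
                         prefix: str = "gcf-ingest-") -> str:
    # Rule 1: slashes -> dashes; rule 2: anything else unsafe -> underscore.
    job_id = prefix + NON_BQ_JOB_ID_REGEX.sub(
        "_", success_file_path.replace("/", "-"))
    # Rule 3: uuid suffix for uniqueness across re-processed success files.
    job_id += str(uuid.uuid4())
    return job_id[:1024]  # BigQuery job IDs are capped at 1024 characters.
```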
+
+
+def handle_bq_lock(gcs_client: storage.Client, lock_blob: storage.Blob,
+ next_job_id: Optional[str]):
+ """Reclaim the lock blob for the new job id (in-place) or delete the lock
+ blob if next_job_id is None."""
+ try:
+ if next_job_id:
+ if lock_blob.exists(client=gcs_client):
+ lock_blob.upload_from_string(
+ next_job_id,
+ if_generation_match=lock_blob.generation,
+ client=gcs_client)
+ else: # This happens when submitting the first job in the backlog
+ lock_blob.upload_from_string(next_job_id,
+ if_generation_match=0,
+ client=gcs_client)
+ else:
+ print("releasing lock at: "
+ f"gs://{lock_blob.bucket.name}/{lock_blob.name}")
+ lock_blob.delete(
+ if_generation_match=lock_blob.generation,
+ client=gcs_client,
+ )
+ except google.api_core.exceptions.PreconditionFailed as err:
+ raise exceptions.BacklogException(
+ f"The lock at gs://{lock_blob.bucket.name}/{lock_blob.name} "
+ f"was changed by another process.") from err
+
+
+def apply(
+ gcs_client: storage.Client,
+ bq_client: bigquery.Client,
+ success_blob: storage.Blob,
+ lock_blob: Optional[storage.Blob],
+ job_id: str,
+):
+ # pylint: disable=too-many-locals
+ """
+ Apply an incremental batch to the target BigQuery table via an asynchronous
+ load job or external query.
+
+ Args:
+ gcs_client: storage.Client
+ bq_client: bigquery.Client
+ success_blob: storage.Blob the success file whose batch should be
+ applied.
+ lock_blob: storage.Blob _bqlock blob to acquire for this job.
+ job_id: str
+ """
+ handle_duplicate_notification(gcs_client, success_blob)
+ if lock_blob:
+ handle_bq_lock(gcs_client, lock_blob, job_id)
+ bkt = success_blob.bucket
+ dest_table_ref, _ = gcs_path_to_table_ref_and_batch(success_blob.name,
+ bq_client.project)
+ gsurl = removesuffix(f"gs://{bkt.name}/{success_blob.name}",
+ constants.SUCCESS_FILENAME)
+ print(
+ "looking for a transformation tranformation sql file in parent _config."
+ )
+ external_query_sql = look_for_config_in_parents(
+ gcs_client, f"gs://{bkt.name}/{success_blob.name}", '*.sql')
+ try:
+
+ if external_query_sql:
+ print("EXTERNAL QUERY")
+ print(f"found external query: {external_query_sql}")
+ external_query(gcs_client, bq_client, gsurl, external_query_sql,
+ dest_table_ref, job_id)
+ return
+
+ print("LOAD_JOB")
+ load_batches(gcs_client, bq_client, gsurl, dest_table_ref, job_id)
+ return
+
+ except (google.api_core.exceptions.GoogleAPIError,
+ google.api_core.exceptions.ClientError) as err:
+ etype, value, _ = sys.exc_info()
+ msg = (f"failed to submit job {job_id} for {gsurl}: "
+ f"{etype.__name__}: {value}")
+ blob = storage.Blob.from_string(gsurl)
+ table_prefix = get_table_prefix(blob.name)
+ bqlock = storage.Blob.from_string(
+ f"gs://{blob.bucket.name}/{table_prefix}/_bqlock")
+ # Overwrite the lock with the error message to aid debugging.
+ handle_bq_lock(gcs_client, bqlock, msg)
+ raise exceptions.BigQueryJobFailure(msg) from err
+
+
+def take_table_snapshot(
+ bq_client: bigquery.Client,
+ job_id: str,
+ lock_blob_name: str):
+ """
+ Take a snapshot of the table.
+ We are creating the snapshot in a dataset named _snapshot
+
+ Args:
+ bq_client: bigquery.Client
+ job_id: str
+ lock_blob_name: str the name of the lock_blob
+ """
+ # Create the job
+ # NOTE: This feature is in ALPHA. We have to access the _properties
+ # job_config = bigquery.CopyJobConfig()
+ # job_config._properties["copy"]["operationType"] = "SNAPSHOT" # pylint: disable=W0212
+ print(f"take_table_snapshot: extracting dataset and table name from {lock_blob_name}")
+ destination_match = constants.DESTINATION_REGEX.match(lock_blob_name)
+ if not destination_match:
+ raise RuntimeError(f"Lock Blob Name {lock_blob_name} did not match regex:"
+ f" {constants.DESTINATION_REGEX.pattern}")
+ destination_details = destination_match.groupdict()
+ try:
+ dataset = destination_details['dataset']
+ table = destination_details['table']
+ except KeyError as missing_key:
+ raise exceptions.DestinationRegexMatchException(
+ f"Lock Blob Name {lock_blob_name} did not match dataset and table"
+ f" in regex: {constants.DESTINATION_REGEX.pattern}") from missing_key
+
+ source_name = f"{dataset}.{table}"
+ # We need to name the snapshot based off of the job id.
+ snapshot_name = f"{constants.SNAPSHOT_DATASET}.{job_id}"
+ print(f"Creating snapshot: {snapshot_name}")
+
+ job = bq_client.copy_table(source_name, snapshot_name)
+ job.result() # Wait for the job to complete.
diff --git a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py
index d05b771db..bf2ccebad 100644
--- a/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py
+++ b/tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC.
+# Copyright 2021 Google LLC.
# This software is provided as-is, without warranty or representation
# for any use or purpose.
# Your use of it is subject to your agreement with Google.
@@ -16,536 +16,190 @@
# limitations under the License.
"""Background Cloud Function for loading data from GCS to BigQuery.
"""
-import collections
-import json
+import distutils.util
import os
-import pathlib
-import re
import time
-from typing import Any, Deque, Dict, List, Optional, Tuple
+import traceback
+from typing import Dict, Optional
-import cachetools
-import google.api_core.client_info
-import google.api_core.exceptions
-import google.cloud.exceptions
-from google.cloud import bigquery, storage
+# pylint in cloud build is being flaky about this import discovery.
+# pylint: disable=no-name-in-module
+from google.cloud import bigquery
+from google.cloud import error_reporting
+from google.cloud import storage
-# https://cloud.google.com/bigquery/quotas#load_jobs
-# 15TB per BQ load job (soft limit).
-DEFAULT_MAX_BATCH_BYTES = str(15 * 10**12)
-# 10,000 GCS URIs per BQ load job.
-MAX_SOURCE_URIS_PER_LOAD = 10**4
+try:
+ from common import constants
+ from common import exceptions
+ from common import ordering
+ from common import utils
+except ModuleNotFoundError:
+ from .common import constants
+ from .common import exceptions
+ from .common import ordering
+ from .common import utils
-DEFAULT_EXTERNAL_TABLE_DEFINITION = {
- "sourceFormat": "CSV",
-}
+# Reuse GCP clients across function invocations using globals
+# https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
+# pylint: disable=global-statement
-DEFAULT_JOB_LABELS = {
- "component": "event-based-gcs-ingest",
- "cloud-function-name": os.getenv("FUNCTION_NAME"),
-}
+ERROR_REPORTING_CLIENT = None
-BASE_LOAD_JOB_CONFIG = {
- "sourceFormat": "CSV",
- "fieldDelimiter": ",",
- "writeDisposition": "WRITE_APPEND",
- "labels": DEFAULT_JOB_LABELS,
-}
+BQ_CLIENT = None
-# yapf: disable
-DEFAULT_DESTINATION_REGEX = (
- r"^(?P[\w\-\._0-9]+)/" # dataset (required)
- r"(?P