作者:他妈的碧海连天 | 来源:互联网 | 2022-12-07 00:38
BigQuery支持以下策略:
WRITE_APPEND
-指定可以将行追加到现有表中。
WRITE_EMPTY
-指定输出表必须为空。
WRITE_TRUNCATE
-指定写应替换表。
它们都不适合UPSERT
操作目的。
我正在将订单Json文件导入Google Storage,并希望将其加载到BigQuery中。逻辑提示,某些记录将是新记录,而其他记录已从以前的装载中获取并且需要更新(例如,更新订单状态(新/处于保留状态/已发送/退款等...)
我正在使用Airflow,但我的问题很普遍:
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_orders_to_BigQuery',
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='NEWLINE_DELIMITED_JSON',
source_objects=[gcs_export_uri_template],
schema_fields=dc(),
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
skip_leading_rows = 1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
此代码使用表示WRITE_TRUNCATE
这意味着删除整个表并加载请求的文件。
我如何修改它以提供支持UPSERT
?
我唯一的选择是查询表搜索以找到json中出现的现有订单LOAD
吗?删除它们,然后执行?
1> Felipe Hoffa..:
除了运行之外GoogleCloudStorageToBigQueryOperator
,您还可以运行一个查询,该查询将为您提供与upsert相同的结果。
来自https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement的示例:
MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
INSERT (product, quantity) VALUES(product, quantity)
该查询将:
看一下表T(当前)和S(更新)。
如果更新更改了现有行,它将UPDATE
在该行上运行。
如果更新的产品尚不存在,它将更新INSERT
该行。
现在,BigQuery将如何知道您的表S
?您可以:
使用将其加载到BQ到另一个表中GoogleCloudStorageToBigQueryOperator
。
或者,您可以设置一个直接进入GCS的联合表-我在https://medium.com/google-cloud/bigquery-lazy-data-loading-ddl-dml-partitions-and-half-a-万亿-Wikipedia-pageviews-cd3eacd657b6