2017-09-14 97 views
7

以下是方面一些要點我怎麼會有事的設置:AWS Glue to Redshift:是否可以替換,更新或刪除數據?

  • 我已經上傳到S3的CSV文件和膠履帶設置來創建表和模式。
  • 我有一個膠水作業設置,它使用JDBC連接將膠水錶中的數據寫入Amazon Redshift數據庫。該工作還負責映射列和創建紅移表。

通過重新運行一項工作,我得到了redshift中的重複行(如預期的那樣)。但是,在插入新數據之前,是否有辦法替換或刪除行,使用密鑰或膠水中的分區設置?

import sys 
from awsglue.transforms import * 
from awsglue.utils import getResolvedOptions 
from pyspark.context import SparkContext 
from awsglue.context import GlueContext 
from awsglue.job import Job 

from awsglue.dynamicframe import DynamicFrame 
from awsglue.transforms import SelectFields 

from pyspark.sql.functions import lit 

## @params: [TempDir, JOB_NAME] 
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME']) 

sc = SparkContext() 
glueContext = GlueContext(sc) 
spark = glueContext.spark_session 
job = Job(glueContext) 
job.init(args['JOB_NAME'], args) 

columnMapping = [ 
    ("id", "int", "id", "int"), 
    ("name", "string", "name", "string"), 
] 

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0") 

applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1") 
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1") 
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1") 
df1 = dropnullfields1.toDF() 
data1 = df1.withColumn('platform', lit('test')) 
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1") 

## Write data to redshift 
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1") 

job.commit() 
+1

好問題,現在遇到同樣的問題。你到目前爲止取得了哪些進展? – Matthijs

+0

我與AWS Glue支持部門聯繫並能夠解決問題。它沒有出現膠水有辦法做到這一點,或從來沒有爲這種類型的工作。我能夠得到一個工作解決方案的方式是讓膠水將所有行插入暫存表中,然後在膠水外執行上/合併。 – krchun

回答

1

這是我從AWS膠水支持得到了解決:

正如你可能知道,雖然你可以創建主鍵,紅移不強制唯一性。因此,如果您重新運行粘合作業,則可以插入重複的行。一些以保持唯一性的方式是:

  1. 使用一個臨時表中插入所有的行,然後執行UPSERT /合併[1]到主表,這有膠以外的地方進行。

  2. 在你的redshift表[1]中添加另一列,就像插入時間戳一樣,以允許重複,但要知道哪一個先出現或最後出現,然後在需要時刪除重複。

  3. 裝入先前插入數據到數據幀,然後比較的數據被插入,以避免插入重複[3]

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.htmlhttp://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - 在膠SH https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

+0

您是否查看作業書籤?如果您的源代碼是S3,則可能就足夠了。如果它不適合你,我想知道你遇到了什麼問題,所以我不會犯同樣的錯誤? – Matthijs

3

工作書籤是關鍵。只需編輯作業並啓用「作業書籤」,它不會處理已處理的數據。 請注意,該作業必須重新運行一次,才能檢測到該作業不必重新處理舊數據。

欲瞭解更多信息,請參閱: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

命名爲「書籤」是有點牽強在我看來。如果我在搜尋過程中偶然偶然發現它,我從來沒有看過它。

+0

我不確定你爲什麼被拒絕投票。作業書籤與火花中的檢查點相當,聽起來就像是問題所在。 –

+1

我也不知道。我能想到的唯一原因是重新運行相同的工作(例如通過清除書籤)可能會導致Redshift中的雙記錄,因爲批處理會再次處理。 – Matthijs

+2

你真的有工作嗎?我知道它應該做你說的,但我無法得到它的工作。我有一個目錄表作爲輸入(由爬蟲通過S3中的Parquet數據集創建),一個簡單的映射步驟和Redshift作爲數據接收器。作業書籤默認啓用,所有作業運行也啓用。仍然重複每次運行的所有數據。 – andresp

相關問題