I have a Dataflow pipeline running locally. The goal is to read JSON files with TextIO, create sessions, and load them into BigQuery. Given this structure, I have to create a temporary directory in GCS and load into BigQuery through it. Previously I had a data-schema error that prevented me from loading the data; see here: BigQuery loads manually but not through the Java SDK. That issue has been resolved.
So now, when I run the pipeline locally, it dumps the temporary newline-delimited JSON files into GCS. The SDK then gives me the following:
Starting BigQuery load job beam_job_xxxx_00001-1: try 1/3
INFO [main] (BigQueryIO.java:2191) - BigQuery load job failed: beam_job_xxxx_00001-1
...
Exception in thread "main" com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:187)
at pedesys.Dataflow.main(Dataflow.java:148)
Caused by: java.lang.RuntimeException: Failed to create the load job beam_job_xxxx_00001, reached max retries: 3
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.load(BigQueryIO.java:2198)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$WriteTables.processElement(BigQueryIO.java:2146)
The error is not very descriptive, and the data still isn't loaded into BigQuery. The puzzling part is that if I go to the BigQuery UI and manually load the very same temporary files that the SDK's Dataflow pipeline dumped into GCS, into the same table, it works fine.
The relevant part of the code is below:
PipelineOptions options = PipelineOptionsFactory.create();
options.as(BigQueryOptions.class)
.setTempLocation("gs://test/temp");
Pipeline p = Pipeline.create(options);
...
...
session_windowed_items.apply(ParDo.of(new FormatAsTableRowFn()))
.apply(BigQueryIO.Write
.named("loadJob")
.to("myproject:db.table")
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);
The first thing to do is find out what the error actually is. Take the BigQuery load job ID (beam_job_xxxx_00001-1) and look it up, either from the command line (`bq show -j beam_job_xxxx_00001-1`) or in the browser via the _"Try it"_ section at the bottom of https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get. Then you'll know more details. –
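As a sketch, the CLI lookup could look like this (the job ID is the placeholder from the log above; substitute the actual ID your pipeline printed):

```shell
# Show the failed load job. --format=prettyjson prints the full job
# resource, whose status.errorResult / status.errors fields contain
# the concrete reason the load failed.
bq show --format=prettyjson -j beam_job_xxxx_00001-1
```
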
I filed https://issues.apache.org/jira/browse/BEAM-1235 to improve the error reporting. – jkff
Thanks @jkff, that will be very helpful and would have prevented all of this in the first place :) – plumSemPy