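Why is writing to BigQuery from a Dataflow/Beam pipeline slow?
We have a very simple pipeline that reads from GCS, applies a simple ParDo, and writes the results to BigQuery. It autoscales to 50 VMs, runs on GCP, and does nothing fancy.
Reading all the data from GCS (~10B records, ~700+ GB) and transforming it is relatively quick (the first 7-10 minutes).
But when it reaches the BigQuery write (using BigQueryIO), it slows right down, even though it only needs to write about 1M records (~60 MB). This step alone takes 20 minutes.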
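To put the two phases side by side, here is a back-of-the-envelope sketch that turns the figures quoted above into average throughput. The class and method names are made up for illustration, decimal units (1 GB = 1000 MB) are assumed, and the read phase is taken at the upper end of the 7-10 minute range.

```java
final class ThroughputEstimate {
    /** Average throughput in MB/s for a given volume and duration. */
    static double megabytesPerSecond(double megabytes, double minutes) {
        return megabytes / (minutes * 60.0);
    }

    public static void main(String[] args) {
        // Figures from the question: ~700 GB read in about 10 minutes,
        // ~60 MB written in about 20 minutes.
        double read = megabytesPerSecond(700_000, 10);
        double write = megabytesPerSecond(60, 20);
        System.out.printf("read  ~ %.0f MB/s%n", read);
        System.out.printf("write ~ %.2f MB/s%n", write);
    }
}
```

On these rough numbers the write phase averages about 0.05 MB/s against more than 1,000 MB/s for the read phase, which suggests the time is going into fixed overhead rather than data volume.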
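Besides being slow to write to BigQuery, the graph shows the step as "stopped" even though it succeeds (albeit very slowly). The step also looks overly complex for something that simply writes to BigQuery (see the image below).
The bottleneck appears to be the step Executing operation BigQueryIO.Write/BatchLoads/WriteRename (see the logs below).
Is there something I am doing wrong in my code?
Code: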
public class Pipeline {
    private static final String BIG_QUERY_TABLE = "<redacted>:<redacted>.melbourne_titles";
    private static final String BUCKET = "gs://<redacted>/*.gz";

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.read().from(BUCKET).withCompressionType(GZIP))
                .apply(ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String input = c.element();
                        String title = input.split(",")[5];
                        if (title.toLowerCase().contains("melbourne")) {
                            TableRow tableRow = new TableRow();
                            tableRow.set("title", title);
                            c.output(tableRow);
                        }
                    }
                }))
                .apply(BigQueryIO.writeTableRows()
                        .to(BIG_QUERY_TABLE)
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getSchema()));
        pipeline.run();
    }

    private static TableSchema getSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);
        return schema;
    }
}
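As a side note on the ParDo itself (it is probably unrelated to the slow write), the filter logic can be pulled out into a plain method and checked without running a pipeline. This is only a sketch: TitleFilter and melbourneTitle are hypothetical names, and skipping lines with fewer than six comma-separated fields is my assumption, since the original would throw on such input.

```java
import java.util.Locale;
import java.util.Optional;

final class TitleFilter {
    // Zero-based position of the title column, as in input.split(",")[5] above.
    private static final int TITLE_INDEX = 5;

    /** Returns the title if the line has one and it mentions "melbourne" (case-insensitive). */
    static Optional<String> melbourneTitle(String line) {
        String[] fields = line.split(",");
        if (fields.length <= TITLE_INDEX) {
            // Assumption: malformed or short lines are skipped rather than failing the bundle.
            return Optional.empty();
        }
        String title = fields[TITLE_INDEX];
        return title.toLowerCase(Locale.ROOT).contains("melbourne")
                ? Optional.of(title)
                : Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up sample line; the real input format is not shown in the question.
        System.out.println(melbourneTitle("2017,08,25,wikipedia,en,Melbourne_Cup,12"));
    }
}
```

Inside the DoFn, the body of processElement would then reduce to building a TableRow when the returned Optional is present.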
Log snippet:
2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Create
2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
2017-08-25 (21:30:23) Starting 50 workers in australia-southeast1-a...
2017-08-25 (21:30:23) Executing operation TextIO.Read/Read+ParDo(Anonymous)+BigQueryIO.Write/PrepareWrite/ParDo(Anonymous)...
2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TriggerIdCreation/Read(CreateSource)+BigQueryIO.Writ...
2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoad...
2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
2017-08-25 (21:31:22) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
2017-08-25 (21:31:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
2017-08-25 (21:38:10) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/CreateDataflowView
2017-08-25 (21:38:13) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/CreateDataflowView
2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Close
2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Read+BigQueryIO.Write/BatchLoads/GroupByK...
2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Create
2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Create
2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Close
2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Close
2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Read+BigQueryIO...
2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Read+BigQueryIO...
2017-08-25 (21:39:00) Executing operation s35-u80
2017-08-25 (21:39:01) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/Flatten.PCollections
2017-08-25 (21:39:03) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/CreateDataflowView
2017-08-25 (21:39:06) Executing operation BigQueryIO.Write/BatchLoads/ResultsView/CreateDataflowView
2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Create
2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Create
2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/Create.Values/Read(CreateSource)+BigQueryIO.Write/Ba...
2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Close
2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Close
2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read+BigQueryIO...
2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Read+BigQueryIO....
2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/CreateDataflowView
2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/WriteRename
2017-08-25 (21:57:43) Stopping worker pool...
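To see where the time goes, the gap between two log lines can be computed from their timestamps. The sketch below assumes every line starts with a stamp in the exact form shown above, `2017-08-25 (21:39:46)`; LogGap and its methods are hypothetical helper names, not part of any Dataflow tooling.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class LogGap {
    // Matches the leading stamp of each log line, e.g. "2017-08-25 (21:39:46)".
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("uuuu-MM-dd '('HH:mm:ss')'");
    private static final int STAMP_LENGTH = 21;

    /** Time elapsed between the stamps of two log lines. */
    static Duration between(String earlierLine, String laterLine) {
        return Duration.between(parse(earlierLine), parse(laterLine));
    }

    private static LocalDateTime parse(String line) {
        return LocalDateTime.parse(line.substring(0, STAMP_LENGTH), STAMP);
    }

    public static void main(String[] args) {
        Duration gap = between(
                "2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/WriteRename",
                "2017-08-25 (21:57:43) Stopping worker pool...");
        System.out.println(gap.getSeconds() + " seconds");
    }
}
```

For the last two lines of the snippet this comes to 1077 seconds, just under 18 minutes, so almost all of the slow phase falls between the start of WriteRename and the worker pool shutting down.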
The overly complex-looking step:
Job details:
- Dataflow Java SDK: 2.0.0
- Job ID: 2017-08-25_04_29_54-7210937293145071720
UPDATE
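I think the problem is the number of files Dataflow is generating, which BigQuery then has to load. It may only be 1M rows, but Dataflow is producing 850+ files for the load: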
"configuration" : {
"load" : {
"createDisposition" : "CREATE_IF_NEEDED",
"destinationTable" : {
"datasetId" : "dataflow_on_a_tram",
"projectId" : "grey-sort-challenge",
"tableId" : "melbourne_titles"
},
"schema" : {
"fields" : [ {
"name" : "year",
"type" : "STRING"
}, {
"name" : "month",
"type" : "STRING"
}, {
"name" : "day",
"type" : "STRING"
}, {
"name" : "wikimedia_project",
"type" : "STRING"
}, {
"name" : "language",
"type" : "STRING"
}, {
"name" : "title",
"type" : "STRING"
}, {
"name" : "views",
"type" : "INTEGER"
} ]
},
"sourceFormat" : "NEWLINE_DELIMITED_JSON",
"sourceUris" : [
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/51221a43-8fd8-417d-90ca-2f3c3e5789d2",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/5e1c3cb8-20d1-45ef-b0bb-209645c36093",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0ed8d240-2bc2-4c8b-808d-792540448c73",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/d7a1fefe-6dd8-4f30-bf97-040c3692e448",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/b7c4d9a8-d45d-4cc6-b375-291e6435ed53",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/17a7bbf4-5695-4188-b03a-3ef5cda8607c",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/783af461-c114-4a41-aa5f-ed1c7db86bab",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/dad046fc-eabf-4212-83f1-7d7fa71075c1",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/7b9ffec1-7424-4248-83b4-98a4ef4233b9",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/bb297232-8e84-4a14-9dc6-3efde1b2b586",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0693972a-1319-4637-af9f-8a4a3d5cb0f7",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/41b1e722-f76c-404d-a71b-bd36c09e8a06",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/19cfd89e-c9ee-4221-aee1-b3503dbcd93b",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/574467f2-5771-479a-b213-2941225a24bd",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/4d872304-0f42-47f2-89cf-b3a3f856ca67",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/1c086246-8eec-4bbe-be98-b01abb181d33",
"gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/9439f5f4-5020-471d-b631-e1a3fea1584f",
[..]851 files!
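A quick estimate of how small those files are, using only figures from this post (about 1M rows, about 60 MB, 851 files). The names are made up for illustration, and the real temp files are newline-delimited JSON, so their actual sizes may differ from this estimate.

```java
final class FileSizeEstimate {
    /** Integer average, rounding down. */
    static long averagePerFile(long total, long fileCount) {
        return total / fileCount;
    }

    public static void main(String[] args) {
        long files = 851;
        long rows = 1_000_000L;        // ~1M records, from the question
        long bytes = 60_000_000L;      // ~60 MB, decimal units assumed
        System.out.println("rows per file:  " + averagePerFile(rows, files));
        System.out.println("bytes per file: " + averagePerFile(bytes, files));
    }
}
```

That works out to roughly 1,175 rows and about 70 KB per file, which fits the suspicion that the load is dominated by the number of files rather than the amount of data.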
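Looking at the worker logs, it seems to be just waiting for the BQ load job to finish. It may be that BQ jobs are slower than usual. Does this problem happen consistently? – Pablo
I ran it again. Same problem. 2017-08-25_14_26_18-5377718284053913263
@Pablo - see my update. I think it is slow because Dataflow is generating a lot of files for BQ to load.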