我有我正在嘗試推送到Google BigQuery的日誌。我正在嘗試使用Google數據流構建整個管道。日誌結構不同,可以分爲四種不同的類型。在我的管道中,我從PubSub讀取日誌解析並寫入BigQuery表。日誌需要寫入的表取決於日誌中的一個參數。問題是我卡在一個點上,如何在運行時更改BigQueryIO.Write的TableName。Google數據流根據輸入寫入多個表
回答
您可以使用側面輸出。
https://cloud.google.com/dataflow/model/par-do#emitting-to-side-outputs-in-your-dofn
以下示例代碼,讀取大量查詢表和拆分它在3個不同的PCollections。每個PCollections最終都會發送到不同的發佈/訂閱主題(而不是BigQuery表)。
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
final TupleTag<String> readings2010 = new TupleTag<String>() {
};
final TupleTag<String> readings2000plus = new TupleTag<String>() {
};
final TupleTag<String> readingsOld = new TupleTag<String>() {
};
PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string")
.withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld))
.of(new DoFn<TableRow, String>() {
@Override
public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
if (c.element().getF().get(2).getV().equals("2010")) {
c.output(c.element().toString());
} else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) {
c.sideOutput(readings2000plus, c.element().toString());
} else {
c.sideOutput(readingsOld, c.element().toString());
}
}
}));
collectionTuple.get(readings2010)
.apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1"));
collectionTuple.get(readings2000plus)
.apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2"));
collectionTuple.get(readingsOld)
.apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3"));
p.run();
即使有側輸出,在流水線運行之前,即在編譯時定義流水線時,是否仍然需要定義接收器名稱(例如BigQueryIO)?我的理解是,OP想要動態地寫入BigQuery表 - 其名稱僅在運行時才知道。我不明白這是可能的。也許我錯過了一些明顯的東西! –
哦,對。想知道「日誌需要寫入的表取決於日誌中的一個參數」意味着名稱需要是動態的或者是已知集合中的一個。希望@nikhil sharma可以發表評論。 –
我認爲這是一個集合,因爲「日誌結構不同,可以分爲四種不同的類型。」 - 這樣可以讓他們將日誌分成4個不同的表格。 –
- 1. 根據輸入自動填寫表格?
- 2. 根據用戶輸入填寫數據表
- 3. 如何強制流寫入數字數據根據類型
- 4. Google雲數據流寫入數據問題(TextIO或DatastoreIO)
- 5. 使用Google雲數據流從一個數據存儲中讀取數據並寫入另一個數據流
- 6. 如何用輸入框輸入數據到多個表格
- 7. PLSQL根據輸入
- 8. 從輸入中讀取數據在Smalltalk中逐行寫入流
- 9. 根據輸入值更新數據庫
- 10. 如何從多線程寫入流數據到Azure數據湖?
- 11. 如何根據數字寫入多個數組?
- 12. 根據用戶輸入在輸入字段中顯示數據
- 13. 輸入的數據未寫入文件
- 14. (Excel)數據輸入表格將數據輸入到表格
- 15. 根據用戶輸入創建多個表單項目
- 16. 從數據輸入流中讀取響應後,我可以寫入數據輸出流嗎?
- 17. 基於1行輸入生成多行數據 - Google表格
- 18. 是否有使用單個流水傳輸流將數據寫入多個Redshift表的問題
- 19. 多個數據流不能正確寫入
- 20. 根據表中實例的數量限制數據輸入
- 21. 將多個熊貓數據框寫入多個excel工作表
- 22. Laravel插入多個輸入表單數據庫
- 23. 根據用戶輸入通過AJAX請求重繪google圖表
- 24. Django multidb:寫入多個數據庫
- 25. 根據輸入更改表格
- 26. PHP表單數據寫入
- 27. Apache Spark - shuffle寫入比輸入數據大小更多的數據
- 28. 根據輸入檢索行
- 29. 根據給定的輸入
- 30. Spring MVC的:根據輸入
類似於:http://stackoverflow.com/questions/30431840/writing-results-of-google-dataflow-pipeline-into-mulitple-sinks,請檢查出來,它可以幫助你。 – robosoul
@nikhil sharma - 你真的需要自己手動推出這個解決方案嗎?你有沒有看過使用類似Fluentd的東西? http://www.fluentd.org/ –
@格拉漢姆非常感謝你的回答。我需要完全像http://stackoverflow.com/questions/35979421/dynamic-table-name-when-writing-to-bq-from-dataflow-pipelines,但我很抱歉知道這在數據流中不受支持。我還沒有試過Fluentd,我們試圖在數據流和其他方面實現管道。你能告訴我谷歌有沒有計劃將這個特性包括在數據流中,因爲這對我們所有的管道來說都是非常需要的。另外,如果我可以通過實現這個功能來貢獻,我會更樂意去做。請告訴我,如果我可以。 –