2

我有我正在嘗試推送到Google BigQuery的日誌。我正在嘗試使用Google數據流構建整個管道。日誌結構不同,可以分爲四種不同的類型。在我的管道中,我從PubSub讀取日誌解析並寫入BigQuery表。日誌需要寫入的表取決於日誌中的一個參數。問題是我卡在一個點上,如何在運行時更改BigQueryIO.Write的TableName。Google數據流根據輸入寫入多個表

+0

類似於:http://stackoverflow.com/questions/30431840/writing-results-of-google-dataflow-pipeline-into-mulitple-sinks,請檢查出來,它可以幫助你。 – robosoul

+0

@nikhil sharma - 你真的需要自己手動推出這個解決方案嗎?你有沒有看過使用類似Fluentd的東西? http://www.fluentd.org/ –

+0

@格拉漢姆非常感謝你的回答。我需要完全像http://stackoverflow.com/questions/35979421/dynamic-table-name-when-writing-to-bq-from-dataflow-pipelines,但我很抱歉知道這在數據流中不受支持。我還沒有試過Fluentd,我們試圖在數據流和其他方面實現管道。你能告訴我谷歌有沒有計劃將這個特性包括在數據流中,因爲這對我們所有的管道來說都是非常需要的。另外,如果我可以通過實現這個功能來貢獻,我會更樂意去做。請告訴我,如果我可以。 –

回答

3

您可以使用側面輸出。

https://cloud.google.com/dataflow/model/par-do#emitting-to-side-outputs-in-your-dofn

以下示例代碼,讀取大量查詢表和拆分它在3個不同的PCollections。每個PCollections最終都會發送到不同的發佈/訂閱主題(而不是BigQuery表)。

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create()); 

PCollection<TableRow> weatherData = p.apply(
     BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations")); 

final TupleTag<String> readings2010 = new TupleTag<String>() { 
}; 
final TupleTag<String> readings2000plus = new TupleTag<String>() { 
}; 
final TupleTag<String> readingsOld = new TupleTag<String>() { 
}; 

PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string") 
     .withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld)) 
     .of(new DoFn<TableRow, String>() { 
      @Override 
      public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception { 

       if (c.element().getF().get(2).getV().equals("2010")) { 
        c.output(c.element().toString()); 
       } else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) { 
        c.sideOutput(readings2000plus, c.element().toString()); 
       } else { 
        c.sideOutput(readingsOld, c.element().toString()); 
       } 

      } 
     })); 
collectionTuple.get(readings2010) 
     .apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1")); 
collectionTuple.get(readings2000plus) 
     .apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2")); 
collectionTuple.get(readingsOld) 
     .apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3")); 

p.run(); 
+0

即使有側輸出,在流水線運行之前,即在編譯時定義流水線時,是否仍然需要定義接收器名稱(例如BigQueryIO)?我的理解是,OP想要動態地寫入BigQuery表 - 其名稱僅在運行時才知道。我不明白這是可能的。也許我錯過了一些明顯的東西! –

+0

哦,對。想知道「日誌需要寫入的表取決於日誌中的一個參數」意味着名稱需要是動態的或者是已知集合中的一個。希望@nikhil sharma可以發表評論。 –

+1

我認爲這是一個集合,因爲「日誌結構不同,可以分爲四種不同的類型。」 - 這樣可以讓他們將日誌分成4個不同的表格。 –