0

我試圖建立在谷歌雲數據流管道,會做下列文件閱讀:谷歌雲數據流:從動態文件名

  • 聽取了關於發佈訂閱訂閱事件
  • 提取從文件名事件文本
  • 讀取文件(從谷歌Cloud Storage桶)
  • 商店BigQuery中

記錄以下是代碼:

Pipeline pipeline = //create pipeline 
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub")) 
     .apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>) 
     .apply(TextIO.read().from(""))??? 

我在第三步掙扎,不太確定如何訪問第二步的輸出並在第三步中使用它。我曾嘗試編寫產生以下代碼的代碼:

private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){ 
    //A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has TextIO.read wrapped into processElement method 
} 

但是,我無法在後續步驟中讀取文件內容。

任何人都可以請我知道我需要在第3步和第4步寫什麼,以便我可以逐行使用文件並將輸出存儲到BigQuery(或者只是記錄它)。

回答

2

表達您閱讀的自然方式是使用TextIO.readAll()方法,該方法從文件名的輸入PCollection中讀取文本文件。該方法已在Beam代碼庫中引入,但目前尚未發佈。它將包含在Beam 2.2.0發行版和相應的Dataflow 2.2.0發行版中。

-1

您可以使用SerializableFunction完成此操作。

你可以做

pipeline.apply(TextIO.read().from(new FileNameFn())); 

public class FileNameFn implements SerializableFunction<inputFileNameString, outputQualifiedFileNameStringWithBucket> 

顯然,你可以傳遞水桶名稱和其他參數的靜態同時通過構造函數的參數創建該類的實例。

希望這會有所幫助。

+0

您引用的方法不存在:TextIO.read()。from()僅適用於String或ValueProvider 。您可能會將其與write()中更動態的方法混淆。 – jkff

相關問題