我試圖建立在谷歌雲數據流管道,會做下列文件閱讀:谷歌雲數據流:從動態文件名
- 聽取了關於發佈訂閱訂閱事件
- 提取從文件名事件文本
- 讀取文件(從谷歌Cloud Storage桶)
- 商店BigQuery中
記錄以下是代碼:
Pipeline pipeline = //create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
.apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>)
.apply(TextIO.read().from(""))???
我在第三步掙扎,不太確定如何訪問第二步的輸出並在第三步中使用它。我曾嘗試編寫產生以下代碼的代碼:
private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){
//A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has TextIO.read wrapped into processElement method
}
但是,我無法在後續步驟中讀取文件內容。
任何人都可以請我知道我需要在第3步和第4步寫什麼,以便我可以逐行使用文件並將輸出存儲到BigQuery(或者只是記錄它)。
您引用的方法不存在:TextIO.read()。from()僅適用於String或ValueProvider。您可能會將其與write()中更動態的方法混淆。 –
jkff