0
比方說,我有兩個不同類型的DataStream
的:如何寫多個Datastream的單個文件
val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...
我怎麼能寫兩個流成單個文件?
我試過不同的事情,但似乎沒有工作。舉例來說,我不能只是寫直線距離爲
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)
因爲弗林克將與以下消息抱怨:
java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.
在另一方面,我不能覆蓋這樣的:
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
因爲(據我所知)第二個流將覆蓋第一個流寫入的內容。
最後,我想到了連接流這樣
val connectedStream: ConnectedStream = stream1.connect(stream2)
但我會得到一個ConnectedStream
,它不具有writeAsText
方法。 (爲了記錄,我實際上有4個流,我想要寫入一個文件)。
謝謝!但是我怎麼能把這個解決方案擴展到4個流?如果我加入2個流,我會得到一個'JoinedStream',它沒有'join'方法加入其他2個流的「鏈」... – houcros
我說'union'不是'join'。這些是不同的操作。我在我的答案中添加了示例代碼。 –
哦,對不起,你是對的......我不知道你爲什麼說「加入」...... – houcros