2016-08-30 82 views
0

比方說,我有兩個不同類型的DataStream的:如何寫多個Datastream的單個文件

val stream1: DataStream[(Int, Int, Int)] = ... 
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ... 

我怎麼能寫兩個流單個文件

我試過不同的事情,但似乎沒有工作。舉例來說,我不能只是寫直線距離爲

stream1.writeAsText("path/to/file.txt").setParallelism(1) 
stream2.writeAsText("path/to/file.txt").setParallelism(1) 

因爲弗林克將與以下消息抱怨:

java.io.IOException: File or directory already exists. 
Existing files and directories are not overwritten in NO_OVERWRITE mode. 
Use OVERWRITE mode to overwrite existing files and directories. 

在另一方面,我不能覆蓋這樣的:

stream1.writeAsText("path/to/file.txt").setParallelism(1) 
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1) 

因爲(據我所知)第二個流將覆蓋第一個流寫入的內容。

最後,我想到了連接流這樣

val connectedStream: ConnectedStream = stream1.connect(stream2) 

但我會得到一個ConnectedStream,它不具有writeAsText方法。 (爲了記錄,我實際上有4個流,我想要寫入一個文件)。

回答

0

一個非常簡單的解決方案是使用映射器將每個事件映射到String(或另一種常見類型,如byte[])。然後,您有四個相同類型的流(DataStream[String]),您可以將它們合併爲一個流並將其作爲一個流寫入文件。

這將如下所示:

val s1: DataStream[String] = ??? 
val s2: DataStream[String] = ??? 
val s3: DataStream[String] = ??? 
val s4: DataStream[String] = ??? 

val out: DataStream[String] = s1.union(s2).union(s3).union(s4) 
out.writeAsText("path/to/file") 
+0

謝謝!但是我怎麼能把這個解決方案擴展到4個流?如果我加入2個流,我會得到一個'JoinedStream',它沒有'join'方法加入其他2個流的「鏈」... – houcros

+0

我說'union'不是'join'。這些是不同的操作。我在我的答案中添加了示例代碼。 –

+0

哦,對不起,你是對的......我不知道你爲什麼說「加入」...... – houcros

相關問題