2017-03-09 70 views
0

我想使用結構化流式傳輸將文本文件附加到文本文件中。此代碼導致SparkException:任務不可序列化。我認爲toDF是不允許的。我怎麼能得到這個代碼工作?使用foreach附加到文本文件的Spark結構化流式傳輸

df.writeStream 
    .foreach(new ForeachWriter[Row] { 
    override def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 

    override def process(row: Row): Unit = { 
     val df = Seq(row.getString(0)).toDF 

     df.write.format("text").mode("append").save(output) 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
    }  
    }).start 
+0

我想你應該試試這個 df.write.mode(SaveMode.Append).textFile(「Path_To_Store」) –

+0

編譯器抱怨文本文件不是成員DataFramerWriter。我正在使用spark 2.1。 –

+0

對不起,它只有'文本(「Path_To_Save」)' 'df.write.mode(SaveMode.Append).text(「Path_To_Save」)' –

回答

1

不能調用df.write.format("text").mode("append").save(output)process方法。它將運行在執行者一方。您可以使用文件接收器代替,如

df.writeStream.format("text").... 
+0

雖然我打算改變foreach代碼中的輸出路徑,這取決於行中的內容。我需要找到一種方法來做到這一點。 –

+0

不確定您是否可以使用'DataStreamWriter.partitionBy' – zsxwing

+0

美麗,是的。非常感謝你。 –