我寫了有狀態轉換的火花流媒體程序。 看起來像我的火花流應用程序正在做計算與檢查指向正確。 但是,如果我終止我的程序,並且如果我再次啓動它,它不會讀取以前的檢查點數據並從一開始就盯着。這是預期的行爲嗎?火花流不記得以前的狀態
我是否需要更改我的程序中的任何內容,以便它會記住以前的數據並從那裏開始計算?
在此先感謝。
僅供參考我的程序:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val inputStream = ssc.socketTextStream(<hostname>, 9999)
ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir")
inputStream.print(1)
val parsedStream = inputStream
.map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
})
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})
state.checkpoint(Duration(10000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
謝謝我能夠做到。它現在記得以前的狀態。 – Vibhuti