2016-02-27 76 views
1

我寫了有狀態轉換的火花流媒體程序。 看起來像我的火花流應用程序正在做計算與檢查指向正確。 但是,如果我終止我的程序,並且如果我再次啓動它,它不會讀取以前的檢查點數據並從一開始就盯着。這是預期的行爲嗎?火花流不記得以前的狀態

我是否需要更改我的程序中的任何內容,以便它會記住以前的數據並從那裏開始計算?

在此先感謝。

僅供參考我的程序:

def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("HBaseStream") 
    val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(sc, Seconds(5)) 
    val inputStream = ssc.socketTextStream(<hostname>, 9999) 
    ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir") 
    inputStream.print(1) 
    val parsedStream = inputStream 
     .map(line => { 
     val splitLines = line.split(",") 
     (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong))) 
     }) 
    import breeze.linalg.{DenseVector => BDV} 
    import scala.util.Try 

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
     (current: Seq[Array[Long]], prev: Option[Array[Long]]) => { 
     prev.map(_ +: current).orElse(Some(current)) 
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption) 
     }) 
    state.checkpoint(Duration(10000)) 
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) 

    // Start the computation 
    ssc.start() 
    // Wait for the computation to terminate 
    ssc.awaitTermination() 

    } 
} 

回答

2

據火花流媒體文件,你應該初始化方面有點不同:

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

// Do additional setup on context that needs to be done, 
// irrespective of whether it is being started or restarted 
context. ... 

// Start the context 
context.start() 
context.awaitTermination() 

看到checkpointing

1

因爲它是在checkpointing documentation描述你必須調整你的代碼才能從檢查點恢復狀態。

特別是你不能直接創建StreamingContext,但必須使用StreamingContext.getOrCreate方法,這需要:

  • 檢查點目錄,可用於建立上下文
  • 功能(Unit => StreamingContext
+0

謝謝我能夠做到。它現在記得以前的狀態。 – Vibhuti