2016-10-03 114 views
1

我正在測試checkpointing,並在下面使用此基本Spark流代碼編寫提前的日誌。我正在檢查本地目錄。在啓動和停止應用程序幾次後(使用Ctrl - C) - 它會拒絕啓動,因爲看起來像檢查點directoty中的某些數據損壞。我越來越:Spark Streaming中的Checkpoint數據損壞

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 17, localhost): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) 
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) 
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) 
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192) 

全碼:

import org.apache.hadoop.conf.Configuration 
import org.apache.spark._ 
import org.apache.spark.streaming._ 

object ProtoDemo { 
    def createContext(dirName: String) = { 
    val conf = new SparkConf().setAppName("mything") 
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") 

    val ssc = new StreamingContext(conf, Seconds(1)) 
    ssc.checkpoint(dirName) 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val wordCounts = pairs.reduceByKey(_ + _) 
    val runningCounts = wordCounts.updateStateByKey[Int] { 
     (values: Seq[Int], oldValue: Option[Int]) => 
     val s = values.sum 
     Some(oldValue.fold(s)(_ + s)) 
     } 

    // Print the first ten elements of each RDD generated in this DStream to the console 
    runningCounts.print() 
    ssc 
    } 

    def main(args: Array[String]) = { 
    val hadoopConf = new Configuration() 
    val dirName = "/tmp/chkp" 
    val ssc = StreamingContext.getOrCreate(dirName,() => createContext(dirName), hadoopConf) 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 
+0

嘗試使用可靠的文件系統,如hdfs,看看是否有任何錯誤。 – Knight71

+0

用S3試過了,它仍然發生。我相信腐敗發生在提前寫入日誌中。 – thesamet

+0

你正在使用哪個版本的spark? 1.6還是2.0? –

回答

0

基本上你正在嘗試做的是一個驅動程序故障的情況下,這個工作,基於集羣正在運行,你必須遵循以下說明監視驅動程序進程並在驅動程序失敗時重新啓動驅動程序

配置應用程序驅動程序的自動重新啓動 - 要自動從驅動程序故障中恢復,用於運行流應用程序的部署基礎結構必須監視e驅動程序進程並在驅動程序失敗時重新啓動驅動程序不同的cluster managers有不同的工具來實現這一點。

  1. 星火單機版 - 火花應用程序驅動程序可以(見cluster deploy mode),也就是說,應用驅動器本身的 工作者節點中的一個運行獨立的Spark集羣內提交 運行。此外,獨立集羣管理器可以指示 指示監督驅動程序,並在驅動程序 由於非零退出代碼或由於節點運行驅動程序失敗而導致故障時重新啓動。有關更多詳細信息,請參閱Spark Standalone guide中的集羣模式和監督。

  2. YARN - Yarn支持類似的自動重啓應用程序的機制。更多詳情請參考YARN文檔 。

  3. Mesos - Marathon已被用於Mesos的實現。

您需要配置如下提前寫入日誌,您需要遵循S3的特殊說明。

在使用S3(或不支持任何刷新文件系統)提前寫日誌,請記得啓用

spark.streaming.driver.writeAheadLog.closeFileAfterWrite spark.streaming.receiver.writeAheadLog。 closeFileAfterWrite。

查看Spark Streaming Configuration瞭解更多詳情。

+0

我描述的問題發生在t發生故障後手動重新啓動時。當我重新開始第二次或第三次時,我會得到問題中提到的異常。自動重啓會有什麼不同? – thesamet

+0

你有把這些設置爲true嗎? spark.streaming.driver.writeAheadLog.closeFileAfterWrite spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。 –

+0

是的,使用S3進行測試時仍然存在此問題。他們需要本地文件系統嗎? – thesamet

0

該問題看起來相當Kryo序列化問題比檢查點損壞。 在代碼示例(包括GitHub項目)中,未配置Kryo序列化。 由於沒有配置KryoException異常不會發生。

當使用「預寫日誌」並從目錄中恢復時,所有的Spark配置都從那裏獲取。 以您爲例,createContext方法在從檢查點開始時不會調用。

我假設問題是另一個應用程序之前與相同的檢查點目錄進行了測試,其中配置了Kryo串行器。 當前應用程序無法從該檢查點恢復。