2017-02-16 57 views
1

我們正在構建簡單的Streaming應用程序,該應用程序使用HBase RDD與傳入的DStream進行連接。 示例代碼:Apache Spark:從檢查點恢復狀態期間的NPE

val indexState = sc.newAPIHadoopRDD(
    conf, 
    classOf[TableInputFormat], 
    classOf[ImmutableBytesWritable], 
    classOf[Result]).map { case (rowkey, v) => //some logic} 

val result = dStream.transform { rdd => 
    rdd.leftOuterJoin(indexState) 
} 

它工作正常,但是當我們啓用檢查點對的StreamingContext ,讓應用程序從先前創建的檢查點恢復, 它總是拋出NullPointerException異常。

ERROR streaming.StreamingContext: Error starting the context, marking it as stopped 
java.lang.NullPointerException 
     at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119) 
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 

有沒有人遇到同樣的問題? 版本:

  • 星火1.6.x的
  • Hadoop的2.7.x

謝謝!

+0

當你說「以前創建檢查點」意思流作業停止並重新提交? – ImDarrenG

回答

1

Spark Streaming檢查點不能用於從以前的作業中恢復,至少在1.6.x版本中是如此。如果您的作業停止並重新提交,則檢查點數據不能重新使用。在提交作業之前,您必須刪除舊的檢查點數據。

[R]從升級前代碼的早期檢查點信息中刪除無法完成。檢查點信息本質上包含序列化的Scala/Java/Python對象,並試圖用新的修改後的類對對象進行反序列化可能會導致錯誤。在這種情況下,可以使用不同的檢查點目錄啓動升級的應用程序,或者刪除以前的檢查點目錄。

Upgrading the code - checkpointing

+0

這是否意味着檢查點僅適用於dstream,並且在與任何一方rdd一起工作時我們不能使用它們? –

+0

您的使用狀況良好,但檢查點允許驅動程序恢復,但不支持通過spark-submit停止並啓動整個流式作業。 – ImDarrenG

+0

*我發現在沒有任何代碼更改的情況下重新啓動流式作業時也保持相同*這是不正確的。只要沒有更改,使用現有數據重新啓動失敗作業就沒有問題。 –