Apache Spark: NPE during state recovery from checkpoint

We are building a simple streaming application that joins an incoming DStream against an RDD loaded from HBase. Sample code:
    val indexState = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]).map { case (rowkey, v) => /* some logic */ }

    val result = dStream.transform { rdd =>
      rdd.leftOuterJoin(indexState)
    }
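For reference, checkpoint recovery is wired up via `StreamingContext.getOrCreate`; here is a minimal sketch of how we enable it (`checkpointDir` and `createContext` are illustrative names, not from our actual code):

    // Sketch only: assumes sparkConf and checkpointDir are defined elsewhere
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkConf, Seconds(10))
      ssc.checkpoint(checkpointDir) // e.g. an HDFS path
      // ... build the DStream pipeline shown above ...
      ssc
    }

    // On restart, rebuild the context from the checkpoint if one exists,
    // otherwise create it fresh via createContext
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()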
This works fine, but once we enable checkpointing on the StreamingContext and let the application recover from a previously created checkpoint, it always throws a NullPointerException:
ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.NullPointerException
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
Has anyone run into the same problem? Versions:

- Spark 1.6.x
- Hadoop 2.7.x

Thanks!
When you say "a previously created checkpoint", do you mean the streaming job was stopped and then resubmitted? – ImDarrenG