2015-06-27 140 views

Joining Spark RDDs in Java: I need help performing the following join operation in Spark

JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL); 

To perform this operation I need two JavaPairRDDs, closedMTMPNL and openMTMPNL. openMTM and closedMTM work fine, but the keyBy call on both RDDs fails at runtime.

JavaPairRDD<String, MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});

JavaPairRDD<String, MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});

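Is there any other way to join the openMTM and closedMTM RDDs? So far I am trying to key both RDDs by a String so that the join can be performed on it. What is causing the exception?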
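For reference, here is a rough sketch of the result shape a full outer join produces, written in plain Java rather than against Spark so that it runs on its own. Everything in it is an assumption made for illustration: the class `FullOuterJoinSketch`, the holder `Sides`, the use of maps with one value per key, and `java.util.Optional` standing in for the `Optional` type in the signature above.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical plain-Java illustration; not Spark's implementation.
final class FullOuterJoinSketch {

    // Stand-in for Tuple2<Optional<V>, Optional<V>>.
    static final class Sides<V> {
        final Optional<V> left;
        final Optional<V> right;

        Sides(Optional<V> left, Optional<V> right) {
            this.left = left;
            this.right = right;
        }
    }

    // Every key from either input appears once in the result; a side that
    // has no entry for the key is represented by an empty Optional.
    static <K, V> Map<K, Sides<V>> fullOuterJoin(Map<K, V> open, Map<K, V> closed) {
        Set<K> keys = new LinkedHashSet<>(open.keySet());
        keys.addAll(closed.keySet());

        Map<K, Sides<V>> joined = new LinkedHashMap<>();
        for (K key : keys) {
            joined.put(key, new Sides<>(
                    Optional.ofNullable(open.get(key)),
                    Optional.ofNullable(closed.get(key))));
        }
        return joined;
    }
}
```

A taxlot that exists only in the open set comes back with the left side present and the right side empty, which is why both sides of the joined value are wrapped in Optional.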

Stack trace attached:

java.lang.NullPointerException 
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.lang.NullPointerException 
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53) 
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89) 
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException 
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53) 
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89) 
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
+0

My first guess is that some of the mtms are null. – abalcerek

Answers

0

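I think the error is not in the code you included in your question. Spark is trying to run count on an RDD. The code you included does not call count, so that is one sign. Beyond that, the exception shows that the RDD being counted is backed by an iterable that was created in Java and is being converted to a Scala iterator (the JIterableWrapper frame). At that point it turns out that this iterable is actually null.

Does your code produce an iterator somewhere? Perhaps in a mapPartitions call or something similar?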
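To make that failure mode concrete, here is a minimal plain-Java sketch (no Spark involved, so it runs on its own). The class `NullIterableSketch` and its helpers are made up for illustration. `flatMapAll` only mimics what a flatMap-style operator does with the Iterable a user function returns; it is not Spark's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of a user function handing back a null Iterable.
final class NullIterableSketch {

    // Mimics a flatMap-style operator: it iterates whatever the user
    // function returns for each input element.
    static <T, R> List<R> flatMapAll(List<T> input, Function<T, Iterable<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T item : input) {
            // Iterating a null Iterable throws NullPointerException here,
            // far away from the function that actually returned null.
            for (R r : fn.apply(item)) {
                out.add(r);
            }
        }
        return out;
    }

    // Problematic: signals "nothing to emit" with null.
    static Iterable<String> splitOrNull(String line) {
        return line.isEmpty() ? null : Arrays.asList(line.split(","));
    }

    // Safer: signals "nothing to emit" with an empty collection.
    static Iterable<String> splitOrEmpty(String line) {
        return line.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(line.split(","));
    }
}
```

If something like this is the cause, the exception surfaces only when an action such as count finally walks the data. That would explain why the stack trace points at count rather than at the function that returned null.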

1

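This exception occurs because one of your functions returns null. You can return null and then filter out the null tuples afterwards, for example: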

JavaPairRDD<String, MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
}).filter(new Function<Tuple2<String, MarkToMarketPNL>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, MarkToMarketPNL> arg) throws Exception {
        // Keep only non-null tuples.
        return arg != null;
    }
});
0

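I ran into the same problem. When a join is performed, a <key, Iterable<values>> structure is created internally. If one of the Iterable<value> objects is null, we see a NullPointerException like the one above.

Before performing the join, make sure that none of the values are null.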
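As a rough sketch of that advice, the plain-Java example below drops null records and records with a null key before grouping by taxlot. It does not use Spark: `NullSafeKeying` and `Pnl` are hypothetical stand-ins (with `Pnl` modeling only the `getTaxlot()` accessor of MarkToMarketPNL), and the grouping step merely imitates keying by taxlot.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration of removing nulls before keying records.
final class NullSafeKeying {

    // Stand-in for MarkToMarketPNL; only the taxlot key is modeled.
    static final class Pnl {
        private final String taxlot;

        Pnl(String taxlot) {
            this.taxlot = taxlot;
        }

        String getTaxlot() {
            return taxlot;
        }
    }

    // Drops null records and records whose taxlot is null, then groups
    // the remaining records by taxlot, preserving first-seen key order.
    static Map<String, List<Pnl>> keyByTaxlot(List<Pnl> records) {
        return records.stream()
                .filter(Objects::nonNull)
                .filter(p -> p.getTaxlot() != null)
                .collect(Collectors.groupingBy(
                        Pnl::getTaxlot, LinkedHashMap::new, Collectors.toList()));
    }
}
```

In a Spark job the analogous step would presumably be a filter on each RDD before the keyBy call, so that neither a null record nor a null taxlot reaches the join.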