2015-10-15 155 views

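Spark serialization issue with HashMap

I want to serialize a Java class whose objects will be collected in a Java Spark RDD. The POJO contains some data structures such as HashMap and ArrayList.

When I iterate over the RDD objects in my driver and try to read the HashMap elements, it throws a serialization exception:

Caused by: java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException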

Sample program:

Driver class:

Main Implementation:

import java.util.ArrayList;

public class MyMainTest {
    // Builds three MyRecord instances and returns them as a list.
    public static ArrayList<MyInterface> call() throws Exception {
        ArrayList<MyInterface> li = new ArrayList<MyInterface>();
        for (int i = 0; i < 3; i++) {
            MyRecord myrec = new MyRecord();
            li.add(myrec.addToTagMap());
        }
        return li;
    }
}

My Pojo Class: 
import java.io.Serializable;
import java.util.HashMap;

public class MyRecord implements MyInterface, Serializable {

    // Populated with double-brace initialization (an anonymous HashMap subclass).
    protected HashMap<String, Object> allTagNameValueMap_
            = new HashMap<String, Object>() {
                {
                    put("key1", "T1");
                    put("key2", "Val2");
                    put("key3", 3);
                }
            };

    // Returns a new record with some of the default entries overridden.
    protected MyRecord addToTagMap() throws Exception {
        MyRecord myRec = new MyRecord();
        myRec.allTagNameValueMap_.put("key1", "New Value");
        myRec.allTagNameValueMap_.put("key2", "New Value2");
        myRec.allTagNameValueMap_.put("key4", 22);
        return myRec;
    }

    @Override
    public Object getKey1() {
        return allTagNameValueMap_.get("key1");
    }

    @Override
    public Object getKey2() {
        return allTagNameValueMap_.get("key2");
    }

    @Override
    public Object getKey3() {
        return allTagNameValueMap_.get("key3");
    }
}

My Interface: 
public interface MyInterface extends Serializable{ 

    public Object getKey1(); 
    public Object getKey2(); 
    public Object getKey3(); 
} 

Serialization trace:

allTagNameValueMap_ (com.org.util.ex.ExRecord)

at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140) 
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) 
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
allTagNameValueMap_ (com.org.util.ex.ExRecord)

at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) 
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) 
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) 
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) 
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36) 
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) 
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) 
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80) 
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80) 
at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:130) 
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80) 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137) 
... 20 more 

Caused by: java.lang.NullPointerException
at java.util.HashMap.put(HashMap.java:493)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)

Is there anything special I need to take care of when serializing a HashMap in Spark?

+0

Could you please edit the log to make it easier to read? –

+2

Welcome to SO, Hari. Could you provide an [MCVE](http://stackoverflow.com/help/mcve)? It would make it easier to help you and more useful to other readers. In the meantime, check [this](http://stackoverflow.com/a/33042316/1560062). – zero323

+0

Thanks, zero323. Should I make a call to MyKryoRegistrator from my driver and then set the serializer in sparkConf to KryoSerializer? – Harisyam

Answers

I fixed the issue.

These are the steps I followed.

  1. Create a Kryo registrator class and register the POJO with Kryo.
import java.io.Serializable;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the POJO class with Kryo
        kryo.register(MyRecord.class);
    }
}

  2. Set the serializer type and the Kryo registrator class in the Spark conf of my main class.

sConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sConf.set("spark.kryo.registrator", "com.spark.util.umf.MyKryoRegistrator");

  3. Initialize the HashMap in a constructor of the POJO class.

protected Map<String, Object> allTagNameValueMap_ = new HashMap<String, Object>();

public MyRecord() {
    allTagNameValueMap_.put("key1", "V1");
    allTagNameValueMap_.put("key2", "V2");
    allTagNameValueMap_.put("key3", "V3");
}
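
A note on why step 3 probably matters. The `new HashMap<String, Object>() {{ ... }}` form in the question does not create a plain `HashMap`; it creates an anonymous subclass of it. My understanding, which I have not verified against the Kryo version Spark bundles here, is that Kryo cannot call a no-arg constructor on such a class and so creates the instance without running the `HashMap` constructor, which would explain the NullPointerException in `HashMap.put` during deserialization. The sketch below only demonstrates the class difference with plain Java, with no Spark or Kryo involved. The class names `DoubleBraceDemo`, `DoubleBraceRecord` and `ConstructorRecord` are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class DoubleBraceDemo {

    // Mirrors the field in the question: double-brace initialization.
    static class DoubleBraceRecord {
        Map<String, Object> tags = new HashMap<String, Object>() {
            {
                put("key1", "T1");
                put("key2", "Val2");
                put("key3", 3);
            }
        };
    }

    // Mirrors step 3: a plain HashMap filled in the constructor.
    static class ConstructorRecord {
        Map<String, Object> tags = new HashMap<String, Object>();

        ConstructorRecord() {
            tags.put("key1", "T1");
            tags.put("key2", "Val2");
            tags.put("key3", 3);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> a = new DoubleBraceRecord().tags;
        Map<String, Object> b = new ConstructorRecord().tags;

        // The double-brace map is an anonymous subclass, not HashMap itself.
        System.out.println(a.getClass().isAnonymousClass());               // true
        System.out.println(a.getClass() == HashMap.class);                 // false
        System.out.println(a.getClass().getSuperclass() == HashMap.class); // true

        // The constructor-initialized map is exactly java.util.HashMap.
        System.out.println(b.getClass() == HashMap.class);                 // true

        // The contents are the same either way.
        System.out.println(a.equals(b));                                   // true
    }
}
```

Both maps hold the same entries, so the difference only shows up when something inspects or instantiates the concrete class, as a serializer does. If that explanation is right, initializing the map in the constructor may be the part of the fix that matters most, but I did not test the steps separately.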