2016-12-14 58 views
6

我有我的自定義Java對象,並希望能夠充分利用JVM的內置序列化將其發送到卡夫卡的話題,但系列化失敗,下面的錯誤發送自定義Java對象卡夫卡主題

org.apache.kafka。 common.errors.SerializationException:不能類com.spring.kafka.Payload的 值轉換爲org.apache.kafka.common.serialization.ByteArraySerializer指定 類 value.serializer

有效載荷。 java

public class Payload implements Serializable { 

    private static final long serialVersionUID = 123L; 

    private String name="vinod"; 

    private int anInt = 5; 

    private Double aDouble = new Double("5.0"); 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public int getAnInt() { 
     return anInt; 
    } 

    public void setAnInt(int anInt) { 
     this.anInt = anInt; 
    } 

    public Double getaDouble() { 
     return aDouble; 
    } 

    public void setaDouble(Double aDouble) { 
     this.aDouble = aDouble; 
    } 

} 

在我的創作製片人,我有以下的屬性中設置

<entry key="key.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 
       <entry key="value.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 

我發送調用是如下

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload())); 

什麼是發送自定義Java對象正確的方法通過製作人製作一個卡夫卡主題?

+0

另一種選擇是轉換成JSON格式發送 – ravthiru

回答

8

我們有2個選項下

1)上市。如果我們打算髮送定製的Java對象來製作,我們需要創建一個實現org.apache.kafka.common.serialization.Serializer和串行通過下面

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { 

    public void configure(Map map, boolean b) { 

    } 

    public byte[] serialize(String s, Object o) { 

     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(o); 
      oos.close(); 
      byte[] b = baos.toByteArray(); 
      return b; 
     } catch (IOException e) { 
      return new byte[0]; 
     } 
    } 

    public void close() { 

    } 
} 

創建您的生產者

代碼參考的過程中串行器類,並設置相應的值串行

<entry key="value.serializer" 
         value="com.spring.kafka.PayloadSerializer" /> 

2)不需要創建自定義序列化類。使用現有ByteArraySerializer,但在發送遵循該過程

Java對象 - >字符串(最好是JSON represenation代替 的toString) - >字節組

3

由於您使用的是ByteArraySerializer,因此您需要實例化一個byte []生產者。

Producer<byte[],byte[]> producer = new KafkaProducer<>(props); 

,然後同時產生傳遞的byte []序列化或其它一些方法之後,例如,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes())); 

如果您傳遞只是一個有效載荷對象生產者那麼這將是更好的有關鍵的序列化程序和值序列化程序,無論您打算如何傳遞,在閱讀時都需要從中讀取數據。

最好使用Serializable和ByteArraySerializer/ByteArrayDeserializer。