2017-05-27 95 views
1

我有一個包含java.time.LocalDateTime一個基本的POJO:春天開機卡夫卡LocalDateTime

package foo.bar.asire.api.model; 

import java.time.LocalDateTime; 

public class Address 
{ 
    private Long id; 
    private Integer houseNumber; 
    private String address; 
    private LocalDateTime created; 

    public Address() 
    { 
     super(); 
    } 

    public Address(Long id, Integer houseNumber, String address, LocalDateTime created) 
    { 
     super(); 
     this.id = id; 
     this.houseNumber = houseNumber; 
     this.address = address; 
     this.created = created; 
    } 

    public Long getId() 
    { 
     return id; 
    } 

    public void setId(Long id) 
    { 
     this.id = id; 
    } 

    public Integer getHouseNumber() 
    { 
     return houseNumber; 
    } 

    public void setHouseNumber(Integer houseNumber) 
    { 
     this.houseNumber = houseNumber; 
    } 

    public String getAddress() 
    { 
     return address; 
    } 

    public void setAddress(String address) 
    { 
     this.address = address; 
    } 

    public LocalDateTime getCreated() 
    { 
     return created; 
    } 

    public void setCreated(LocalDateTime created) 
    { 
     this.created = created; 
    } 

    @Override 
    public String toString() 
    { 
     return "Address [id=" + id + ", houseNumber=" + houseNumber 
       + ", address=" + address + ", created=" + created + "]"; 
    } 

// @Override 
// public String toString() 
// { 
//  return "Address [id=" + id + ", houseNumber=" + houseNumber 
//    + ", address=" + address + "]"; 
// } 


} 

如果我刪除了LocalDateTime對象,我能夠發送使用下面的代碼/接收,並創建Address對象從我的消費:

package foo.bar.consumer.config; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 
import org.springframework.kafka.support.serializer.JsonDeserializer; 

import foo.bar.asire.api.model.Address; 

@Configuration 
@EnableKafka 
public class KafkaConsumerConfig 
{ 
    @Value("${kafka.consumer.bootstrap}") 
    private String bootstrapServers; 

    @Value("${kafka.consumer.group}") 
    private String group; 

    @Bean 
    public Map<String, Object> consumerConfigs() 
    { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, group); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, Address> consumerFactory() 
    { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs(), 
       new StringDeserializer(), 
       new JsonDeserializer<>(Address.class)); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, Address> kafkaListenerContainerFactory() 
    { 
     ConcurrentKafkaListenerContainerFactory<String, Address> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 

     return factory; 
    } 
} 

這裏是我的AddressConsumer @Service:

package foo.bar.consumer.service; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.stereotype.Service; 

import foo.bar.asire.api.model.Address; 

@Service 
public class AddressConsumer 
{ 
    private final Logger log = LoggerFactory.getLogger(this.getClass()); 

    @KafkaListener(topics = "${kafka.consumer.topic}") 
    private void consumeAddress(Address address) 
    { 
     log.info("received address='{}'", address.toString()); 
    } 
} 

但是,WH恩我用的是原來的地址POJO與LocalDateTime對象,我得到以下異常:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition asire-0 at offset 29 
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 49, 44, 34, 104, 111, 117, 115, 101, 78, 117, 109, 98, 101, 114, 34, 58, 49, 50, 51, 52, 44, 34, 97, 100, 100, 114, 101, 115, 115, 34, 58, 34, 70, 111, 111, 98, 97, 114, 32, 76, 97, 110, 101, 34, 44, 34, 99, 114, 101, 97, 116, 101, 100, 34, 58, 123, 34, 100, 97, 121, 79, 102, 77, 111, 110, 116, 104, 34, 58, 50, 55, 44, 34, 100, 97, 121, 79, 102, 87, 101, 101, 107, 34, 58, 34, 83, 65, 84, 85, 82, 68, 65, 89, 34, 44, 34, 100, 97, 121, 79, 102, 89, 101, 97, 114, 34, 58, 49, 52, 55, 44, 34, 109, 111, 110, 116, 104, 34, 58, 34, 77, 65, 89, 34, 44, 34, 109, 111, 110, 116, 104, 86, 97, 108, 117, 101, 34, 58, 53, 44, 34, 121, 101, 97, 114, 34, 58, 50, 48, 49, 55, 44, 34, 104, 111, 117, 114, 34, 58, 49, 52, 44, 34, 109, 105, 110, 117, 116, 101, 34, 58, 53, 52, 44, 34, 110, 97, 110, 111, 34, 58, 49, 54, 53, 48, 48, 48, 48, 48, 48, 44, 34, 115, 101, 99, 111, 110, 100, 34, 58, 57, 44, 34, 99, 104, 114, 111, 110, 111, 108, 111, 103, 121, 34, 58, 123, 34, 105, 100, 34, 58, 34, 73, 83, 79, 34, 44, 34, 99, 97, 108, 101, 110, 100, 97, 114, 84, 121, 112, 101, 34, 58, 34, 105, 115, 111, 56, 54, 48, 49, 34, 125, 125, 125]] from topic [asire] 
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDateTime: no suitable constructor found, can not deserialize from Object value (missing default constructor or creator, or perhaps need to add/enable type information?) 
at [Source: [[email protected]; line: 1, column: 63] (through reference chain: foo.bar.asire.api.model.Address["created"]) 
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1206) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:504) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:104) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:357) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1626) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1237) ~[jackson-databind-2.8.8.jar:2.8.8] 
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:86) ~[spring-kafka-1.2.1.RELEASE.jar:na] 
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:869) ~[kafka-clients-0.10.2.0.jar:na] 
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na] 
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556) ~[spring-kafka-1.2.1.RELEASE.jar:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131] 
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131] 

這裏是我的Maven pom.xml文件,基本上是相同的兩個生產者/消費者應用程序:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>foo.bar</groupId> 
    <artifactId>consumer</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <packaging>jar</packaging> 

    <name>consumer</name> 
    <description>Demo project for Spring Boot</description> 

    <parent> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-parent</artifactId> 
     <version>1.5.3.RELEASE</version> 
     <relativePath/> <!-- lookup parent from repository --> 
    </parent> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
     <java.version>1.8</java.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-web</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.kafka</groupId> 
      <artifactId>spring-kafka</artifactId> 
      <version>1.2.1.RELEASE</version> 
     </dependency> 

     <dependency> 
      <groupId>foo.bar</groupId> 
      <artifactId>asire-api</artifactId> 
      <version>0.0.1-SNAPSHOT</version> 
     </dependency> 

     <dependency> 
      <groupId>com.google.guava</groupId> 
      <artifactId>guava</artifactId> 
      <version>22.0</version> 
     </dependency> 

     <!-- Jackson Dependencies --> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-core</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-databind</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>com.fasterxml.jackson.datatype</groupId> 
      <artifactId>jackson-datatype-jsr310</artifactId> 
     </dependency> 


     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.springframework.kafka</groupId> 
      <artifactId>spring-kafka-test</artifactId> 
      <version>1.2.1.RELEASE</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.hamcrest</groupId> 
      <artifactId>hamcrest-core</artifactId> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.mockito</groupId> 
      <artifactId>mockito-core</artifactId> 
      <version>2.7.22</version> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.springframework.boot</groupId> 
       <artifactId>spring-boot-maven-plugin</artifactId> 
      </plugin> 
     </plugins> 
    </build> 


</project> 

這裏是我的application.properties文件爲我的消費者,但我的製片人是差不多的:j的

server.port=8081 

spring.jackson.serialization.write_dates_as_timestamps=false 

kafka.consumer.bootstrap=localhost:9092 
kafka.consumer.topic=asire 
kafka.consumer.group=AsireGroup 
+0

這是我的application.properties文件:server.port=8081 spring.jackson.serialization.write_dates_as_timestamps=false kafka.consumer.bootstrap=localhost:9092 kafka.consumer.topic=asire kafka.consumer.group=AsireGroup user3175414

回答

0

對還是錯,我能夠通過修改原始地址POJO與得到這個做正確的JsonSerialization/JsonDeserialization以下更改。增加了以下進口:

import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; 
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; 

添加了以下注釋:

@JsonSerialize(using = LocalDateTimeSerializer.class) 
@JsonDeserialize(using = LocalDateTimeDeserializer.class) 
private LocalDateTime created; 
1

不能構造實例ava.time.LocalDateTime

Jackson(json引擎)無法反序列化沒有參數構造函數的對象。

+0

它有一個參數的構造函數,我剛剛離開它的帖子..我編輯了原創。 – user3175414

+0

但'LocalDateTime'不是;它只有一個私人ctor。 –

+0

有沒有辦法讓這個工作,或者我應該只是修改我的POJO並使用java.util.Date呢? – user3175414