2016-08-20 131 views
2

我有多個生產者可以將多種類型的事件發送給一個卡夫卡主題。春季卡夫卡在一個消費者中使用多種消息類型

我有一個消費者,必須消費所有類型的消息。對於每種類型的消息都有不同的邏輯。

@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 
public void handleEvent(Message<EventOne> event) { 
    logger.info("event={}", event); 
} 

但是,在這種情況下,所有消息都對這種方法不僅EventOne

如果我實現兩個方法(每個類型的消息),那麼所有消息來只有一個方法。

如果我實現監聽器是這樣的:

@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 
public void handleEvent(Message<?> event) { 
    logger.info("event={}", event); 
} 

然後我得到異常: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-卡夫卡偵聽-1] ERROR org.springframework.kafka.listener .LoggingErrorHandler - 處理時出錯:ConsumerRecord java.lang.IllegalArgumentException:無法識別類型:[null]

請告訴我如何實現多種消費類型?

回答

2

我發現了靈魂,它非常簡單。 所以,它不是來自這個問題不清楚,但是我用jsonMessageConverter 這樣的:

@Bean 
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setMessageConverter(new StringJsonMessageConverter()); 
    return factory; 
} 

傑克遜庫默認不增加了對JSON類信息,因此無法猜測想要什麼類型的我的反序列化。 解決方案是註釋

@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class") 

,增加了classinformation到JSON字符串。 在消費者方面我只寫我的事件的基類

@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 
public void handleEvent(BasicEvent event) { 

    if (event instanceof EventOne) { 
     logger.info("type={EventOne}, event={}", event); 
    } else { 
     logger.info("type={EventTwo}, event={}", event); 
    } 
} 

希望這些信息可以幫助別人。