2016-09-26 146 views

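Spring Kafka message consumption delay

I am using spring-kafka 1.0.3 to consume Kafka messages. There are 2 topics in Kafka, and each topic has 1 partition. In the Java code there are 2 @KafkaListener methods, one consuming each topic. The concurrency of the ConcurrentKafkaListenerContainerFactory is set to 1, but messages are sometimes delayed by more than 20 seconds.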

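import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.brokers}")
    private String brokers;

    // Single factory shared by both @KafkaListener methods.
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        // Offsets are committed only for records the listener has acknowledged.
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Both listeners use this consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "server-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return props;
    }
}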

The @KafkaListener code is as follows:

@KafkaListener(id = "serverInChannel",topics = CommonContants.KAFKA_TOPIC.SERVER_IN_CHANNEL) 
public void consumeInMessage(@Payload String data, 
           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 
           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId, 
           Acknowledgment ack) { 
    logger.info("[serverInMessage]data=" + data); 
    ack.acknowledge(); 
} 

@KafkaListener(id = "webOutChannel",topics = CommonContants.KAFKA_TOPIC.WEB_OUT_CHANNEL) 
public void consumeOutMessage(@Payload String data, 
           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId, 
           Acknowledgment ack) { 
    logger.info("[webOutMessage]data=" + data); 
    ack.acknowledge(); 
} 

The Kafka messages are sent with spring-kafka, and the log is here:

2016-09-26 16:16:42,777 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-26 16:16:44,101 [webOutChannel-0-kafka-listener-6] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-26 16:16:45,551 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0] 
2016-09-26 16:16:45,562 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-26 16:16:45,663 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0] 
2016-09-26 16:16:45,715 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] 
2016-09-26 16:16:45,781 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] 
2016-09-26 16:16:45,805 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-26 16:16:45,870 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-26 16:17:01,099 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
2016-09-26 16:17:02,054 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0] 
2016-09-26 16:17:02,108 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] 
2016-09-26 16:17:02,120 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0] 
2016-09-26 16:17:02,133 [serverInChannel-0-kafka-listener-5] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 

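The log contains some "partitions assigned:[]" entries, and I don't know why the assignment is empty. After the webOutMessage message was consumed at 2016-09-26 16:16:45,870, the next action is a consumer rebalance at 2016-09-26 16:17:01,099, more than 15 seconds later, so the next message is delayed by 15+ seconds.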
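
To make the size of that gap concrete, here is a minimal sketch that computes the time between the two log timestamps quoted above and prints it next to the session.timeout.ms value from consumerConfigs(). The class name RebalanceGap and the method gapMillis are hypothetical helpers written for this illustration, not part of spring-kafka. The sketch only measures the gap; whether the session timeout actually drives the delay is an assumption that the client log alone does not confirm.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for this illustration; not part of spring-kafka.
class RebalanceGap {

    // Timestamp layout of the log lines in the question, e.g. "2016-09-26 16:16:45,870".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Returns the milliseconds elapsed between two log timestamps. */
    static long gapMillis(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(later, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // session.timeout.ms as configured in consumerConfigs().
        long sessionTimeoutMs = 15000;
        // Last consumed message vs. the next "partitions revoked" entry.
        long gap = gapMillis("2016-09-26 16:16:45,870", "2016-09-26 16:17:01,099");
        System.out.println("gap = " + gap + " ms, session.timeout.ms = " + sessionTimeoutMs);
    }
}
```

For the two timestamps above the gap is 15229 ms, which is close to the configured 15000 ms.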

Does anyone know why?

I added debug logging. The delay does not happen every time; sometimes consumption is fine:

2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=179, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 179, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}} 
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}} 
2016-09-27 15:33:58,623 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker 
2016-09-27 15:33:58,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker 
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped 
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0] 
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped 
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0] 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {} 
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {} 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] 
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:14,599 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
2016-09-27 15:34:14,599 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
2016-09-27 15:34:15,276 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=174, metadata=''}} 
2016-09-27 15:34:15,286 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}} 
2016-09-27 15:34:15,516 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0] 
2016-09-27 15:34:15,516 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] 
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 3 records 
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=174, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 174, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=175, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 175, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=176, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 176, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc 
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}} 
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}} 
2016-09-27 15:34:15,795 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Woken up during commit 
2016-09-27 15:34:15,796 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 2 records 
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=180, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 180, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=181, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 181, key = 1234567890-1474961636372, value = 123abc)}]] 
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc 
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}} 
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}} 
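
The windows in which a consumer thread holds no partitions can also be pulled out of a log like this mechanically. The following is a rough sketch, assuming the line layout shown above (timestamp, thread name in brackets, then "partitions assigned:[...]"). The class EmptyAssignmentWindows and its findWindows method are hypothetical names made up for this example. It reports how long each thread went from an empty assignment to its next non-empty one; it does not explain why the rebalance happened.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for this illustration; not part of spring-kafka.
class EmptyAssignmentWindows {

    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Groups: 1 = timestamp, 2 = thread name, 3 = text inside "partitions assigned:[...]".
    private static final Pattern ASSIGNED = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) \\[([^\\]]+)\\] "
            + ".*partitions assigned:\\[(.*)\\]\\s*$");

    /** Returns one entry per thread and window spent without any assigned partition. */
    static List<String> findWindows(List<String> lines) {
        Map<String, LocalDateTime> emptySince = new HashMap<>();
        List<String> windows = new ArrayList<>();
        for (String line : lines) {
            Matcher m = ASSIGNED.matcher(line);
            if (!m.matches()) {
                continue; // not a "partitions assigned" line
            }
            LocalDateTime time = LocalDateTime.parse(m.group(1), LOG_TIME);
            String thread = m.group(2);
            if (m.group(3).trim().isEmpty()) {
                // Remember the first empty assignment seen for this thread.
                emptySince.putIfAbsent(thread, time);
            } else {
                LocalDateTime start = emptySince.remove(thread);
                if (start != null) {
                    long ms = Duration.between(start, time).toMillis();
                    windows.add(thread + " held no partitions for " + ms + " ms");
                }
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // Two lines copied from the debug log above.
        List<String> sample = Arrays.asList(
            "2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[] ",
            "2016-09-27 15:34:15,516 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0] ");
        findWindows(sample).forEach(System.out::println);
    }
}
```

Feeding the full log file to findWindows, one line per list element, gives one entry per consumer thread and makes it easier to compare the idle windows across runs.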

Enable DEBUG logging for o.s.kafka and o.a.k.clients to see whether it provides more information.


I added some debug logs at the end of the question. – tkec

Answer


The container is reacting correctly to a partition rebalance initiated by the broker.

You will have to check the server logs to determine the reason for the rebalance.