2016-06-08

Spring Integration Kafka: consumer cannot receive messages

I have started using spring-integration-kafka in my project. I can produce messages, but the consumer never receives them, and I don't know why.

Here is my code.

The Kafka producer context.xml:

<int:publish-subscribe-channel id="inputToKafka"/> 

<int-kafka:outbound-channel-adapter id="kafkaOutbundChannelAdaptor" 
            kafka-producer-context-ref="kafkaProductorContext" 
            auto-startup="true" 
            channel="inputToKafka" 
            order="1"> 
</int-kafka:outbound-channel-adapter> 

<bean id="producerProperties" 
    class="org.springframework.beans.factory.config.PropertiesFactoryBean"> 
    <property name="properties"> 
     <props> 
      <prop key="topic.metadata.refresh.interval.ms">3600000</prop> 
      <prop key="queue.buffering.max.ms">500</prop> 
      <prop key="queue.buffering.max.messages">10000</prop> 
      <prop key="retry.backoff.ms">100</prop> 
      <prop key="message.send.max.retries">2</prop> 
      <prop key="socket.request.max.bytes" >104857600</prop> 
      <prop key="send.buffer.bytes" >5242880</prop> 
      <prop key="socket.receive.buffer.bytes" >1048576</prop> 
      <prop key="socket.send.buffer.bytes" >1048576</prop> 
      <prop key="request.required.acks">1</prop> 
     </props> 
    </property> 
</bean> 
<int-kafka:producer-context id="kafkaProductorContext" 
    producer-properties="producerProperties"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration 
      broker-list="localhost:9092" 
      key-class-type="java.lang.String" 
      value-class-type="java.lang.String" 
      topic="test" 
      async="true" 
      partitioner="partitioner" 
      key-encoder="encoder" 
      value-encoder="encoder" 
      compression-codec="default"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

<bean id="partitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner"/> 

<bean id="encoder" 
class="org.springframework.integration.kafka.serializer.common.StringEncoder" /> 

<!-- <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder"> 
    <constructor-arg value="java.lang.String" /> 
</bean> --> 

The producer:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

public class Productor {

    private static final Logger logger = LoggerFactory.getLogger(Productor.class);

    @Autowired
    @Qualifier("inputToKafka")
    private MessageChannel channel;

    public void sendMessage(String message) {
        try {
            // The "topic" and "messageKey" headers tell the outbound
            // channel adapter where, and under which key, to publish.
            boolean flag = channel.send(MessageBuilder.withPayload(message)
                    .setHeader("topic", "test")
                    .setHeader("messageKey", "key")
                    .build());
            System.out.println(flag);
        } catch (Exception e) {
            logger.error(String.format("Failed to send [ %s ] to topic %s",
                    message, "test"), e);
        }
    }
}
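For reference, a minimal sketch of how the producer could be driven in a test. The context file name kafka-producer-context.xml and the Productor bean registration are assumptions, not from the original post:

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProducerDemo {

    public static void main(String[] args) {
        // Hypothetical file name; substitute the actual producer context file.
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("kafka-producer-context.xml");
        // Assumes Productor is registered as a bean (component scan or <bean> entry).
        Productor productor = ctx.getBean(Productor.class);
        productor.sendMessage("hello kafka");
        ctx.close();
    }
}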

The Kafka consumer context.xml:

<context:component-scan base-package="net.rminfo"/> 

<int:channel id="inputFromKafka"> 
    <int:dispatcher task-executor="kafkaMessageExecutor"/> 
</int:channel> 

<task:executor id="kafkaMessageExecutor" pool-size="5" keep-alive="120" queue-capacity="500" /> 

<!-- Channel adapter config: auto-startup="true", otherwise no data is received --> 
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" 
    kafka-consumer-context-ref="consumerContext" auto-startup="true" 
    channel="inputFromKafka"> 
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" 
      max-messages-per-poll="5" /> 
</int-kafka:inbound-channel-adapter> 

<!-- <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage" /> --> 
<!-- <bean id="kafkaConsumer" class="net.rminfo.demo.Customer"/> --> 
<int:service-activator input-channel="inputFromKafka" ref="kafkaConsumer" method="processMessage" /> 


<bean id="consumerProperties" 
    class="org.springframework.beans.factory.config.PropertiesFactoryBean"> 
    <property name="properties"> 
     <props> 
      <prop key="auto.offset.reset">smallest</prop> 
      <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M --> 
      <prop key="fetch.message.max.bytes">5242880</prop> 
      <prop key="auto.commit.interval.ms">1000</prop> 
     </props> 
    </property> 
</bean> 

<!-- ZooKeeper config; more than one server can be listed --> 
<int-kafka:zookeeper-connect id="zookeeperConnect" 
    zk-connect="localhost:2181" zk-connection-timeout="4000" 
    zk-session-timeout="4000" zk-sync-time="200"/> 

<int-kafka:consumer-context 
    id="consumerContext" consumer-timeout="1000" 
    zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"> 
    <int-kafka:consumer-configurations> 
     <int-kafka:consumer-configuration group-id="mygroup" max-messages="5000" > 
      <int-kafka:topic streams="3" id="test"/> 
     </int-kafka:consumer-configuration> 
    </int-kafka:consumer-configurations> 
</int-kafka:consumer-context> 

The consumer:

import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component("kafkaConsumer")
public class Customer {

    public static final Logger logger = LoggerFactory.getLogger(Customer.class);

    // The inbound channel adapter delivers a payload shaped as
    // topic -> (partition -> list of raw message bytes).
    @ServiceActivator
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        logger.info("===============processMessage===============");
        for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : msgs.entrySet()) {
            logger.info("============Topic:" + entry.getKey());
            Map<Integer, List<byte[]>> messages = entry.getValue();
            for (Integer partition : messages.keySet()) {
                logger.info("======Partition:" + partition);
            }
            for (List<byte[]> list : messages.values()) {
                for (byte[] bytes : list) {
                    String message = new String(bytes);
                    logger.info("=====message:" + message);
                    System.out.println(message);
                }
            }
        }
    }
}
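Before suspecting the Spring configuration, it can help to confirm that messages actually reach the topic at all. A sanity check with the console consumer that ships with Kafka 0.8.x, assuming the defaults above (topic test, ZooKeeper on localhost:2181):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

If messages show up there but never reach the service activator, the problem is on the consumer-context side rather than the producer.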

Thank you very much.

Answer


Please change max-messages="5000" to max-messages="1" in your Kafka consumer context.xml and see whether it works.
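That is, a sketch of the changed consumer context, where only max-messages differs from the original:

<int-kafka:consumer-context 
    id="consumerContext" consumer-timeout="1000" 
    zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"> 
    <int-kafka:consumer-configurations> 
     <int-kafka:consumer-configuration group-id="mygroup" max-messages="1"> 
      <int-kafka:topic streams="3" id="test"/> 
     </int-kafka:consumer-configuration> 
    </int-kafka:consumer-configurations> 
</int-kafka:consumer-context>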