2016-11-17 74 views
0

我正在玩KafkaStreamsKafkaConnect只是試圖消費來自主題的消息。我有一個爲這個話題設置的「標準」批量消費者,它的功能就像一個魅力。我首先向卡夫卡發送一些記錄,然後消耗它們。現在我想用Kakfa流做同樣的事情,但是我沒有從主題中得到一條消息。這是我正在使用的消費者代碼。KafkaStream沒有收到來自主題的任何消息

final int NUMBER_OF_PARTITIONS = 4; 
final Properties consumerConfig = new Properties(); 
consumerConfig.setProperty("zookeeper.connect", RULE.getConfiguration().kafka.getZookeeperUrl()); 
consumerConfig.setProperty("backoff.increment.ms", "100"); 
consumerConfig.setProperty("group.id", "java-consumer-example"); 
consumerConfig.setProperty("consumer.timeout.ms", "1000000"); 
consumerConfig.setProperty("client.id", "someclient"); 
consumerConfig.setProperty("auto.offset.reset", "smallest"); 
consumerConfig.setProperty("enable.auto.commit", "false"); 
consumerConfig.setProperty("bootstrap.servers", RULE.getConfiguration().kafka.getHosts()); 

final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig)); 
final TopicFilter sourceTopicFilter = new Whitelist(RULE.getConfiguration().kafka.getTopic()); 

final VerifiableProperties decoderProps = new VerifiableProperties(); 
decoderProps.props().setProperty("schema.registry.url", RULE.getConfiguration().kafka.getRegistry()); 
decoderProps.props().setProperty("max.schemas.per.subject", "1"); 
final List<KafkaStream<String, Object>> streams = connector 
    .createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS, new StringDecoder(decoderProps), new KafkaAvroDecoder(decoderProps)); 

final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARTITIONS); 
for (final KafkaStream stream : streams) { 
    executorService.submit(() -> { 
     try { 
      final ConsumerIterator it = stream.iterator(); 
      while (it.hasNext()) { 
       final MessageAndMetadata messageAndMetadata = it.next(); 
       final String key = (String) messageAndMetadata.key(); 
       System.out.println("KEY" + key); 
      } 
     } catch (final Exception ex) { 
      LOGGER.error("ERROR", ex); 
     } 
    }); 
} 

我的問題是,我的代碼繼續等待it.hasNext()條件,直到達到超時。我可能在這裏錯過了一些細節,但無法弄清楚,爲什麼我沒有從主題中得到任何東西。作爲此測試的一部分,我有一位製作人員,在消費者啓動之前將許多記錄發送到此主題中,因此它不能成爲抵消問題。任何想法都會受到高度的歡迎。

回答

0

我找到了解決方案。該錯誤超出了我發佈的代碼。我提供的用於關閉ExecutorService的超時時間太短,所以它只是在沒有提供足夠時間執行消費者作業的情況下殺死它。

相關問題