2017-08-16 78 views

My Kafka consumer code is shown below. I have only one consumer, yet the offset commit cannot be completed due to a rebalance.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Consumer configuration: single consumer in group "my-group"
    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "sandbox.hortonworks.com:6667");
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

    @SuppressWarnings("resource")
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
    TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
    consumer.subscribe(Collections.singletonList("test1"), rebalanceListener);

    // HDFS target for the consumed records
    HDFSAppendTrial example = new HDFSAppendTrial();
    String coreSite = "/usr/hdp/2.6.0.3-8/hadoop/etc/hadoop/core-site.xml";
    String hdfsSite = "/usr/hdp/2.6.0.3-8/hadoop/etc/hadoop/hdfs-site.xml";
    String hdfsFilePath = "/appendTo/Trial.csv";

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(999999999);
        for (ConsumerRecord<String, String> record : records) {
            // A FileSystem is opened and closed for every single record
            FileSystem fileSystem = example.configureFileSystem(coreSite, hdfsSite);
            String res = example.appendToFile(fileSystem, record.value(), hdfsFilePath);
            System.out.printf("%s\n", record.value());
            if (res.equalsIgnoreCase("success")) {
                System.out.println("Successfully appended to file");
            } else {
                System.out.println("couldn't append to file");
            }
            example.closeFileSystem(fileSystem);
        }
        // Offsets are committed manually after the whole batch has been processed
        consumer.commitSync();
    }

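The records arrive as a gradual stream.

(screenshot of the error; image not included)

How can I fix the problem above? Could someone explain what causes this error, which appears after some number of records have been processed, and how to overcome it? I would also appreciate help writing consumer code that can handle further requirements.

Thanks in advance to anyone who can help, and also to anyone who at least gives it a try.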

Answer

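    // Start from the newest offsets when the group has no committed offset
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // Let the consumer commit offsets automatically
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // Give the consumer more time before the group coordinator considers it dead
    consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "99998");
    // The heartbeat interval must be lower than the session timeout
    consumerConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
    consumerConfig.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "99999");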

I was able to solve this problem by adding the properties above to the Properties object, consumerConfig.
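
For anyone wondering why those three numbers sit so close together: as far as I understand the consumer client of that era, the heartbeat interval has to be lower than the session timeout, and the request timeout has to be greater than the session timeout. The snippet below is only a rough sketch of that ordering, not part of the original fix. It uses the plain config key strings instead of the ConsumerConfig constants so that it runs without the Kafka jar, and the class name ConsumerTimeoutSketch and the helper timeoutsAreOrdered are made up for illustration.

```java
import java.util.Properties;

public class ConsumerTimeoutSketch {

    /** Builds the overrides from the answer, keyed by the plain config names. */
    public static Properties answerOverrides() {
        Properties p = new Properties();
        p.setProperty("auto.offset.reset", "latest");
        p.setProperty("enable.auto.commit", "true");
        p.setProperty("session.timeout.ms", "99998");
        p.setProperty("heartbeat.interval.ms", "10000");
        p.setProperty("request.timeout.ms", "99999");
        return p;
    }

    /**
     * Hypothetical helper (not a Kafka API): returns true when
     * heartbeat.interval.ms < session.timeout.ms < request.timeout.ms.
     */
    public static boolean timeoutsAreOrdered(Properties p) {
        long heartbeat = Long.parseLong(p.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(p.getProperty("session.timeout.ms"));
        long request = Long.parseLong(p.getProperty("request.timeout.ms"));
        return heartbeat < session && session < request;
    }

    public static void main(String[] args) {
        Properties p = answerOverrides();
        System.out.println("timeouts ordered: " + timeoutsAreOrdered(p));
    }
}
```

Note that with enable.auto.commit set to true the explicit consumer.commitSync() call in the question's loop should no longer be needed. The session timeout also has to fall inside the range the broker allows, so a value this large may be rejected on a broker with stricter limits.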