2017-04-25 66 views
0

如下所示,我的代碼是一個高層次的消費者在kafka服務器中獲取32個分區的主題,我很困惑,爲什麼有時我會從consumer.poll()中獲得一個空的返回值。 我試圖增加輪詢超時,然後當我增加超時到1000,然後每個輪詢都有返回數據,而我設置超時到10或0,然後我看到很多空回報。當卡夫卡消費者調查返回空記錄?

任何人都可以告訴我如何設置正確的超時?

def main(args: Array[String]): Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", "kafka-01:9098") 
    props.put("group.id", "kch1") 
    props.put("enable.auto.commit", "true") 
    props.put("auto.commit.interval.ms", "1000") 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 

    //props.put("max.poll.records", "1000") 
    val consumers = new Array[KafkaConsumer[String, String]](16) 
    for(i <- 0 to 15) { 
     consumers(i) = new KafkaConsumer[String, String](props) 
     consumers(i).subscribe(util.Arrays.asList("veh321")) 
    } 
    var cnt = 0 
    var cacheIterator: Iterator[ConsumerRecord[String, String]] = null 
    for(i <- 0 to 15) { 
     new Thread(new Runnable { 
     override def run(): Unit = { 
      var finish = false 
      while(!finish) { 
      val start = System.currentTimeMillis() 
      cacheIterator = consumers(i).poll(100).iterator() 
      val end = System.currentTimeMillis() - start 
      if (end > 10) { 
       println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}") 
      } 
      } 
     } 
     }).start() 
    } 

回答

0

Java的消費者採用的Linux的epoll的是通過調用java.nio.channels.Selector.select(超時)底層實現方案。如果僅在100 ms內嘗試在短時間間隔內準備好多少個SelectionKeys,則很可能不會返回任何內容。

此外,在同樣的100毫秒內,消費者會做其他工作,包括輪詢協調員狀態,所以記錄輪詢的實時間隔明顯小於100毫秒,這使得難以檢索到一些真正有用的東西。

+0

因此,如果我將輪詢時間設置爲1000毫秒,我可以看到所有的輪詢都有數據返回,其中一些花費大約200毫秒,而其他花費幾毫秒,那麼當輪詢花費需要幾百秒時會發生什麼?有沒有任何參數來扭轉這種情況? –

+0

許多底層操作需要在檢索記錄之前完成,例如獲取元數據,管理連接和組等。如果您正在使用Java消費者,則可以調整'max.poll.records'來控制單輪輪詢的消息數。 – amethystic

+0

感謝您的幫助,所以如果我想要一個很大的吞吐量,我可以設置一個大poll.records和一個大輪詢超時,以減少民意調查的時間,我是對嗎? –