0
如下所示,我的代碼是一個高層次的消費者在kafka服務器中獲取32個分區的主題,我很困惑,爲什麼有時我會從consumer.poll()中獲得一個空的返回值。 我試圖增加輪詢超時,然後當我增加超時到1000,然後每個輪詢都有返回數據,而我設置超時到10或0,然後我看到很多空回報。當卡夫卡消費者調查返回空記錄?
任何人都可以告訴我如何設置正確的超時?
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka-01:9098")
props.put("group.id", "kch1")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//props.put("max.poll.records", "1000")
val consumers = new Array[KafkaConsumer[String, String]](16)
for(i <- 0 to 15) {
consumers(i) = new KafkaConsumer[String, String](props)
consumers(i).subscribe(util.Arrays.asList("veh321"))
}
var cnt = 0
var cacheIterator: Iterator[ConsumerRecord[String, String]] = null
for(i <- 0 to 15) {
new Thread(new Runnable {
override def run(): Unit = {
var finish = false
while(!finish) {
val start = System.currentTimeMillis()
cacheIterator = consumers(i).poll(100).iterator()
val end = System.currentTimeMillis() - start
if (end > 10) {
println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}")
}
}
}
}).start()
}
因此,如果我將輪詢時間設置爲1000毫秒,我可以看到所有的輪詢都有數據返回,其中一些花費大約200毫秒,而其他花費幾毫秒,那麼當輪詢花費需要幾百秒時會發生什麼?有沒有任何參數來扭轉這種情況? –
許多底層操作需要在檢索記錄之前完成,例如獲取元數據,管理連接和組等。如果您正在使用Java消費者,則可以調整'max.poll.records'來控制單輪輪詢的消息數。 – amethystic
感謝您的幫助,所以如果我想要一個很大的吞吐量,我可以設置一個大poll.records和一個大輪詢超時,以減少民意調查的時間,我是對嗎? –