2016-08-03 47 views
4

根據文檔上kafka javadocs如果我:卡夫卡模式訂閱。再平衡沒有被觸發新主題

  • 訂閱模式
  • 創建匹配模式

應該會出現一個重新平衡的話題,這使消費者從這個新話題中讀到。但這並沒有發生。

如果我停止並啓動消費者,它確實會提出新的話題。所以我知道新的主題與模式匹配。 https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics這個問題可能有重複,但是這個問題沒有任何意義。

我看到卡夫卡日誌並沒有錯誤,它只是不會觸發重新平衡。當消費者加入或死亡時,會觸發重新平衡,但在創建新主題時(即使將分區添加到現有主題,但這不是另一個主題時)也不會。

我使用kafka 0.10.0.0和官方Java客戶端爲「新消費者API」,意思是經紀人GroupCoordinator而不是胖客戶端+動物園管理員。

這是消費者樣本代碼:

public class SampleConsumer { 
public static void main(String[] args) throws IOException { 
    KafkaConsumer<String, String> consumer; 
    try (InputStream props = Resources.getResource("consumer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     properties.setProperty("group.id", "my-group"); 

     System.out.println(properties.get("group.id")); 
     consumer = new KafkaConsumer<>(properties); 
    } 
    Pattern pattern = Pattern.compile("mytopic.+"); 
    consumer.subscribe(pattern, new SampleRebalanceListener()); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(1000); 
     for (ConsumerRecord<String, String> record : records) { 
      System.out.printf("%s %s\n", record.topic(), record.value()); 
     } 
    } 
} 

}

在製片人,我將消息發送到名爲mytopic1,mytopic2主題等

模式是相當如果重新平衡沒有被觸發,那就沒用了。

你知道爲什麼再平衡沒有發生嗎?

+0

你等了多久纔看到新話題是否開始消費?組協調員代理異步檢查以查看新主題是否符合模式,並定期進行。你可能沒有等到足夠長的時間。 – alexlod

+0

@alexlod您的評論是在正確的方向。我發現哪個屬性控制檢查的時間段並將其添加到答案中。 – user1118312

回答

3

該文檔提到「模式匹配將定期對檢查時存在的主題進行。」。事實證明,「定期」對應於metadata.max.age.ms屬性。通過將該屬性(在我的代碼示例中爲「consumer.props」)設置爲5000,我可以看到它每隔5秒檢測一次新的主題和分區。

這是有意設計的,根據這個JIRA票https://issues.apache.org/jira/browse/KAFKA-3854

在JIRA最後的註釋,指出在創建看來,消費者的訂閱模式相匹配的更高版本創建的話題不會被分配到消費者按照設計。重複訂閱()到相同的模式將需要處理這種情況。

刷新元數據輪詢執行票證中提到的「repeat subscribe()」。

這是令人困惑的來自卡夫卡0.8,其中真正觸發基於zookeper手錶,而不是投票。對於這種情況,IMO 0.9更多的是降級,而不是「及時」重新平衡,這會變成帶有開銷的高頻率輪詢,或者在對新的主題/分區作出反應之前的很長時間的低頻率輪詢。