根據文檔上kafka javadocs如果我:卡夫卡模式訂閱。再平衡沒有被觸發新主題
- 訂閱模式
- 創建匹配模式
應該會出現一個重新平衡的話題,這使消費者從這個新話題中讀到。但這並沒有發生。
如果我停止並啓動消費者,它確實會提出新的話題。所以我知道新的主題與模式匹配。 https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics這個問題可能有重複,但是這個問題沒有任何意義。
我看到卡夫卡日誌並沒有錯誤,它只是不會觸發重新平衡。當消費者加入或死亡時,會觸發重新平衡,但在創建新主題時(即使將分區添加到現有主題,但這不是另一個主題時)也不會。
我使用kafka 0.10.0.0和官方Java客戶端爲「新消費者API」,意思是經紀人GroupCoordinator而不是胖客戶端+動物園管理員。
這是消費者樣本代碼:
public class SampleConsumer {
public static void main(String[] args) throws IOException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.setProperty("group.id", "my-group");
System.out.println(properties.get("group.id"));
consumer = new KafkaConsumer<>(properties);
}
Pattern pattern = Pattern.compile("mytopic.+");
consumer.subscribe(pattern, new SampleRebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%s %s\n", record.topic(), record.value());
}
}
}
}
在製片人,我將消息發送到名爲mytopic1,mytopic2主題等
模式是相當如果重新平衡沒有被觸發,那就沒用了。
你知道爲什麼再平衡沒有發生嗎?
你等了多久纔看到新話題是否開始消費?組協調員代理異步檢查以查看新主題是否符合模式,並定期進行。你可能沒有等到足夠長的時間。 – alexlod
@alexlod您的評論是在正確的方向。我發現哪個屬性控制檢查的時間段並將其添加到答案中。 – user1118312