0
我有以下卡夫卡消費者,我想在特定情況下暫停,然後稍後恢復使用以前的所有消息。一個想法是使用共享標誌,可以由其他線程更新並在使用之前,即iterator.next().message()
我檢查標誌的值。如果真的不消耗其他消費的消息。只是想檢查我是否在正確的方向思考,或者是否有更好的方法。暫停高級卡夫卡消費者
class KafkaConsumer implements Runnable {
KafkaStream<byte[], byte[]> topicStream;
ConsumerConnector consumerConnectorObj;
public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
final ConsumerConnector consumerConnectorObj) {
this.topicStream = topicStream;
this.consumerConnectorObj = consumerConnectorObj;
}
@Override
public void run() {
if (topicStream != null) {
ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
while (true) {
if (iterator != null) {
boolean nextFlag = true;
try {
iterator.hasNext();
} catch (ConsumerTimeoutException e) {
LOG.warn("Consumer timeout occured", e);
nextFlag = false;
}
if (nextFlag) {
byte[] msg = iterator.next().message();
}
}
}
}
}
}
謝謝哈拉德我會嘗試它,並會回來 –