2016-05-06 155 views
0

我有以下卡夫卡消費者,我想在特定情況下暫停,然後稍後恢復使用以前的所有消息。一個想法是使用共享標誌,可以由其他線程更新並在使用之前,即iterator.next().message()我檢查標誌的值。如果真的不消耗其他消費的消息。只是想檢查我是否在正確的方向思考,或者是否有更好的方法。暫停高級卡夫卡消費者

class KafkaConsumer implements Runnable { 
     KafkaStream<byte[], byte[]> topicStream; 
     ConsumerConnector consumerConnectorObj; 

     public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream, 
       final ConsumerConnector consumerConnectorObj) { 
      this.topicStream = topicStream; 
      this.consumerConnectorObj = consumerConnectorObj; 
     } 

     @Override 
     public void run() { 
      if (topicStream != null) { 
       ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator(); 
       while (true) { 
         if (iterator != null) { 
          boolean nextFlag = true; 
          try { 
           iterator.hasNext(); 
          } catch (ConsumerTimeoutException e) { 
           LOG.warn("Consumer timeout occured", e); 
           nextFlag = false; 
          } 

          if (nextFlag) { 
           byte[] msg = iterator.next().message(); 
          } 
         } 
       } 
      } 
     } 
    } 

回答

0

通常情況下,您不應該需要通過手工方式在線程之間進行同步,因爲您很有可能會誤解它。幫助者請看java.util.concurrent。在你的情況下,它可能是一個信號量。使用它的最簡單的方法是在處理消息之前獲取信號量,然後將其返回,並在下一個循環中立即嘗試再次獲取信號量。

我的預感是,這不是最好的辦法。我寧願撥打availablePermits()並繼續消費,而數字更大。只有當它下降到零時,才嘗試獲取信號量。這將阻止該線程,直到另一個線程再次提供一個許可。這將打開你的工作線程,並把它交給許可證,它應該馬上讓回來,開始像上面那樣循環。

while (true) { 
    if (semaphore.availablePermits()<=0) { 
    semaphore.acquire(); // will block 
    // eventually another thread increments the semaphore again 
    // and we arrive here 
    semaphore.release(); 
    } 
    // keep processing messages 
} 

你可能會在this question的答案中找到補充意見。

+0

謝謝哈拉德我會嘗試它,並會回來 –