2016-11-14 107 views
1
處理卡夫卡的錯誤

我從卡夫卡主題閱讀郵件在斯卡拉通過以下方式:如何從斯卡拉

import org.apache.spark.streaming.kafka.KafkaUtils 

val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap 

val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2) 

我不知道什麼是處理可能的連接失敗的正確方法,尤其是考慮到我的Spark Streaming作業將在很長時間內運行,並且在此期間肯定會出現一些連接問題。 我希望Streaming作業在連接問題時不會停止,但它應該嘗試自動重新連接並讀取連接失敗之前未處理的所有消息

我假設我應該正確地設置auto.offset.reset, auto.commit.interval.ms等,但是對於正確設置的詳細指導將非常感謝。

回答

1

當你使用Spark爲Kafka提供的抽象時,會爲你處理錯誤,這樣你就不必擔心它們,除非你的Kafka集羣失敗了,並且你不能再處理消息,在這種情況下會使流應用程序最終終止

例如,you can read the part of the code實際處理卡夫卡的消息(這僅僅是接收器爲基礎的方法有關):

/** Class to handle received Kafka message. */ 
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { 
    override def run(): Unit = { 
     while (!isStopped) { 
     try { 
      val streamIterator = stream.iterator() 
      while (streamIterator.hasNext) { 
      storeMessageAndMetadata(streamIterator.next) 
      } 
     } catch { 
      case e: Exception => 
      reportError("Error handling message", e) 
     } 
     } 
    } 
} 
+0

是的,但根據我的測試,當動物園管理員或卡夫卡是關閉的,那麼我的代碼停止。不過,我希望它等待並再次檢查kafka隊列,比方說,10分鐘等等。然後我也不想放鬆發送的信息。我怎樣才能控制它與抵消?我應該將它設置爲「最早」還是應該手動控制它? – duckertito

+0

@duckertito這不是你可以配置的東西。 –

+0

我認爲有可能捕獲這種情況。但實際上這很奇怪。如果使用Spark Streaming + Kafka,則表示存在打開的連接。如果Spark Streaming作業長時間運行(例如X個月直到某些維護),那麼肯定會出現連接失敗的情況。那麼,在這種情況下通常會做什麼呢?如何防止這種情況?我應該每天檢查一次Kafka還是如何自動化它? – duckertito