2017-10-06 68 views
1

我在Spark 2.1.1上運行流式作業,輪詢Kafka 0.10。我正在使用Spark KafkaUtils類創建一個DStream,並且所有內容都正常工作,直到由於保留策略導致數據超出主題。如果任何數據超出了主題,我會停止工作做出一些更改,但我得到的錯誤表明我的偏移量超出範圍。我做了很多研究,包括查看火花源代碼,並且我看到很多評論,如本期的評論:SPARK-19680 - 基本上說數據不應該丟失 - 所以auto.offset.reset被spark忽略。但是,我的大問題是我現在可以做什麼?我的主題不會在spark中輪詢 - 它會在啓動時因偏移量異常而死亡。我不知道如何重置偏移量,這樣我的工作纔會重新開始。我沒有啓用檢查點,因爲我讀到這些使用不可靠。我曾經有很多的代碼來管理偏移,但現在看來,火花忽略請求補償,如果有任何承諾,所以我目前所管理的偏移是這樣的:來自Kafka主題的Spark Streaming拋出偏移超出範圍,無法重新啓動流

val stream = KafkaUtils.createDirectStream[String, T](
    ssc, 
    PreferConsistent, 
    Subscribe[String, T](topics, kafkaParams)) 

stream.foreachRDD { (rdd, batchTime) => 
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    Log.debug("processing new batch...") 

    val values = rdd.map(x => x.value()) 
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist 

    consumer.processDataset(incomingFrame, batchTime) 
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) 
} 
ssc.start() 
ssc.awaitTermination() 

作爲一種變通方法我一直在改變我小組ID,但這真的是跛腳。我知道這是預期的行爲,不應該發生,我只需要知道如何讓流再次運行。任何幫助,將不勝感激。

回答

0

嘗試

auto.offset.reset =最新

或者

auto.offset.reset =最早

最早的:自動復位偏移,偏移最早

最新:自動重置偏移到最新的偏移量

none:如果消費者的組中沒有發現以前的偏移量,則向用戶拋出異常

其他:向用戶拋出異常。

還有一件事會影響偏移值對應最小和最大的配置是日誌保留策略。想象一下你有一個保留時間配置爲1小時的話題。您生成10條消息,然後一小時後發佈10條消息。最大的偏移量仍然保持不變,但最小的將不能爲0,因爲Kafka已經刪除了這些消息,因此最小的可用偏移量將爲10.

+0

我剛開始試過並且很困惑,直到我讀了KafkaUtils類將這個參數消隱,因爲他們認爲你太無知,無法使用它: 17/10/06 15:03:55 WARN KafkaUtils:覆蓋auto.offset.reset爲無執行者 – absmiths