我在Spark 2.1.1上運行流式作業,輪詢Kafka 0.10。我正在使用Spark KafkaUtils類創建一個DStream,並且所有內容都正常工作,直到由於保留策略導致數據超出主題。如果任何數據超出了主題,我會停止工作做出一些更改,但我得到的錯誤表明我的偏移量超出範圍。我做了很多研究,包括查看火花源代碼,並且我看到很多評論,如本期的評論:SPARK-19680 - 基本上說數據不應該丟失 - 所以auto.offset.reset被spark忽略。但是,我的大問題是我現在可以做什麼?我的主題不會在spark中輪詢 - 它會在啓動時因偏移量異常而死亡。我不知道如何重置偏移量,這樣我的工作纔會重新開始。我沒有啓用檢查點,因爲我讀到這些使用不可靠。我曾經有很多的代碼來管理偏移,但現在看來,火花忽略請求補償,如果有任何承諾,所以我目前所管理的偏移是這樣的:來自Kafka主題的Spark Streaming拋出偏移超出範圍,無法重新啓動流
val stream = KafkaUtils.createDirectStream[String, T](
ssc,
PreferConsistent,
Subscribe[String, T](topics, kafkaParams))
stream.foreachRDD { (rdd, batchTime) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Log.debug("processing new batch...")
val values = rdd.map(x => x.value())
val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist
consumer.processDataset(incomingFrame, batchTime)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()
作爲一種變通方法我一直在改變我小組ID,但這真的是跛腳。我知道這是預期的行爲,不應該發生,我只需要知道如何讓流再次運行。任何幫助,將不勝感激。
我剛開始試過並且很困惑,直到我讀了KafkaUtils類將這個參數消隱,因爲他們認爲你太無知,無法使用它: 17/10/06 15:03:55 WARN KafkaUtils:覆蓋auto.offset.reset爲無執行者 – absmiths