5

我想用火花流讀取來自卡夫卡的舊信息。但是,我只能夠在實時發送郵件時檢索郵件(即,如果我填充新郵件,而我的Spark程序正在運行 - 則會收到這些郵件)。apache spark streaming - kafka - 閱讀舊信息

我正在改變我的groupID和consumerID,以確保zookeeper不只是不給它知道我的程序已經見過的消息。

假設spark在zookeeper中看到偏移量爲-1,它不應該讀取隊列中的所有舊消息嗎?我只是誤解了卡夫卡隊列的使用方式嗎?我對火花和卡夫卡非常陌生,所以我不能排除我只是誤解了一些東西。

package com.kibblesandbits 

import org.apache.spark.SparkContext 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.kafka.KafkaUtils 

import net.liftweb.json._ 

object KafkaStreamingTest { 

    val cfg = new ConfigLoader().load 
    val zookeeperHost = cfg.zookeeper.host 
    val zookeeperPort = cfg.zookeeper.port 
    val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot 

    implicit val formats = DefaultFormats 

    def parser(json: String): String = { 
    return json 
} 

def main(args : Array[String]) { 
    val zkQuorum = "test-spark02:9092" 

    val group = "myGroup99" 
    val topic = Map("testtopic" -> 1) 
    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New") 
    val ssc = new StreamingContext(sparkContext, Seconds(3)) 
    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic) 
    var gp = json_stream.map(_._2).map(parser) 

    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json") 
    ssc.start() 
} 

運行此時,我會看到以下消息。所以我相信它不僅僅是沒有看到消息,因爲偏移量已經設置好了。

14/12/05 13時34分08秒INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047]增加分區提取器 ArrayBuffer([[testtopic,0],initOffset -1促成 ID:1,主機:test-spark02.vpc,port:9092],[[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092],[[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port: 9092], [[testtopic,4],initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092])

然後,如果我填充1000條新消息 - 我可以看到這1000條消息保存在我的臨時目錄中。但我不知道如何讀取現有的消息,這些消息應該在(現在)成千上萬的數字中編號。

回答

8

使用上KafkaUtils替代工廠方法,讓你提供一個配置到卡夫卡消費者:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
     ssc: StreamingContext, 
     kafkaParams: Map[String, String], 
     topics: Map[String, Int], 
     storageLevel: StorageLevel 
    ): ReceiverInputDStream[(K, V)] 

然後用你的卡夫卡的配置,在地圖和添加參數「kafka.auto.offset.reset」設置爲'最小':

val kafkaParams = Map[String, String](
     "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "kafka.auto.offset.reset" -> "smallest" 
) 

將該配置提供給上述工廠方法。 「kafka.auto.offset.reset」 - >「最小」告訴消費者從您話題中的最小偏移量開始。

+0

仍然沒有爲我工作,是否有任何其他可能的方式來做到這一點?我在主題中有10k條消息,但只有當我收到新消息時才能檢索到它們。如何獲取已存儲在kafka主題中的數據? – 2017-02-02 07:31:20

+0

「auto.offset.reset」 - >「最小」爲我工作。另外,根據文檔,https://cwiki.apache.org/confluence/display/KAFKA/FAQ,如果您使用0.9版本,它應該是「最早的」 – Evgenii 2017-11-29 10:35:56

相關問題