1

我正在使用Scala中的Spark來消費和處理卡夫卡消費者應用程序中的消息。有時,處理來自Kafka消息隊列的消息比平時花費更多的時間。那時候我需要消費最新的信息,而忽略那些已經由製片人發行但尚未消費的早期信息。Apache Kafka:如何接收來自Kafka的最新消息?

這裏是我的消費者代碼:

object KafkaSparkConsumer extends MessageProcessor { 

def main(args: scala.Array[String]): Unit = { 
    val properties = readProperties() 

    val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream") 
    val ssc = new StreamingContext(streamConf, Seconds(1)) 

    val group_id = Random.alphanumeric.take(4).mkString("dfhSfv") 
    val kafkaParams = Map("metadata.broker.list"   -> properties.getProperty("broker_connection_str"), 
         "zookeeper.connect"    -> properties.getProperty("zookeeper_connection_str"), 
         "group.id"      -> group_id, 
         "auto.offset.reset"    -> properties.getProperty("offset_reset"), 
         "zookeeper.session.timeout"  -> properties.getProperty("zookeeper_timeout")) 

    val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
         ssc, 
         kafkaParams, 
         Map("moved_object" -> 1), 
         StorageLevel.MEMORY_ONLY_SER 
        ).map(_._2) 

    msgStream.foreachRDD { x => 
    x.foreach { 
     msg => println("Message: "+msg) 
     processMessage(msg) 
    }  
    }       
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

有沒有什麼辦法,以確保消費者總是能夠在消費者應用程序的最新消息?或者我是否需要在卡夫卡配置中設置任何屬性以實現相同?

任何幫助,將不勝感激。謝謝

回答

0

連接到Kafka時,您可以隨時生成一個新的(隨機)組標識 - 這樣您在連接時就會開始消費新的消息。

+0

我每次開始執行消費者應用程序時都會生成隨機組ID。它以這種方式獲取最新消息,但是如果處理需要更多時間,它會繼續處理我不需要的舊消息。 – Arjun

2

卡夫卡消費者API包括方法

void seekToEnd(Collection<TopicPartition> partitions) 

從消費所以,你可以得到分配的分區,並尋求他們全部結束。有類似的方法seekToBeginning。

+0

我已經在問題描述中發佈了我的代碼。你可以看看它,並建議我在哪裏添加此方法?謝謝你的回答:) – Arjun

+0

你對auto.offset.reset有什麼價值? – Natalia

+0

它已被設置爲'最大',自動將該值重置爲最大偏移量。 – Arjun

0

您可以享受到兩個KafkaConsumer API從分區得到的最後消息(假設日誌壓縮將不再是一個問題):

  1. public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions):這給了你到底給定分區的偏移。請注意,結束偏移量是下一個要發送的消息的偏移量。
  2. public void seek(TopicPartition partition, long offset):對每個分區運行此操作,並提供從上面調用減去1(假設它大於0)的結束偏移量。
+0

我已經在問題描述中發佈了我的代碼。你可以看看它,並建議我在哪裏添加此方法?謝謝你的回答:) – Arjun

0

是的,您可以將staringOffset設置爲最近使用最新消息。

val spark = SparkSession 
    .builder 
    .appName("kafka-reading") 
    .getOrCreate() 

import spark.implicits._ 
val df = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "localhost:9092") 
     .option("startingOffsets", "latest") 
     .option("subscribe", topicName) 
     .load() 
+0

我已經在問題描述中發佈了我的代碼。你可以看看它,並建議我在哪裏添加你的代碼?謝謝你回答:) – Arjun

+0

當你定義kafkaParams時,你需要添加這個屬性。 consumer.forcefromstart = false有關更多信息,您可以看到消費者屬性。 https://github.com/dibbhatt/kafka-spark-consumer –

+0

好的Mahesh。我需要幾天時間才能查看。我會盡快給您回覆。再次感謝你。 – Arjun