我正在使用Scala中的Spark來消費和處理卡夫卡消費者應用程序中的消息。有時,處理來自Kafka消息隊列的消息比平時花費更多的時間。那時候我需要消費最新的信息,而忽略那些已經由製片人發行但尚未消費的早期信息。Apache Kafka:如何接收來自Kafka的最新消息?
這裏是我的消費者代碼:
object KafkaSparkConsumer extends MessageProcessor {
def main(args: scala.Array[String]): Unit = {
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
val ssc = new StreamingContext(streamConf, Seconds(1))
val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
val kafkaParams = Map("metadata.broker.list" -> properties.getProperty("broker_connection_str"),
"zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> group_id,
"auto.offset.reset" -> properties.getProperty("offset_reset"),
"zookeeper.session.timeout" -> properties.getProperty("zookeeper_timeout"))
val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaParams,
Map("moved_object" -> 1),
StorageLevel.MEMORY_ONLY_SER
).map(_._2)
msgStream.foreachRDD { x =>
x.foreach {
msg => println("Message: "+msg)
processMessage(msg)
}
}
ssc.start()
ssc.awaitTermination()
}
}
有沒有什麼辦法,以確保消費者總是能夠在消費者應用程序的最新消息?或者我是否需要在卡夫卡配置中設置任何屬性以實現相同?
任何幫助,將不勝感激。謝謝
我每次開始執行消費者應用程序時都會生成隨機組ID。它以這種方式獲取最新消息,但是如果處理需要更多時間,它會繼續處理我不需要的舊消息。 – Arjun