2017-03-06 112 views
1

我想使用kafka + spark流構建應用程序,該應用程序將接收mutilpe主題的數據。我wnat使用方法如何獲得kafka的最新偏移

def createDirectStream[ 
K: ClassTag, 
V: ClassTag, 
KD <: Decoder[K]: ClassTag, 
VD <: Decoder[V]: ClassTag, 
R: ClassTag] (
    ssc: StreamingContext, 
    kafkaParams: Map[String, String], 
    fromOffsets: Map[TopicAndPartition, Long], 
    messageHandler: MessageAndMetadata[K, V] => R 
) 

這將發出話題+的消息,但在開始的時候,我需要的fromOffsets參數傳遞給本功能。現在的問題是,我不知道這些主題的最新偏移量,我應該怎麼做才能將該偏移量傳遞給func。假設卡夫卡中沒有消息。

回答

0

如果在卡夫卡沒有信息,然後爲每個分區的偏移量爲0。如果你想開始出不偏移,你可以使用不走fromOffsets: Map[TopicAndPartition, Long]參數過載:

def createDirectStream[ 
    K: ClassTag, 
    V: ClassTag, 
    KD <: Decoder[K]: ClassTag, 
    VD <: Decoder[V]: ClassTag](
    ssc: StreamingContext, 
    kafkaParams: Map[String, String], 
    topics: Set[String] 
) 

inputDStream.transform { 
    rdd => 
    val offsets = rdd.asInstanceOf[HasOffsetRanges] 
    // Save offsets 
} 

當您啓動運行工作流,則可以通過鑄造RDDHasOffsetRanges提取偏移

相關問題