2017-02-13 84 views
2

我使用階&使用以下火花流的方法從卡夫卡消耗數據:轉換火花卡夫卡InputDStream到數組[字節]

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2) 

以上變量返回InputDStream通過它,我能夠看到在原始數據/二進制格式使用以下代碼: println(行)

但我需要應用avro格式(模式可用)的原始/二進制格式,以查看預期的json格式的數據。爲了應用avro格式,我需要將上面的InputDStream轉換爲avro使用的Array [Bytes]。

有人請讓我知道將InputDStream轉換爲數組[字節]?

或者

如果你知道一些更好的方式來對InputDStream申請的Avro架構(火花流的),請分享。

回答

2

你需要做兩件事。第一種是使用DefaultDecoder卡夫卡,讓你的價值類型的Array[Byte]

val lines: DStream[(String, Array[Byte])] = 
    KafkaUtils 
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics) 

然後你需要通過附加map以應用Avro的反序列化的邏輯:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) } 

哪裏avroDeserializer是你知道如何從Avro字節創建你的類型的任意一類。

我個人使用avro4s通過宏來獲取case類的反序列化。

+0

真棒,非常感謝!我只需要DStream的值爲Array [Byte],所以我使用下面的代碼獲取它:val行:DStream [(Array [Byte])] = KafkaUtils.createDirectStream [ String,Array [Byte],StringDecoder,DefaultDecoder]( ssc ,kafkaParams,主題).map(_._ 2) –