2
我使用階&使用以下火花流的方法從卡夫卡消耗數據:轉換火花卡夫卡InputDStream到數組[字節]
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
以上變量返回InputDStream通過它,我能夠看到在原始數據/二進制格式使用以下代碼: println(行)
但我需要應用avro格式(模式可用)的原始/二進制格式,以查看預期的json格式的數據。爲了應用avro格式,我需要將上面的InputDStream轉換爲avro使用的Array [Bytes]。
有人請讓我知道將InputDStream轉換爲數組[字節]?
或者
如果你知道一些更好的方式來對InputDStream申請的Avro架構(火花流的),請分享。
真棒,非常感謝!我只需要DStream的值爲Array [Byte],所以我使用下面的代碼獲取它:val行:DStream [(Array [Byte])] = KafkaUtils.createDirectStream [ String,Array [Byte],StringDecoder,DefaultDecoder]( ssc ,kafkaParams,主題).map(_._ 2) –