
I want to get MQTT messages into Kafka with Kafka Connect (and then consume them from Kafka with Spark Streaming). How can I convert the MQTT payload data into a Kafka string type?

I am using this connector: https://github.com/evokly/kafka-connect-mqtt

with Spark 2.1.0 and Kafka 0.10.1.1. The Spark Streaming output looks like this:

({"schema":{"type":"string","optional":false},"payload":"mqtt"},{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}) 

and here is the producer code:

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

object mqttProducer {
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://ip"
    val topic = "mqtt"
    val msg = "123123"

    var client: MqttClient = null

    // Creating new persistence for mqtt client
    val persistence = new MqttDefaultFilePersistence("/tmp")

    try {
      // mqtt client with specific url and client id
      client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)

      client.connect()

      val msgTopic = client.getTopic(topic)
      val message = new MqttMessage(msg.getBytes("utf-8"))

      while (true) {
        msgTopic.publish(message)
        println("Publishing Data, Topic : %s, Message : %s".format(msgTopic.getName, message))
        Thread.sleep(1000)
      }
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      if (client != null) client.disconnect()
    }
  }
}

and the Spark Streaming Kafka consumer code:

package hb.test1 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.KafkaUtils 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 

object test2 {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "servers ip",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("mqtt-kafka")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val testStream = stream.map(x => (x.key, x.value))

    testStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

How can I get the string instead of the bytes? Please help, guys.

Answer


The payload "MTIzMTIz" is just the string "123123" base64-encoded. If you want to take only the MQTT payload and send it to Kafka without base64 encoding, you should use a ByteArrayConverter. In my configuration for the same MQTT connector I set the value converter like this:

"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter"
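
To see that encoding concretely, here is a minimal standalone Scala sketch (Base64Check is just an illustrative name) that decodes the payload value shown in the question using java.util.Base64 from the JDK:

import java.nio.charset.StandardCharsets
import java.util.Base64

object Base64Check {
  def main(args: Array[String]): Unit = {
    // "MTIzMTIz" is the base64 encoding of the UTF-8 bytes of "123123"
    val decoded = new String(Base64.getDecoder.decode("MTIzMTIz"), StandardCharsets.UTF_8)
    println(decoded) // prints 123123
  }
}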

That converter (io.confluent.connect.replicator.util.ByteArrayConverter) ships with the Confluent Enterprise distribution, but there are other open-source Kafka Connect ByteArrayConverters, such as the one included with the qubole/streamx kafka-connect-s3 connector:

https://github.com/qubole/streamx/blob/8b9d43008d9901b8e6020404f137944ed97522e2/src/main/java/com/qubole/streamx/ByteArrayConverter.java
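
Judging by the package path in that file, pointing a connector at it would presumably look like:

"value.converter": "com.qubole.streamx.ByteArrayConverter"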

There is also KIP-128 to add a standard ByteArrayConverter to the Kafka Connect framework:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-128%3A+Add+ByteArrayConverter+for+Kafka+Connect

UPDATE: Kafka 0.11 has now been released and ships with a ByteArrayConverter. With the configuration "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" you should get the raw MQTT payload without base64 encoding.
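
With a ByteArrayConverter on the Connect side, the topic carries raw bytes rather than JSON-wrapped strings, so the Spark consumer should deserialize byte arrays and rebuild the string itself. Here is a minimal sketch of how the question's consumer could be adapted, reusing its topic and settings (test2Bytes is just an illustrative name):

package hb.test1

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object test2Bytes {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "servers ip",
      "key.deserializer" -> classOf[StringDeserializer],
      // raw bytes instead of String, matching the ByteArrayConverter output
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("mqtt-kafka")
    val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
      ssc,
      PreferConsistent,
      Subscribe[String, Array[Byte]](topics, kafkaParams)
    )

    // Decode the payload bytes back into the original string ("123123")
    val testStream = stream.map(x => (x.key, new String(x.value, StandardCharsets.UTF_8)))

    testStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}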