我與Eclipse 1.2.2庫拉打轉轉,WSO2 DAS 3.0.0和ActiveMQ的5.12.1做一些在物聯網的世界裏進行試驗。到目前爲止,我設法安裝DAS作爲M2M中間件服務器,庫拉對樹莓PI2作爲物聯網網關和ActiveMQ的作爲MQTT服務器。WSO2 - DAS消費MQTT消息
我也寫一個非常基本的MQTT消息生產者週期性地發送一個非常簡單的MQTT消息給MQTT服務器,以模擬在實際設備發送MQTT消息。這個想法是用一個BlueTooth設備定期發送數據來取代這個應用程序。
當我使用MQTTSpy監視收到的消息,我已經注意到了MQTT消息格式的二進制。這在文檔中清楚地陳述爲Kura在使用MQTT發送數據時使用Google協議緩衝區。由於DAS不支持這種類型的MQTT消息,我認爲這將導致服務器無法對任何進入的消息作出響應。
我配置一個DAS流使用了以下定義:
{
"streamId": "mqtt_sample_01:1.0.0",
"name": "mqtt_sample_01",
"version": "1.0.0",
"nickName": "mqtt_sample_01",
"description": "mqtt_sample_01",
"metaData": [],
"correlationData": [],
"payloadData": [
{
"name": "temperature",
"type": "FLOAT"
}
]
}
我也使用以下代碼創建的呼入MQTT消息的接收器:
<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="mqtt-protobuf">
<property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property>
<property name="clientId">mqtt-client-01</property>
<property name="url">tcp://192.168.1.42:1883</property>
<property name="cleanSession">false</property>
</from>
<mapping customMapping="disable" type="map"/>
<to streamName="mqtt_sample_01" version="1.0.0"/>
</eventReceiver>
注:我還試圖JSON和XML作爲映射類型。
要顯示在控制檯DAS一切,我添加使用出版商:
<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
<from streamName="mqtt_sample_01" version="1.0.0"/>
<mapping customMapping="disable" type="text"/>
<to eventAdapterType="logger">
<property name="uniqueId">mqtt_sample_logger_01</property>
</to>
</eventPublisher>
庫拉格式使用未由WSO2-DAS理解谷歌協議緩衝區的MQTT消息。爲了解決這個問題,有幾個可能:
- 的MQTT消息格式可以在庫拉改變不使用谷歌協議緩衝區進行編碼。我發現了一個article on SO這或多或少類似this approach導致無論是在由CloudClient類提供的所有優點的損失。
- 可能性是編寫您自己的DAS接收器,如this article或this article中所述。
- 第三種方法是瀏覽Kura代碼並創建CloudService/CloudClient實現的自己實現。
我個人認爲,最好的解決方案將是第二種選擇,編寫一個自定義事件接收器來理解和解碼由庫拉生成的Google協議緩衝區格式。其他的,甚至更好的解決方案也是值得歡迎的。
重要提示:
ActiveMQ的使用在GUI(MQTT-寄件人topic.mqtt-客戶01.MQTT_APP_V1.mydata)主題名稱的點符號。但是該主題的真實名稱使用/ -notation(mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata)。
,構建定製的接收器,我決定從原來的MQTT接收器複製現有的代碼,並改變它來處理protobuf的格式,並將其轉換成XML(至少是這樣的想法)。在努力正確設置所有依賴項後,我設法構建了一個工作的自定義接收器。
不幸的是,我們並不完全是我想成爲的地方。與MQTT代理的連接似乎存在問題。接收器啓動但似乎經常在日誌中寫入以下消息而失去連接。
DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT Connection successful
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT connection not reachable
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100)
... 1 more
對於它的價值,經紀人(ActiveMQ的)抱怨了警告,指出:
WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594
我的代碼絕對必須做一些錯誤導致連接到被丟棄。問題是什麼。所以,任何建議,想法,解決方案都是值得歡迎的!
提示:
啓動DAS與-DosgiConsole選項讓你去調查你的部署捆綁的狀態。接收機的成功部署之後,該命令DIAG [bundle_number]應輸出類似:
的OSGi> DIAG 473
參考:文件:../的dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver-> 1.0。 0.jar [473]
沒有未解決的限制。
DAS接收者和庫拉發送者的客戶端ID應該有不同的值。在涉及消息類型的接收者代碼中仍然存在一個小問題,該消息類型應該被設置爲XML而不是MAP。我將重建一個新版本,並將其提供給那些可能會覺得有用的人。 – KDW