2015-12-02 136 views
2

我與Eclipse 1.2.2庫拉打轉轉,WSO2 DAS 3.0.0和ActiveMQ的5.12.1做一些在物聯網的世界裏進行試驗。到目前爲止,我設法安裝DAS作爲M2M中間件服務器,庫拉對樹莓PI2作爲物聯網網關和ActiveMQ的作爲MQTT服務器。WSO2 - DAS消費MQTT消息

我也寫一個非常基本的MQTT消息生產者週期性地發送一個非常簡單的MQTT消息給MQTT服務器,以模擬在實際設備發送MQTT消息。這個想法是用一個BlueTooth設備定期發送數據來取代這個應用程序。

當我使用MQTTSpy監視收到的消息,我已經注意到了MQTT消息格式的二進制。這在文檔中清楚地陳述爲Kura在使用MQTT發送數據時使用Google協議緩衝區。由於DAS不支持這種類型的MQTT消息,我認爲這將導致服務器無法對任何進入的消息作出響應。

我配置一個DAS流使用了以下定義:

{ 
    "streamId": "mqtt_sample_01:1.0.0", 
    "name": "mqtt_sample_01", 
    "version": "1.0.0", 
    "nickName": "mqtt_sample_01", 
    "description": "mqtt_sample_01", 
    "metaData": [], 
    "correlationData": [], 
    "payloadData": [ 
    { 
     "name": "temperature", 
     "type": "FLOAT" 
    } 
    ] 
} 

我也使用以下代碼創建的呼入MQTT消息的接收器:

<?xml version="1.0" encoding="UTF-8"?> 
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> 
    <from eventAdapterType="mqtt-protobuf"> 
     <property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property> 
     <property name="clientId">mqtt-client-01</property> 
     <property name="url">tcp://192.168.1.42:1883</property> 
     <property name="cleanSession">false</property> 
    </from> 
    <mapping customMapping="disable" type="map"/> 
    <to streamName="mqtt_sample_01" version="1.0.0"/> 
</eventReceiver> 

注:我還試圖JSON和XML作爲映射類型。

要顯示在控制檯DAS一切,我添加使用出版商​​:

<?xml version="1.0" encoding="UTF-8"?> 
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher"> 
    <from streamName="mqtt_sample_01" version="1.0.0"/> 
    <mapping customMapping="disable" type="text"/> 
    <to eventAdapterType="logger"> 
     <property name="uniqueId">mqtt_sample_logger_01</property> 
    </to> 
</eventPublisher> 

庫拉格式使用未由WSO2-DAS理解谷歌協議緩衝區的MQTT消息。爲了解決這個問題,有幾個可能:

  1. 的MQTT消息格式可以在庫拉改變不使用谷歌協議緩衝區進行編碼。我發現了一個article on SO這或多或少類似this approach導致無論是在由CloudClient類提供的所有優點的損失。
  2. 可能性是編寫您自己的DAS接收器,如this articlethis article中所述。
  3. 第三種方法是瀏覽Kura代碼並創建CloudService/CloudClient實現的自己實現。

我個人認爲,最好的解決方案將是第二種選擇,編寫一個自定義事件接收器來理解和解碼由庫拉生成的Google協議緩衝區格式。其他的,甚至更好的解決方案也是值得歡迎的。

重要提示:
ActiveMQ的使用在GUI(MQTT-寄件人topic.mqtt-客戶01.MQTT_APP_V1.mydata)主題名稱的點符號。但是該主題的真實名稱使用/ -notation(mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata)。

,構建定製的接收器,我決定從原來的MQTT接收器複製現有的代碼,並改變它來處理protobuf的格式,並將其轉換成XML(至少是這樣的想法)。在努力正確設置所有依賴項後,我設法構建了一個工作的自定義接收器。

不幸的是,我們並不完全是我想成爲的地方。與MQTT代理的連接似乎存在問題。接收器啓動但似乎經常在日誌中寫入以下消息而失去連接。

DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata 
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT Connection successful 
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT connection not reachable 
Connection lost (32109) - java.io.EOFException 
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.EOFException 
at java.io.DataInputStream.readByte(DataInputStream.java:267) 
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56) 
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100) 
... 1 more 

對於它的價值,經紀人(ActiveMQ的)抱怨了警告,指出:

WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594 

我的代碼絕對必須做一些錯誤導致連接到被丟棄。問題是什麼。所以,任何建議,想法,解決方案都是值得歡迎的!

提示
啓動DAS與-DosgiConsole選項讓你去調查你的部署捆綁的狀態。接收機的成功部署之後,該命令DIAG [bundle_number]應輸出類似:
的OSGi> DIAG 473
參考:文件:../的dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver-> 1.0。 0.jar [473]
沒有未解決的限制。

+0

DAS接收者和庫拉發送者的客戶端ID應該有不同的值。在涉及消息類型的接收者代碼中仍然存在一個小問題,該消息類型應該被設置爲XML而不是MAP。我將重建一個新版本,並將其提供給那些可能會覺得有用的人。 – KDW

回答

0

爲WSO2產物(例如數據分析服務器)能夠通過的Eclipse庫拉(KuraPayload格式)創建處理谷歌協議緩衝器格式化的消息的一個輸入接收器的一個例子可以是downloaded at Google Drive

發送消息的庫拉示例應用程序也可以是downloaded at Google Drive

接收器接收二進制格式的KuraPayload格式並將其轉換爲XML。檢查示例應用程序的XML格式。

請分享您在接收器上做的改進/修改以幫助他人。