2015-11-05 224 views
2

我有一個使用單個Azure服務總線消息隊列的Java應用程序和NodeJS應用程序。互操作性Azure服務總線消息隊列消息

我跟我的客戶見證了一些奇怪的效果,如下。

JAVA消息生產者(使用每天青JMS教程QPID庫):

TextMessage message = sendSession.createTextMessage(); 
     message.setText("Test AMQP message from JMS"); 
     long randomMessageID = randomGenerator.nextLong() >>>1; 
     message.setJMSMessageID("ID:" + randomMessageID); 
     sender.send(message); 
     System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID()); 

OUTPUT:2414932965987073843

的NodeJS消息消費者:

serviceBus.receiveQueueMessage(queue, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) { 
if(message !==null)console.log(util.inspect(message, {showHidden: false, depth: null})); 
}); 

與JMSMessageID按= ID發送消息OUTPUT:

{ body: '@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001aTest AMQP message from JMS', 
brokerProperties: 
{ DeliveryCount: 1, 
EnqueuedSequenceNumber: 5000004, 
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:28:21 GMT', 
MessageId: '2414932965987073843', 
PartitionKey: '89', 
SequenceNumber: 59672695067659070, 
State: 'Active', 
TimeToLive: 1209600, 
To: 'moequeue' }, 
contentType: 'application/xml; charset=utf-8' } 

如果我比較,爲)通過serviceBus.sendQueueMessage(插入到隊列中的消息,則性質是這樣的:

{ body: 'test message', 
brokerProperties: 
{ DeliveryCount: 1, 
EnqueuedSequenceNumber: 0, 
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:44:03 GMT', 
MessageId: 'bc0a3d4f-15ba-434f-9fb0-1a3789885f8c', 
PartitionKey: '734', 
SequenceNumber: 37436171906517256, 
State: 'Active', 
TimeToLive: 1209600 }, 
contentType: 'text/plain', 
customProperties: 
{ message_number: 0, 
sent_date: Wed Nov 04 2015 21:44:03 GMT+0000 (UTC) } } 

所以內容類型是不同的下手 - 爲什麼? - 然後第一個消息有效載荷的主體中的奇怪垃圾來自哪裏:@ \ u0006string \ b3http://schemas.microsoft.com/2003/10/Serialization/ \ u001a 這是序列化?這怎麼能被緩解?

查找代碼,以及在這裏:

回答

2

Azure的服務總線支持兩種不同的協議:AMQP和HTTP。使用qpid庫的Java/JMS對ServiceBus使用AMQP協議。但是,NodeJS中包裝的ServiceBus REST API通過HTTP協議。

服務總線支持AMQP的詳細信息請參考https://azure.microsoft.com/en-us/documentation/articles/service-bus-amqp-overview/

而對於ServiceBus的REST API,請參閱https://msdn.microsoft.com/en-us/library/azure/hh780717.aspx

AMQP是二進制,應用層協議,設計用於高效 支持各種各樣的消息傳送應用程序和通信 圖案。 - from WikiPedia

但HTTP是一種文本協議。

消息格式如下,請參考節點http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-formatMessage Format節。 AMQP規範可以參考http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html

             Bare Message 
                  | 
             .---------------------+--------------------. 
             |           | 
+--------+-------------+-------------+------------+--------------+--------------+--------+ 
| header | delivery- | message- | properties | application- | application- | footer | 
|  | annotations | annotations |   | properties | data   |  | 
+--------+-------------+-------------+------------+--------------+--------------+--------+ 
|                      | 
'-------------------------------------------+--------------------------------------------' 
              | 
             Annotated Message 

所以在Java中發送或在NodeJS中發送的消息被序列化爲不同的結果。

內容\uXXXX在AMQP中形成的內容是Unicode Charater。

Unicode字符集\u0006是確認控制字符,請參考https://en.wikipedia.org/wiki/Acknowledge_character瞭解它。

而且Unicode字庫\u001a是替代控制字符,請參考https://en.wikipedia.org/wiki/Substitute_character

它們限制了消息頭中元數據的開始和結束。

+0

瞭解,那麼有沒有一種方法可以在Node中「安全」反序列化這些消息?我堅持所有客戶端的相同協議是一個好主意,但現實可能會有所不同,有效負載仍然需要可交換。 – MoB

+0

@ user3506080在發送者和接收者中使用不同的協議並不是一個好主意。但我認爲,如果真的有必要,您可以通過特殊字符分割內容來安全地獲取正確的信息。 –

+0

雖然這些信息在技術上是正確的,但它顯然是錯誤的。 AMQP是一種二進制傳輸協議,根本不會對實際的消息進行任何編碼。消息的編碼是JMS。如果將消息的類型更改爲字節消息(JmsByteMessage),則編碼將被REST服務解釋爲「application/octetstream」,而不是「application/xml」,並且您的消息將在節點中可讀。 –

0

我們遇到了完全相同的問題,儘管在使用基於Camel的生產者的一個更多的例子中。由於我們環境的變化,我們開始遇到這些問題。

這裏的問題是REST服務在編碼對節點客戶端的HTTP響應時如何解釋JMS消息。

我們發現JmsTextmessage出於某種原因,並不完全清楚,被認爲是「application/xml」類型,並且內容將按照這種方式轉發。因此你在你的例子中得到了OUTPUT。

如果改爲使用JmsByteMessage,則內容將被解釋爲「application/octet-stream」,並且不會在傳輸中發生損壞。

所以儘量沿着線的東西:

BytesMessage message = sendSession.createBytesMessage(); 
String body = "Test AMQP message from JMS"; 
message.writeBytes(body.getBytes(StandardCharsets.UTF_8)); 
sender.send(message); 

我們爲了轉移到由Node.js的客戶端解譯的JSON編碼的數據使用。