2015-02-06 354 views
0

我無法設法編寫一個服務器,通過WebSocket監聽STOMP消息。我的問題在於stomp協議和JMS消費者的創建。ActiveMQ,WebSocket和Stomp

下面的代碼上的createConnection

class StompDemo { 
    val uri = "ws://localhost:61614" 
    val topicName = "mytopic" 
    val broker = new BrokerService 
    broker.addConnector(uri) 
    val topic = new ActiveMQTopic(topicName) 
    val topics = Array[ActiveMQDestination](topic) 
    broker.setDestinations(topics) 
    broker.start 
    println("Started broker") 

    val connectionFactory = new ActiveMQConnectionFactory(uri) 
    val connection = connectionFactory.createConnection 
    println("Started connection") 

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
    val destination = session.createTopic(topicName) 
    val consumer = session.createConsumer(destination) 
    println("Created consumer") 

    while(true) { 
    println("Waiting for next message") 
    val message = consumer.receive 
    } 
} 

失敗,出現以下異常:

Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented! 

能否請你指出問題與此代碼? 如何使用AMQ通過WebSocket/Stomp以編程方式將JMS偵聽器配置爲隊列或主題?

感謝

新更新的代碼和ActiveMQ傳輸失敗:TCP:///127.0.0.1:51309 @ 6969】傳輸連接:TCP://127.0.0.1:51309失敗:java.io. IOException:未知數據類型:47 我想它必須與基於二進制和基於文本的相關。

仍在調查爲什麼會失敗:

package org.tj.amq 

import org.apache.activemq.broker.BrokerService 
import org.apache.activemq.ActiveMQConnectionFactory 
import javax.jms.Session 
import javax.jms.TextMessage 

// 
// http://www.massapi.com/class/br/BrokerService.html 
// 

object AMQStompDemo extends MainLoop with Logging { 
    <<("AMQ Stomp Demo") 
    val uri = "tcp://localhost:6969" 
    val broker = new BrokerService 
    broker.setPersistent(false) 
    broker.setUseJmx(false) 
    broker.addConnector(uri) 
    broker.start 
    <<("Started broker") 

    val connectionFactory = new ActiveMQConnectionFactory(uri) 
    val connection = connectionFactory.createConnection 
    connection.start 
    <<("Started connection") 
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
    val destination = session.createQueue("test") 
    val consumer = session.createConsumer(destination) 

    while(true) { 
    <<("Ready to receive next message ...") 
    val message = consumer.receive 
    message match { 
     case tm:TextMessage => <<(s"Received text message ${tm.getText}") 
     case _ => <<(s"Received another message type $message") 
    } 
    } 

    def main(args: Array[String]): Unit = {} 
} 

trait Logging { 
    def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any") 
} 

trait MainLoop extends Logging { 
    new Thread(new Runnable() { 
    override def run = { 
     <<("Starting main loop") 
     while(true) { 
     Thread.sleep(1000) 
     } 
    } 
    }).start 
} 

的傳奇仍在繼續。 只要加上broker.addConnector("ws://localhost:6971") 我可以成功地通過WS從瀏覽器連接到隊列/隊列/測試

現在,最後剩下的問題 - 我得到回調,但AMQ給了我這個

[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException 
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException 
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314) 
    at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

收到第一條消息之後。

將帖子 嗯,我一直在遭受https://issues.apache.org/jira/browse/AMQ-5155 因此,使用AMQ版本5.9.0工程

我的感覺是AMQ for WebSockets太脆弱了。那麼可能用Tomcat代替更保守的方法。

回答

1

通常,您不會在服務器端使用網絡套接字,只需使用正常的STOMP或OpenWire連接進行連接即可。

那個siad,看着你的代碼,你似乎在使用ActiveMQ JMS客戶端,它既不會說STOMP也不會說Websockets,所以你註定要失敗。 ActiveMQ JMS客戶端使用OpenWire協議,可以通過TCP或SSL連接(HTTP可以與正確的jar一起工作)。

+0

謝謝。所以你要說的是有效地使用tcp://而不是ws://這有意義,並且只需使用web套接字lib通過正常的腳本連接,然後對它進行排序。正確? – jts 2015-02-06 11:30:42

+0

是的,您需要使用tcp://或ssl://與Java客戶端。如果你需要從瀏覽器連接,然後使用基於stomp的庫,可以做websockets,確保代理有一個ws或wss傳輸來做到這一點。 – 2015-02-06 18:45:10

+0

你好。我做了你的建議:基本上啓動了tcp:// localhost:6969的代理和連接,並使用了一個來自apache-activemq-5.11.0 \ webapps-demo \ demo \ websocket的web客戶端 - 並且當我連接到ws://localhost:6969發送消息到/ queue/test(我的代碼聽測試 - 瀏覽器告訴我哎呀!丟失連接到ws:// localhost:6969 /和服務器端tcp:///127.0.0.1: 51114 @ 6969 []傳輸連接到:tcp://127.0.0.1:51114失敗:java.io.IOException:未知數據類型:47 ..任何建議? – jts 2015-02-07 08:37:32