2015-02-09 50 views
3

我還沒有找到任何解決方案來解決我的問題。我需要用兩個線程創建一個python應用程序,每個線程都使用高速公路庫連接到WAMP路由器。如何用兩個線程創建一個具有高速應用程序的Python應用程序

關注我寫我的實驗代碼:

wampAddress = 'ws://172.17.3.139:8181/ws' 
wampRealm = 's4t' 

from threading import Thread 
from autobahn.twisted.wamp import ApplicationRunner 
from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 


class AutobahnMRS(ApplicationSession): 
    @inlineCallbacks 
    def onJoin(self, details): 
     print("Sessio attached [Connect to WAMP Router]") 

     def onMessage(*args): 
      print args 
     try: 
      yield self.subscribe(onMessage, 'test') 
      print ("Subscribed to topic: test") 

     except Exception as e: 
      print("Exception:" +e) 

class AutobahnIM(ApplicationSession): 
    @inlineCallbacks 
    def onJoin(self, details): 
     print("Sessio attached [Connect to WAMP Router]") 

     try: 
      yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO') 
      print ("Subscribed to topic: test") 

     except Exception as e: 
      print("Exception:" +e) 

class ManageRemoteSystem: 
    def __init__(self): 
     self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm) 

    def start(self): 
     self.runner.run(AutobahnMRS); 


class InternalMessages: 
    def __init__(self): 
     self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm) 

    def start(self): 
     self.runner.run(AutobahnIM); 

#class S4tServer: 

if __name__ == '__main__': 
    server = ManageRemoteSystem() 
    sendMessage = InternalMessages() 

    thread1 = Thread(target = server.start()) 
    thread1.start() 
    thread1.join() 

    thread2 = Thread(target = sendMessage.start()) 
    thread2.start() 
    thread2.join() 

當我只啓動了線程1時開始,後來當我殺的應用程序(CTRL-C)以下錯誤消息顯示這條巨蟒的應用:

Sessio attached [Connect to WAMP Router] 
Subscribed to topic: test 
^CTraceback (most recent call last): 
    File "test_pub.py", line 71, in <module> 
    p2 = multiprocessing.Process(target = server.start()) 
    File "test_pub.py", line 50, in start 
    self.runner.run(AutobahnMRS); 
    File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run 
    reactor.run() 
    File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run 
    self.startRunning(installSignalHandlers=installSignalHandlers) 
    File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning 
    ReactorBase.startRunning(self) 
    File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning 
    raise error.ReactorNotRestartable() 
twisted.internet.error.ReactorNotRestartable 

我需要有它的功能一個應用程序來實現,也必須有傳達給WAMP路由器高速公路Python庫的系統。

換句話說,我需要一個能夠與WAMP路由器通信的解決方案,但同時這個應用程序不必被高速公路部分阻塞(我認爲解決方案是啓動兩個線程,一個線程管理一些功能,第二個線程管理高速公路部分)。

由於我之前提出的模式存在另一個問題,需要在'no autobahn thread'中的應用程序部分發送消息(在WAMP路由器的特定主題中),應該通過具有特定功能而不會阻礙其他功能。

我希望我已經給出了所有的細節。

非常感謝您的回覆

--------------------------------編輯--- ------------------------------

經過一番研究,我已經實現了我需要的websocket協議,代碼是如下:

# ----- twisted ---------- 
class _WebSocketClientProtocol(WebSocketClientProtocol): 
    def __init__(self, factory): 
     self.factory = factory 

    def onOpen(self): 
     #log.debug("Client connected") 
     self.factory.protocol_instance = self 
     self.factory.base_client._connected_event.set() 

class _WebSocketClientFactory(WebSocketClientFactory): 
    def __init__(self, *args, **kwargs): 
     WebSocketClientFactory.__init__(self, *args, **kwargs) 
     self.protocol_instance = None 
     self.base_client = None 

    def buildProtocol(self, addr): 
     return _WebSocketClientProtocol(self) 
# ------ end twisted ------- 
lass BaseWBClient(object): 

    def __init__(self, websocket_settings): 
     #self.settings = websocket_settings 
     # instance to be set by the own factory 
     self.factory = None 
     # this event will be triggered on onOpen() 
     self._connected_event = threading.Event() 
     # queue to hold not yet dispatched messages 
     self._send_queue = Queue.Queue() 
     self._reactor_thread = None 

    def connect(self): 

     log.msg("Connecting to host:port") 
     self.factory = _WebSocketClientFactory(
           "ws://host:port", 
           debug=True) 
     self.factory.base_client = self 

     c = connectWS(self.factory) 

     self._reactor_thread = threading.Thread(target=reactor.run, 
               args=(False,)) 
     self._reactor_thread.daemon = True 
     self._reactor_thread.start() 

    def send_message(self, body): 
     if not self._check_connection(): 
      return 
     log.msg("Queing send") 
     self._send_queue.put(body) 
     reactor.callFromThread(self._dispatch) 

    def _check_connection(self): 
     if not self._connected_event.wait(timeout=10): 
      log.err("Unable to connect to server") 
      self.close() 
      return False 
     return True 

    def _dispatch(self): 
     log.msg("Dispatching") 
     while True: 
      try: 
       body = self._send_queue.get(block=False) 
      except Queue.Empty: 
       break 
      self.factory.protocol_instance.sendMessage(body) 

    def close(self): 
     reactor.callFromThread(reactor.stop) 

import time 
def Ppippo(coda): 
     while True: 
      coda.send_message('YOOOOOOOO') 
      time.sleep(5) 

if __name__ == '__main__': 

    ws_setting = {'host':'', 'port':} 

    client = BaseWBClient(ws_setting) 

    t1 = threading.Thread(client.connect()) 
    t11 = threading.Thread(Ppippo(client)) 
    t11.start() 
    t1.start() 

以前的代碼工作正常,但我需要翻譯此操作WAMP協議insted websocket。

有誰知道我如何解決?

+0

移動'thread1.join()'下拉至與'thread2.join() '。在當前位置,它會告訴主線程等待,直到線程1死亡。由於你無法殺死線程(不用Ctrl-C殺死整個進程),所以第二個線程永遠不會被創建。 – 2015-02-09 16:51:04

+0

另外,你的線程應該在線程的'.run()'函數中完成他們的工作。你在主函數的末尾加入()一個線程,以允許線程在主應用程序退出之前完成執行。所以你需要讓線程完成任務。 – 2015-02-09 16:56:44

+0

你需要2個應用程序會話,還是真的有2個線程?如果前者是同一個路由器/領域或不同的路由器?如果是後者,爲什麼你首先需要線程?如果你需要做CPU密集的東西,並希望使用多核心,請讓我們知道。需要更多的「爲什麼」和「什麼」... – oberstet 2015-02-09 19:54:32

回答

5

壞消息是Autobahn正在使用Twisted主循環,所以你不能一次在兩個線程中運行它。

好消息是,你不需要需要在兩個線程中運行它來運行兩件事情,而且無論如何兩個線程會更復雜。

開始使用多個應用程序的API有點令人困惑,因爲您有兩個ApplicationRunner對象,乍看之下,您在高速公路上運行應用程序的方式是致電ApplicationRunner.run

但是,ApplicationRunner只是一種方便,它包裝了設置應用程序的東西以及運行主循環的東西;工作的真正肉發生在WampWebSocketClientFactory

爲了實現你想要的,你只需要擺脫線程,並自己運行主循環,使實例簡單地設置他們的應用程序。

爲了實現這一點,你需要改變你的程序做這個的最後一部分:

class ManageRemoteSystem: 
    def __init__(self): 
     self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm) 

    def start(self): 
     # Pass start_reactor=False to all runner.run() calls 
     self.runner.run(AutobahnMRS, start_reactor=False) 


class InternalMessages: 
    def __init__(self): 
     self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm) 

    def start(self): 
     # Same as above 
     self.runner.run(AutobahnIM, start_reactor=False) 


if __name__ == '__main__': 
    server = ManageRemoteSystem() 
    sendMessage = InternalMessages() 
    server.start() 
    sendMessage.start() 

    from twisted.internet import reactor 
    reactor.run() 
+1

這個API有點吸引人,尤其是這個用例。我們在另一個repo中有未發佈的東西,它允許通過單個調用啓動多個會話,並返回一個「DeferredList」(可以用單獨的WAMP應用會話來解決)。可能這應該在Autobahn .. – oberstet 2015-02-09 19:55:48

+0

對不起,能不能更準確地說明這一點 – alotronto 2015-02-23 15:30:18

+1

謝謝@Glyph !!!我一直在尋找這個'start_reactor'參數的年齡,但似乎在文檔中沒有提及它......或者其他人應該如何將autobahn添加到現有的Twisted應用程序中? – jjmontes 2015-09-10 21:37:00

相關問題