我還沒有找到任何解決方案來解決我的問題。我需要用兩個線程創建一個python應用程序,每個線程都使用高速公路庫連接到WAMP路由器。如何用兩個線程創建一個具有高速應用程序的Python應用程序
關注我寫我的實驗代碼:
wampAddress = 'ws://172.17.3.139:8181/ws'
wampRealm = 's4t'
from threading import Thread
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks
class AutobahnMRS(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Sessio attached [Connect to WAMP Router]")
def onMessage(*args):
print args
try:
yield self.subscribe(onMessage, 'test')
print ("Subscribed to topic: test")
except Exception as e:
print("Exception:" +e)
class AutobahnIM(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Sessio attached [Connect to WAMP Router]")
try:
yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO')
print ("Subscribed to topic: test")
except Exception as e:
print("Exception:" +e)
class ManageRemoteSystem:
def __init__(self):
self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)
def start(self):
self.runner.run(AutobahnMRS);
class InternalMessages:
def __init__(self):
self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)
def start(self):
self.runner.run(AutobahnIM);
#class S4tServer:
if __name__ == '__main__':
server = ManageRemoteSystem()
sendMessage = InternalMessages()
thread1 = Thread(target = server.start())
thread1.start()
thread1.join()
thread2 = Thread(target = sendMessage.start())
thread2.start()
thread2.join()
當我只啓動了線程1時開始,後來當我殺的應用程序(CTRL-C)以下錯誤消息顯示這條巨蟒的應用:
Sessio attached [Connect to WAMP Router]
Subscribed to topic: test
^CTraceback (most recent call last):
File "test_pub.py", line 71, in <module>
p2 = multiprocessing.Process(target = server.start())
File "test_pub.py", line 50, in start
self.runner.run(AutobahnMRS);
File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run
reactor.run()
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run
self.startRunning(installSignalHandlers=installSignalHandlers)
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning
ReactorBase.startRunning(self)
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning
raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable
我需要有它的功能一個應用程序來實現,也必須有傳達給WAMP路由器高速公路Python庫的系統。
換句話說,我需要一個能夠與WAMP路由器通信的解決方案,但同時這個應用程序不必被高速公路部分阻塞(我認爲解決方案是啓動兩個線程,一個線程管理一些功能,第二個線程管理高速公路部分)。
由於我之前提出的模式存在另一個問題,需要在'no autobahn thread'中的應用程序部分發送消息(在WAMP路由器的特定主題中),應該通過具有特定功能而不會阻礙其他功能。
我希望我已經給出了所有的細節。
非常感謝您的回覆
--------------------------------編輯--- ------------------------------
經過一番研究,我已經實現了我需要的websocket協議,代碼是如下:
# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
def __init__(self, factory):
self.factory = factory
def onOpen(self):
#log.debug("Client connected")
self.factory.protocol_instance = self
self.factory.base_client._connected_event.set()
class _WebSocketClientFactory(WebSocketClientFactory):
def __init__(self, *args, **kwargs):
WebSocketClientFactory.__init__(self, *args, **kwargs)
self.protocol_instance = None
self.base_client = None
def buildProtocol(self, addr):
return _WebSocketClientProtocol(self)
# ------ end twisted -------
lass BaseWBClient(object):
def __init__(self, websocket_settings):
#self.settings = websocket_settings
# instance to be set by the own factory
self.factory = None
# this event will be triggered on onOpen()
self._connected_event = threading.Event()
# queue to hold not yet dispatched messages
self._send_queue = Queue.Queue()
self._reactor_thread = None
def connect(self):
log.msg("Connecting to host:port")
self.factory = _WebSocketClientFactory(
"ws://host:port",
debug=True)
self.factory.base_client = self
c = connectWS(self.factory)
self._reactor_thread = threading.Thread(target=reactor.run,
args=(False,))
self._reactor_thread.daemon = True
self._reactor_thread.start()
def send_message(self, body):
if not self._check_connection():
return
log.msg("Queing send")
self._send_queue.put(body)
reactor.callFromThread(self._dispatch)
def _check_connection(self):
if not self._connected_event.wait(timeout=10):
log.err("Unable to connect to server")
self.close()
return False
return True
def _dispatch(self):
log.msg("Dispatching")
while True:
try:
body = self._send_queue.get(block=False)
except Queue.Empty:
break
self.factory.protocol_instance.sendMessage(body)
def close(self):
reactor.callFromThread(reactor.stop)
import time
def Ppippo(coda):
while True:
coda.send_message('YOOOOOOOO')
time.sleep(5)
if __name__ == '__main__':
ws_setting = {'host':'', 'port':}
client = BaseWBClient(ws_setting)
t1 = threading.Thread(client.connect())
t11 = threading.Thread(Ppippo(client))
t11.start()
t1.start()
以前的代碼工作正常,但我需要翻譯此操作WAMP協議insted websocket。
有誰知道我如何解決?
移動'thread1.join()'下拉至與'thread2.join() '。在當前位置,它會告訴主線程等待,直到線程1死亡。由於你無法殺死線程(不用Ctrl-C殺死整個進程),所以第二個線程永遠不會被創建。 – 2015-02-09 16:51:04
另外,你的線程應該在線程的'.run()'函數中完成他們的工作。你在主函數的末尾加入()一個線程,以允許線程在主應用程序退出之前完成執行。所以你需要讓線程完成任務。 – 2015-02-09 16:56:44
你需要2個應用程序會話,還是真的有2個線程?如果前者是同一個路由器/領域或不同的路由器?如果是後者,爲什麼你首先需要線程?如果你需要做CPU密集的東西,並希望使用多核心,請讓我們知道。需要更多的「爲什麼」和「什麼」... – oberstet 2015-02-09 19:54:32