我是rabbitmq的新手,並試圖弄清楚如何通過本教程(https://www.rabbitmq.com/tutorials/tutorial-six-python.html)向客戶端請求一個有關內存和CPU利用率信息的服務器。Rabbitmq與Pika遠程通話
因此,客戶端請求CPU和內存(我相信我需要兩個隊列),服務器以這些值作爲響應。
無論如何,簡單地創建一個client.py
和server.py
在這種情況下使用Python中的Pika庫。
我是rabbitmq的新手,並試圖弄清楚如何通過本教程(https://www.rabbitmq.com/tutorials/tutorial-six-python.html)向客戶端請求一個有關內存和CPU利用率信息的服務器。Rabbitmq與Pika遠程通話
因此,客戶端請求CPU和內存(我相信我需要兩個隊列),服務器以這些值作爲響應。
無論如何,簡單地創建一個client.py
和server.py
在這種情況下使用Python中的Pika庫。
如果您還沒有,我會建議您按照第一個RabbitMQ tutorials。 RPC示例建立在前面示例涵蓋的概念上(直接隊列,專用隊列,確認等)。
提出了本教程的RPC解決方案需要至少兩個隊列,這取決於你想要多少客戶使用:
rpc_queue
),用於發送從客戶端向服務器請求。的請求/響應循環:
rpc_queue
。每條消息包括一個reply_to
屬性,其中服務器應該回復的客戶端專用隊列的名稱以及correlation_id
屬性(僅用於跟蹤請求的唯一ID)。rpc_queue
上的消息。當消息到達時,它準備響應,將correlation_id
添加到新消息,並將其發送到reply_to
消息屬性中定義的隊列。correlation_id
消息。直接跳到你的問題,首先要做的是定義你想要在你的響應中使用的消息格式。您可以使用JSON,msgpack或任何其他序列化庫。例如,如果使用JSON,一個消息可能是這個樣子:您server.py
{
"cpu": 1.2,
"memory": 0.3
}
然後:
def on_request(channel, method, props, body):
response = {'cpu': current_cpu_usage(),
'memory': current_memory_usage()}
properties = pika.BasicProperties(correlation_id=props.correlation_id)
channel.basic_publish(exchange='',
routing_key=props.reply_to,
properties=properties,
body=json.dumps(response))
channel.basic_ack(delivery_tag=method.delivery_tag)
# ...
並在您client.py
:
class ResponseTimeout(Exception): pass
class Client:
# similar constructor as `FibonacciRpcClient` from tutorial...
def on_response(self, channel, method, props, body):
if self.correlation_id == props.correlation_id:
self.response = json.loads(body.decode())
def call(self, timeout=2):
self.response = None
self.correlation_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.correlation_id),
body='')
start_time = time.time()
while self.response is None:
if (start_time + timeout) < time.time():
raise ResponseTimeout()
self.connection.process_data_events()
return self.response
正如你看到的,代碼幾乎與原來的FibonacciRpcClient
相同。主要區別如下:
call()
方法不需要body
參數(沒有什麼可以發送到服務器)儘管如此,還有就是很多東西在這裏提升:
reply_to
隊列,我們的服務器是要去崩潰,並在重新啓動時再次崩潰(壞消息會可以,因爲它不是由我們的服務器確認)您也可以考慮更換RPC方法無限,只要重新排隊用發佈/訂閱模式;這樣,服務器每隔X個時間間隔簡單地廣播其CPU /內存狀態,並且一個或多個客戶端接收更新。