2016-03-04 74 views

回答

1

如果您還沒有,我會建議您按照第一個RabbitMQ tutorials。 RPC示例建立在前面示例涵蓋的概念上(直接隊列,專用隊列,確認等)。

提出了本教程的RPC解決方案需要至少兩個隊列,這取決於你想要多少客戶使用:

  • 一個直接隊列(rpc_queue),用於發送從客戶端向服務器請求。
  • 每個客戶端有一個獨佔隊列,用於接收響應。

的請求/響應循環:

  • 客戶端將消息發送到所述rpc_queue。每條消息包括一個reply_to屬性,其中服務器應該回復的客戶端專用隊列的名稱以及correlation_id屬性(僅用於跟蹤請求的唯一ID)。
  • 服務器等待rpc_queue上的消息。當消息到達時,它準備響應,將correlation_id添加到新消息,並將其發送到reply_to消息屬性中定義的隊列。
  • 客戶端在其獨佔隊列中等待,直到它找到最初生成的correlation_id消息。

直接跳到你的問題,首先要做的是定義你想要在你的響應中使用的消息格式。您可以使用JSON,msgpack或任何其他序列化庫。例如,如果使用JSON,一個消息可能是這個樣子:您server.py

{ 
    "cpu": 1.2, 
    "memory": 0.3 
} 

然後:

def on_request(channel, method, props, body): 
    response = {'cpu': current_cpu_usage(), 
       'memory': current_memory_usage()} 
    properties = pika.BasicProperties(correlation_id=props.correlation_id) 

    channel.basic_publish(exchange='', 
          routing_key=props.reply_to, 
          properties=properties, 
          body=json.dumps(response)) 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

# ... 

並在您client.py

class ResponseTimeout(Exception): pass 

class Client: 
    # similar constructor as `FibonacciRpcClient` from tutorial... 

    def on_response(self, channel, method, props, body): 
     if self.correlation_id == props.correlation_id: 
      self.response = json.loads(body.decode()) 

    def call(self, timeout=2): 
     self.response = None 
     self.correlation_id = str(uuid.uuid4()) 
     self.channel.basic_publish(exchange='', 
            routing_key='rpc_queue', 
            properties=pika.BasicProperties(
             reply_to=self.callback_queue, 
             correlation_id=self.correlation_id), 
            body='') 

     start_time = time.time() 
     while self.response is None: 
      if (start_time + timeout) < time.time(): 
       raise ResponseTimeout() 
      self.connection.process_data_events() 
     return self.response 

正如你看到的,代碼幾乎與原來的FibonacciRpcClient相同。主要區別如下:

  • 我們使用JSON作爲我們消息的數據格式。
  • 我們的客戶call()方法不需要body參數(沒有什麼可以發送到服務器)
  • 我們照顧響應超時(如果服務器已關閉,或者如果它不回覆我們的信息)

儘管如此,還有就是很多東西在這裏提升:

  • 沒有錯誤處理:例如,如果客戶端「忘記」送reply_to隊列,我們​​的服務器是要去崩潰,並在重新啓動時再次崩潰(壞消息會可以,因爲它不是由我們的服務器確認)
  • 我們不處理斷開的連接(無需重新連接機制)
  • ...

您也可以考慮更換RPC方法無限,只要重新排隊用發佈/訂閱模式;這樣,服務器每隔X個時間間隔簡單地廣播其CPU /內存狀態,並且一個或多個客戶端接收更新。

相關問題