2015-05-19 109 views
2

我試圖將我的代碼轉換爲通過皮卡發送rabbitmq消息。我很難理解如何使用異步連接發送簡單消息(如SelectConnection)。如何在python中做一個簡單的Pika SelectConnection發送消息?

在我的舊代碼,這是我使用AMQP庫我簡單地聲明這樣的類:

import amqp as amqp 

class MQ(): 

    mqConn = None 
    channel = None 

    def __init__(self): 
     self.connect() 

    def connect(self): 
     if self.mqConn is None: 
      self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False) 
      self.channel = self.mqConn.channel() 

     elif not self.mqConn.connected: 
      self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False) 
      self.channel = self.mqConn.channel() 

    def sendMQ(self, message): 
     self.connect() 
     lMessage = amqp.Message(message) 
     self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 

然後在我的代碼的其他地方我叫sendMQ(「這是我的信息」),然後代碼繼續。我不需要聽取確認等。

有人可以寫一個簡單的類,利用pika和SelectConnection,也可以使用sendMQ發送消息(「這是我的消息」)嗎?我已經看過pika的例子,但我不知道如何繞過ioloop和KeyboardInterrupt。我想我只是不知道如何讓我的代碼繼續運行沒有所有這些嘗試/ excepts ...此外,不完全確定如何通過所有回調傳遞我的消息...

任何幫助感謝!

謝謝。

回答

-2

作爲第一種方法,我建議您從這篇文章末尾提供的pub/sub示例開始。一旦你理解了這個簡單的例子,開始按照在代碼塊最後面提供的教程。該教程有6個不同的用例,以及它的python示例。通過5個第一步,您將瞭解它的工作方式。您應該清楚交換的概念(將消息路由到每個隊列的實體),綁定密鑰(用於連接交換機和隊列的密鑰),路由密鑰(與發佈者的消息一起發送的密鑰)被交換機用來將消息路由到一個隊列或另一個隊列)和隊列(一個可以存儲消息的緩衝區,可以有超過1個(或1個,如果需要的話)用戶,並且可以從超過1個交換機獲得消息並且基於不同的綁定鍵)。此外,還有不止一種類型的交換(粉絲,話題(這個可能是你需要的)...)。

如果這一切聽起來新的,請遵循的RabbitMQ提供的教程:

https://www.rabbitmq.com/tutorials/tutorial-one-python.html

pub.py:

#!/usr/bin/env python 
import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost')) 
channel = connection.channel() 

channel.queue_declare(queue='hello') 

channel.basic_publish(exchange='', 
        routing_key='hello', 
        body='Hello World!') 
print " [x] Sent 'Hello World!'" 
connection.close() 

sub.py:

#!/usr/bin/env python 
import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost')) 
channel = connection.channel() 

channel.queue_declare(queue='hello') 

print ' [*] Waiting for messages. To exit press CTRL+C' 

def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 

channel.basic_consume(callback, 
        queue='hello', 
        no_ack=True) 

channel.start_consuming() 
+1

之大,你試圖幫忙,但他詢問了SelectConnection適配器。你提到的例子是使用BlockingConnection。 – eandersson

+0

具有SelectConnection的示例代碼將更具說明性。 – mprat