2016-12-16 140 views
2

我想要實現使用兔子的操作,即拒絕/延遲循環:延遲的消息循環與RabbitMQ的

我有:

  1. 主隊列與主要交易所綁定到它和DLX爲待機交換。
  2. 爲StandBy排隊待機Exchange綁定到其與60年代TTL和DLX主交換

基本上我想:

  1. 從主隊列消耗
  2. 拒絕消息(在某​​些circunstances)
  3. 將被重定向到StandBy隊列,因爲拒絕
  4. 當TTL到期時,將消息重新排隊到主隊列。

步驟1,2和3都正常,但最後一個放棄消息而不是重新排隊它。

有些理論是從的RabbitMQ的文檔我來設計,這是:從隊列

消息可以是「死字母」;即,當以下任一事件的發生重新發布到另一個交換:

  1. 該消息被拒絕,並重新排隊=假(basic.reject或basic.nack),
  2. 該消息的TTL屆滿;或
  3. 超過隊列長度限制。

...

有可能形成消息的死文字的週期。例如,當一個隊列在沒有指定死信路由密鑰的情況下將消息發送到默認交換機時,會發生這種情況。如果在整個週期內沒有拒絕,這些週期中的消息(即達到相同隊列兩次的消息)將被丟棄。

理論認爲應該重新排隊,因爲它有一個拒絕從第2步的週期,因此,你能幫助我弄清楚爲什麼它下降的消息,而不是重新排隊它?

UPDATE:

我針對的版本是2.8.4,似乎在那一刻if there was no rejections in the entire cycle是不是在使用的情況下,反正你可以檢查此您ownselves RabbitMQ 2.8.x Docs

我將接受@george答案,因爲通過此代碼可以實現原始目標。

回答

1

Rafael,我不確定你在用什麼客戶端,但是用Python中的Pika客戶端,你可以實現類似這樣的事情。爲了簡單起見,我只使用一次交換。你確定你正在設置交換機和路由鍵嗎?

sender.py

import sys 
import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters(
       'localhost')) 
channel = connection.channel() 
channel.exchange_declare(exchange='cycle', type='direct') 
channel.queue_declare(queue='standby_queue', 
         arguments={ 
          'x-message-ttl': 10000, 
          'x-dead-letter-exchange': 'cycle', 
          'x-dead-letter-routing-key': 'main_queue'}) 
channel.queue_declare(queue='main_queue', 
         arguments={ 
          'x-dead-letter-exchange': 'cycle', 
          'x-dead-letter-routing-key': 'standby_queue'}) 
channel.queue_bind(queue='main_queue', exchange='cycle') 
channel.queue_bind(queue='standby_queue', exchange='cycle') 
channel.basic_publish(exchange='cycle', 
         routing_key='main_queue', 
         body="message body") 
connection.close() 

receiver.py

import sys 
import pika 
def callback(ch, method, properties, body): 
    print "Processing message: {}".format(body) 
    # replace with condition for rejection 
    if True: 
     print "Rejecting message" 
     ch.basic_nack(method.delivery_tag, False, False) 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.basic_consume(callback, queue='main_queue') 
channel.start_consuming() 
+0

@george您好,感謝您的回覆,我使用Python和鼠兔藏漢,我有一個非常類似的代碼,但我實際上運行你的代碼的結果相同,是什麼讓我覺得,你使用的是哪個版本的RabbitMQ ?,我試過2.8.4和3.6.6版的代碼 –

+0

我會接受你的問題,但我已經嘗試過這兩個服務器和最新版本的作品。然後,我進一步研究,舊文檔沒有提到「拒絕」異常,所以我想在這種情況下,它會放棄任何嘗試刷新的消息 –