2016-06-08 94 views
5
def get_connection_and_channel(self, connection_parameters): 
    connection = pika.BlockingConnection(connection_parameters) 
    channel = connection.channel() 
    return (connection, channel) 


connection_parameters = pika.ConnectionParameters(server, port, virtual_host, credentials=pika.PlainCredentials(user_name, password)) 

connection,channel = self.get_connection_and_channel(connection_parameters) 

channel.confirm_delivery() 
count=0 
for json_string in open(json_file, 'r'): 
    result_json = json.loads(json_string) 
    message_body = json.dumps(result_json['body']) 
    routing_key = result_json['RoutingKey'] 
    channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip()) 
    count += 1 
self.logger.info('Sent %d messages' % count) 
connection.close() 

我正在使用此代碼向RabbitMQ服務器發送郵件。但偶爾,這不會將所有消息發送到相應的隊列。它每次運行時都會丟失任意數量的消息。RabbitMq - pika - python - 發佈時刪除郵件

我不明白這裏有什麼問題。

回答

2

由於您的郵件無法將郵件路由到任何現有隊列,因此可能會返回您的郵件。嘗試在channel.confirm_delivery添加的回調:

channel.confirm_delivery(on_delivery_confirmation) 

def on_delivery_confirmation(self, method_frame): 
     confirmation_type = method_frame.method.NAME.split('.')[1].lower()    
     if confirmation_type == 'ack': 
      self.logger.info('message published') 
     elif confirmation_type == 'nack': 
      self.logger.info('message not routed') 

如果是這種情況,那麼建議先結合消費者隊列與交換和發佈消息之前路由鍵。

1

簡單(不可靠)方式

首先,啓用耐用隊列添加:

channel.queue_declare(queue='your_queue', durable=True) 

到兩個,出版商和消費者(做發佈/消費前)

然後,即使RabbitMQ服務器死亡並重新啓動,您也可以確保您的隊列不會丟失。

出版商

在發行,加上properties=pika.BasicProperties(delivery_mode=2)basic_publish電話,以確保您的信息是永久性的。

channel.basic_publish(exchange=self.output_exchange_name, 
         routing_key=routing_key, 
         body=message_body.strip(), 
         properties=pika.BasicProperties(delivery_mode=2)) 

這應該做的伎倆,以避免丟失_published消息。

消費者

從消費者角度來看,該official RabbitMQ tutorial for python說:

爲了確保信息不會丟失,RabbitMQ的支持消息確認。一個ack(請求)被從消費者發回,告訴RabbitMQ已經收到,處理了一個特定的消息,並且RabbitMQ可以自由刪除它。 [...]默認情況下,消息確認已打開。

當你建立了消費者,確保您發送ACK 得當,讓RabbitMQ的從隊列中刪除它。

def callback(ch, method, properties, body): 
    print "Received %r" % (body,) 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

channel.basic_consume(callback, queue='your_queue') 

的真正安全的方式

如果你需要一個更強大和更可靠的方法是完全肯定確認發佈中繼上RabbitMQ的,你應該使用AMQP協議的plublish confirm功能。

pika documentation

import pika 

# Open a connection to RabbitMQ on localhost using all default parameters 
connection = pika.BlockingConnection() 

# Open the channel 
channel = connection.channel() 

# Declare the queue 
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False) 

# Turn on delivery confirmations 
channel.confirm_delivery() 

# Send a message 
if channel.basic_publish(exchange='test', 
         routing_key='test', 
         body='Hello World!', 
         properties=pika.BasicProperties(content_type='text/plain', 
                 delivery_mode=1)): 
    print 'Message publish was confirmed' 
else: 
    print 'Message could not be confirmed' 

所以要根據你的代碼,我將使用類似於:

count=0 
for json_string in open(json_file, 'r'): 
    result_json = json.loads(json_string) 
    message_body = json.dumps(result_json['body']) 
    routing_key = result_json['RoutingKey'] 
    if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(), 
          properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent 
     count += 1 
    else: 
     # Do something with your undelivered message 
self.logger.info('Sent %d messages' % count) 
connection.close() 

或者作爲蠻力的方法,你可以使用一個while請使用循環代替if以確保發送所有消息:

count = 0 
for json_string in open(json_file, 'r'): 
    result_json = json.loads(json_string) 
    message_body = json.dumps(result_json['body']) 
    routing_key = result_json['RoutingKey'] 
    while not channel.basic_publish(exchange=self.output_exchange_name, 
            routing_key=routing_key, 
            body=message_body.strip(), 
            properties=pika.BasicProperties(delivery_mode=2)): 
     pass # Do nothing or even you can count retries 
    count += 1 
self.logger.info('Sent %d messages' % count) 
0

嘗試這與您的命令只接收一個消息:

#!/usr/bin/env python 
import pika 
import ujson as json 


def receive(): 
    parameters = pika.ConnectionParameters(host='localhost') 
    connection = pika.BlockingConnection(parameters) 
    channel = connection.channel() 
    channel.queue_declare(queue='raw_post', durable=True) 

    method_frame, header_frame, body = channel.basic_get(queue='raw_post') 

    if method_frame.NAME == 'Basic.GetEmpty': 
     connection.close() 
     return '' 
    else: 
     channel.basic_ack(delivery_tag=method_frame.delivery_tag) 
     connection.close() 
     return json.loads(body), method_frame.message_count 


a = '' 
while a not in ['quit', 'sair', 'exit', 'bye']: 
    a = input("whats up?") 
    print(receive()) 

只是5000級的郵件發送者排隊:

#!/usr/bin/env python 
import pika 
import ujson as json 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

channel.queue_declare(queue='raw_post', durable=True) 

for i in range(5000): 
    info = {"info": "test", "value": i} 

    channel.basic_publish(exchange='', 
          routing_key='raw_post', 
          body=json.dumps(info), 
          properties=pika.BasicProperties(
           delivery_mode=2, # make message persistent 
         )) 

    print(" [x] Sent 'Hello World!' {}".format(i)) 
connection.close()