簡單(不可靠)方式
首先,啓用耐用隊列添加:
channel.queue_declare(queue='your_queue', durable=True)
到兩個,出版商和消費者(做發佈/消費前)。
然後,即使RabbitMQ服務器死亡並重新啓動,您也可以確保您的隊列不會丟失。
出版商
在發行,加上properties=pika.BasicProperties(delivery_mode=2)
您basic_publish
電話,以確保您的信息是永久性的。
channel.basic_publish(exchange=self.output_exchange_name,
routing_key=routing_key,
body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2))
這應該做的伎倆,以避免丟失_published消息。
消費者
從消費者角度來看,該official RabbitMQ tutorial for python說:
爲了確保信息不會丟失,RabbitMQ的支持消息確認。一個ack(請求)被從消費者發回,告訴RabbitMQ已經收到,處理了一個特定的消息,並且RabbitMQ可以自由刪除它。 [...]默認情況下,消息確認已打開。
當你建立了消費者,確保您發送ACK 得當,讓RabbitMQ的從隊列中刪除它。
def callback(ch, method, properties, body):
print "Received %r" % (body,)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback, queue='your_queue')
的真正安全的方式
如果你需要一個更強大和更可靠的方法是完全肯定確認發佈中繼上RabbitMQ的,你應該使用AMQP協議的plublish confirm功能。
從pika documentation:
import pika
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send a message
if channel.basic_publish(exchange='test',
routing_key='test',
body='Hello World!',
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=1)):
print 'Message publish was confirmed'
else:
print 'Message could not be confirmed'
所以要根據你的代碼,我將使用類似於:
count=0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent
count += 1
else:
# Do something with your undelivered message
self.logger.info('Sent %d messages' % count)
connection.close()
或者作爲蠻力的方法,你可以使用一個while
請使用循環代替if
以確保發送所有消息:
count = 0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
while not channel.basic_publish(exchange=self.output_exchange_name,
routing_key=routing_key,
body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2)):
pass # Do nothing or even you can count retries
count += 1
self.logger.info('Sent %d messages' % count)