2017-07-16 344 views

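Multiple consumers in RabbitMQ for multiple queues

I have 2 queues, say q1 and q2, bound to exchanges e1 and e2 with binding keys b1 and b2 respectively. I want to run two consumer functions in parallel, say c1 and c2, which listen to q1 and q2 respectively. I tried the following: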

import pika

import constants  # the asker's own module that provides rmqHostIp

# Uses the pika 0.x API, as in the original post. pika 1.x renamed type= to
# exchange_type= and no_ack= to auto_ack=, and basic_consume takes the queue first.
# callback is assumed to be defined elsewhere as callback(ch, method, properties, body).


def c1():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e1', durable=True,
                             type='topic')
    result = channel.queue_declare(durable=False, queue='q1')
    queue_name = result.method.queue
    binding_key = "b1"
    channel.queue_bind(exchange='e1',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()


def c2():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e2', durable=True,
                             type='topic')
    result = channel.queue_declare(durable=False, queue='q2')
    queue_name = result.method.queue
    binding_key = "b2"
    channel.queue_bind(exchange='e2',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()


if __name__ == '__main__':
    c1()  # start_consuming() blocks here, so the next line is never reached
    c2()

However, only c1 ends up listening; c2 is never executed. How can I run both functions? Thanks in advance.

Edit: The methods c1 and c2 are in 2 different modules (files).


You should use Python's threading module, or some alternative to the blocking connection. – alphiii

Answer


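To run both functions at the same time, you need some form of concurrency such as multithreading or multiprocessing, because start_consuming() blocks the caller. Take a look here for some Python examples.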
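As a minimal sketch of the threaded variant, the snippet below starts each blocking function in its own thread. The helper `run_in_threads` and the stand-in bodies of `c1` and `c2` are assumptions for illustration only: they block on an event the way `channel.start_consuming()` blocks, so the example runs without a broker.

```python
import threading


def run_in_threads(targets):
    """Start each blocking function in its own daemon thread."""
    threads = []
    for target in targets:
        thread = threading.Thread(target=target, name=target.__name__, daemon=True)
        thread.start()
        threads.append(thread)
    return threads


# Hypothetical stand-ins for the real consumers (assumption, not pika code).
stop = threading.Event()              # plays the role of "stop consuming"
both_running = threading.Barrier(3)   # c1, c2 and the main thread
started = []


def c1():
    started.append("c1")
    both_running.wait(timeout=5)  # only passes once c2 is running as well
    stop.wait()                   # blocks like start_consuming()


def c2():
    started.append("c2")
    both_running.wait(timeout=5)
    stop.wait()


threads = run_in_threads([c1, c2])
both_running.wait(timeout=5)  # returns only when both consumers are blocked
stop.set()                    # release both consumers
for thread in threads:
    thread.join(timeout=5)
```

With real consumers, each thread should open its own connection, as c1 and c2 in the question already do, since a pika BlockingConnection is not meant to be shared across threads.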

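Here is your code modified to use the Process class. It could also use threads, or you could start each consumer explicitly as a separate process from the OS.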

import pika
from multiprocessing import Process


def callback(ch, method, properties, body):
    print('callback got data')
    # no_ack=False below, so each message must be acknowledged explicitly
    ch.basic_ack(delivery_tag=method.delivery_tag)


class c1():
    def run(self):
        # Connect inside run() so the connection is created in the child
        # process instead of being shared with (or pickled from) the parent.
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.exchange_declare(exchange='e1', durable=True, type='topic')
        result = channel.queue_declare(durable=False, queue='q1')
        queue_name = result.method.queue
        binding_key = "b1"
        channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
        channel.basic_consume(callback, queue=queue_name, no_ack=False)
        channel.start_consuming()


class c2():
    def run(self):
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.exchange_declare(exchange='e2', durable=True, type='topic')
        result = channel.queue_declare(durable=False, queue='q2')
        queue_name = result.method.queue
        binding_key = "b2"
        # bind q2 to e2 (the original bound it to e1 by mistake)
        channel.queue_bind(exchange='e2', queue=queue_name, routing_key=binding_key)
        channel.basic_consume(callback, queue=queue_name, no_ack=False)
        channel.start_consuming()


if __name__ == '__main__':
    subscriber_list = []
    subscriber_list.append(c1())
    subscriber_list.append(c2())

    # start one process per subscriber
    process_list = []
    for sub in subscriber_list:
        process = Process(target=sub.run)
        process.start()
        process_list.append(process)

    # wait for all processes to finish
    for process in process_list:
        process.join()
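
Running the consumers explicitly from the OS can be sketched in the same spirit. Since c1 and c2 live in separate modules, one option is to launch each module as its own Python process. In the sketch below the helper `launch_consumers` and the inline `-c` commands are assumptions standing in for the real consumer scripts, whose file names are not given in the question.

```python
import subprocess
import sys


def launch_consumers(commands):
    """Start one OS process per command and return the Popen handles."""
    return [subprocess.Popen(command) for command in commands]


# Hypothetical stand-ins: in practice each command would run one consumer
# module with the Python interpreter instead of an inline -c snippet.
commands = [
    [sys.executable, "-c", "print('c1 consuming')"],
    [sys.executable, "-c", "print('c2 consuming')"],
]

processes = launch_consumers(commands)       # both processes run side by side
exit_codes = [p.wait() for p in processes]   # block until each one exits
```

Real consumers never exit on their own, so the wait() calls would block until the processes are stopped from outside, much like process.join() in the code above.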