首先,你用什麼編程語言?最常見的語言,如python,java,c#,都支持爲並行進程創建額外的線程。
比方說,你消耗的隊列像下面(假性代碼):
def callback(ch, method, properties, body) ...
def threaded_function(ch, method, properties, body) ...
channel.basic_qos(prefetch_count=3)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
第一,設置「prefetch_count = 3」讓你的消費者必須在-最不-ACK狀態3個消息兼任。
在回調方法中,您應該啓動一個線程來執行帶有threaded_function的每條消息。在threaded_function方法體的結束,這樣做:
ch.basic_ack(delivery_tag = method.delivery_tag)
,這樣,在最3個消息可以並行處理,即使需要花費更長的時間爲一個或兩個線程來運行,別人仍然可以的處理下一個消息。