2017-12-18 266 views
0

我有以下問題需要解決: 我想實現一個簡單的使用RabbitMQ消息傳遞的延遲重試機制。我有一個基礎設施,可以讓我延遲傳遞信息。我可以有任何想要在運行時利用這種延遲重試機制的感興趣的參與者。如何註冊一個隊列,並且它是RabbitMQ和Spring的獨佔使用者/偵聽器?

參與者只想給我提供2個細節和消息: 1.隊列名稱,他們希望在延遲T秒後傳遞消息。 2.隊列的消費者(比如消息的消費者。)

我試圖做到以下幾點:

private void startSeparateListener(final Object messageConsumer, 
            final Queue queue) { 
     SimpleMessageListenerContainer simpleMessageListenerContainer 
        = myCustomeSimpleMessageListenerFactory.create(); 
     simpleMessageListenerContainer.setRabbitAdmin(rabbitAdmin); 
     simpleMessageListenerContainer.setQueues(queue); 
     simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(messageConsumer)); 
     simpleMessageListenerContainer.start(); 

    } 

請注意,隊列已被創建並與rabbitadmin已註冊並且對象使用者有一個名爲handleMessage的方法來偵聽隊列。

這是在運行時爲隊列的消息使用者動態註冊隊列的正確方法嗎?

注: 春天已經提供了類型SimpleMessageListenerContainer一樣的豆,但會使用bean添加隊列和消費者的動態會導致發言權Q1的無意識消費的問題,被稱爲另一個隊列的接收器的一部分,說Q2,其內容類型可能與Q1相同?

我嘗試了很多關於它的搜索,但無法獲得任何具體的解釋。如果它是一個重複的問題和任何天真的話,事先道歉。

回答

0

我不能編譯你的問題,但我可以告訴你已經有了RabbitMQ和Sring AMQP supports的Dealyed Exchange解決方案。

我建議遠離動態添加SimpleMessageListenerContainer:它不像它看起來那麼簡單。有沒有像addQueueNames()一個選項:

/** 
* Add queue(s) to this container's list of queues. The existing consumers 
* will be cancelled after they have processed any pre-fetched messages and 
* new consumers will be created. The queue must exist to avoid problems when 
* restarting the consumers. 
* @param queueName The queue to add. 
*/ 
@Override 
public void addQueueNames(String... queueName) { 

所以,你可能會考慮不增加新的集裝箱,但添加新的隊列,以現有的一個。 amqp_consumerQueue的下游路由可能有助於區分來自不同隊列的消息。

+0

尊敬的@ artem-bilan,感謝您的回覆。我的道歉是我不能很好地解決問題。我知道延遲交換插件,但是我已經創建了自己的延遲基礎結構,使用交換和隊列以及適當定義的x-message-ttl和x-deadletter-exchange。這整個infra發送所有的消息到匯兌。我想創建一個隊列,將它綁定到交換機上,並在隊列中註冊一個消費者,全部在運行時。因此,我創建一個新的SimpleMessageListenerContainer並使用它的問題。我希望我能更好地解釋它。 –

相關問題