我想了解在RabbitMQ(使用Spring AMQP或Java客戶端直接)中合併或傳入消息的最佳方式。RabbitMQ將消息分組爲一個消息,即合併消息
換句話說,我想說100個傳入消息並將它們合併爲1,然後以可靠的方式將其重新發送到另一個隊列(正確的方式爲ACK
)。我相信這在EIP中被稱爲aggregator模式。
我知道Spring集成provides an aggregator solution,但實現看起來像它的不安全(它看起來像它必須確認並使用消息來構建合併的消息,因此,如果您在執行此操作時關閉它,則會丟失消息?)。
我認爲prefetchCount('org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPrefetchCount')(其與TX大小有關)是我需要一起玩。 – 2013-02-25 17:00:22
不;請參閱SimpleMessageListenerContainer。doReceiveAndExecute()'和'BlockingQueueConsumer.commitIfNecessary';如果容器不是事務性的,那麼僅僅'最後'遞送標籤被收到(在txSize消息的循環內)。預取調整後,如果它小於txSize ...'int actualPrefetchCount = prefetchCount> txSize? prefetchCount:txSize;'。 – 2013-02-25 17:18:07
是的,我想我明白了,我想我需要編寫自己的SimpleMessageListenerContainer來將預取的消息轉移到賬戶中。現在消息容器被設計爲單個消息(即沒有'onMessage(Collection ...)')。基本上它看起來像我需要寫我自己的'org.springframework.amqp.rabbit.listener.BlockingQueueConsumer' –
2013-02-25 17:49:17