2013-02-25 233 views
2

我想了解在RabbitMQ(使用Spring AMQP或Java客戶端直接)中合併或傳入消息的最佳方式。RabbitMQ將消息分組爲一個消息,即合併消息

換句話說,我想說100個傳入消息並將它們合併爲1,然後以可靠的方式將其重新發送到另一個隊列(正確的方式爲ACK)。我相信這在EIP中被稱爲aggregator模式。

我知道Spring集成provides an aggregator solution,但實現看起來像它的不安全(它看起來像它必須確認並使用消息來構建合併的消息,因此,如果您在執行此操作時關閉它,則會丟失消息?)。

回答

2

如果將<amqp-inbound-channel-adapter/>tx-size屬性設置爲100,那麼容器將每隔100條消息確認一次,以防止郵件丟失。

但是,您可能想要發送聚合消息(在第100個接收處)事務處理,以便您可以確認代理在入站消息確認之前是否有消息。

+0

我認爲prefetchCount('org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#setPrefetchCount')(其與TX大小有關)是我需要一起玩。 – 2013-02-25 17:00:22

+0

不;請參閱SimpleMessageListenerContainer。doReceiveAndExecute()'和'BlockingQueueConsumer.commitIfNecessary';如果容器不是事務性的,那麼僅僅'最後'遞送標籤被收到(在txSize消息的循環內)。預取調整後,如果它小於txSize ...'int actualPrefetchCount = prefetchCount> txSize? prefetchCount:txSize;'。 – 2013-02-25 17:18:07

+0

是的,我想我明白了,我想我需要編寫自己的SimpleMessageListenerContainer來將預取的消息轉移到賬戶中。現在消息容器被設計爲單個消息(即沒有'onMessage(Collection ...)')。基本上它看起來像我需要寫我自己的'org.springframework.amqp.rabbit.listener.BlockingQueueConsumer' – 2013-02-25 17:49:17

2

我不能直接對Spring集成庫進行評論,所以我會通常用RabbitMQ來講話。

如果你不是百分之百地相信Aggregator的Spring Integration實現,並且試圖自己實現它,那麼我建議避免使用tx,它使用RabbitMQ中的事務處理下的事務。

RabbitMQ的交易速度很慢,如果您要構建高流量/吞吐量系統,您肯定會遇到性能問題。

相反,我建議你看看Publisher確認這是在RabbitMQ中實現的AMQP的擴展。下面是它的新介紹http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

您需要調整預取設置才能獲得正確的性能,請參閱http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/瞭解一些詳細信息。

以上所有內容都爲您提供了一些幫助解決問題的背景知識。實現相當簡單。

在創建您的客戶時,您需要確保設置它以確保需要ACK。

  1. 出列n條消息,因爲你出列,則需要記下DeliveryTag的每個消息(這是用來ACK消息)
  2. 骨料消息到一個新的消息
  3. 發佈新消息
  4. ACK各取出的消息

有一點需要注意的是,如果你的消費3後死亡和前4已經當它回來然後完成了未ACK'd將重新處理這些消息生活

+0

+1春季AMQP已經確認了待確認。我需要TX的原因是我需要能夠支持事務資源同步的提交/回滾(即在db事務提交或回滾之後提交消息)。我要/想要在內存事務中創建自己的事務(即在數據庫提交上推送消息列表),以便我可以得到你所談論的性能...但這是我必須解決的另一個問題:) – 2013-02-26 14:38:28