1

我正在使用Spring AMQP(RabbitMQ實現),並試圖將單個事務傳播到多個線程中。例如,假設有3個隊列,名稱爲X,Y,Z,首先我使用線程1從隊列X獲取消息,然後將該消息提供給線程0,並且在線程0消息中被克隆並通過線程3發送到隊列Y,線程2和隊列Z通過線程3發送。線程0等待線程3和線程4完成,以提交或回滾消息。請注意,我在這裏使用4個線程。多線程的AMQP春季交易

我想要的基本上是將這3個操作(獲取消息並將其放到兩個隊列中)作爲單個事務處理。即如果我成功地將消息發送到隊列Y,但未能將其發送到Z,則發送到Y的消息將被回滾,並且原始消息也將被回滾到隊列X中。

到目前爲止,我已經設法通過threadLocals(主要是TransactionStatus和TransactionSynchronizationManager.resources)來傳遞事務信息,並且我能夠將這3個操作綁定到一個事務中。

但我的問題是發送ACK/NACK原始隊列X,即使我提交/回滾事務,它只適用於隊列Y和Z只。從X獲得的消息始終處於未確認狀態。

我試圖通過channel.basicAck(),RabbitUtils.commitIfNecessary()方法,但沒有成功。

請注意,我也啓用了channelTransacted。任何幫助,高度讚賞。

+0

在提交時,線程1處於活動狀態?也許死了,這是等什麼? –

+0

在提交時,線程1返回到池中(其所有線程本地都被清除)。提交由線程0完成。我已經更新了這個問題。 –

+0

也許只將txSize設置爲1,僅用於測試或在測試期間發送比txSize更多的消息。 –

回答

1

Spring事務綁定到單個線程。您可能可以直接使用RabbitMQ客戶端來工作 - 但您必須在所有線程中共享相同的頻道。但是,RabbitMQ documentation強烈建議不要這樣做:

通道實例不能在線程之間共享。應用程序應該更喜歡使用每個線程的通道,而不是在多個線程之間共享相同的通道盡管通道上的一些操作可以安全地同時調用,但有些操作不會並且會導致不正確的幀交錯。 ...

在任何情況下,即使您在單線程上工作,RabbitMQ交易也相當薄弱並且不能保證太多;見broker semantics

AMQP僅在事務涉及單個隊列時保證原子性,即所有在tx內發佈的路由都被路由到單個隊列,並且所有ack都與從相同隊列消耗的消息相關。 ...

+0

我想知道事務如何綁定到線程。挖了一點之後,我發現了兩個似乎保留事務細節的線程本地人,那些是TransactionSynchronizationManager.resources和ConsumerChannelRegistry.consumerChannel。我試着將這些線程本地代碼從一個線程移動到另一個線程,以便在線程之間切換事務,但正如我在原始問題中提到的那樣,第一個隊列不會被獲取。我在這裏錯過了什麼? –

+0

我不知道你錯過了什麼,但正如我所指出的,你不建議在多個線程上使用一個通道。 –

+0

非常感謝Gary的澄清。但無論如何,我會繼續挖掘更多,以確定它在技術上是否可行,甚至是強硬的建議。 –