2012-02-29

JMS message dequeued twice with ActiveMQ and Atomikos

I use ActiveMQ as the JMS server and Atomikos as the transaction manager.

In the ActiveMQ admin web interface I see that one message was enqueued, but 2 (!) messages were dequeued.

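However, the JMS consumer processes the message only once; there is no duplicate processing. When I use the simple Spring JmsTransactionManager, there is one enqueued message and one dequeued message. The problem appears only with the Atomikos JTA transaction manager.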
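
For comparison, a local-transaction setup of the kind described above might look like the following minimal sketch. The question does not show this configuration, so the bean ids activeMqConnectionFactory and jmsTransactionManager are hypothetical; the listener container would then reference jmsTransactionManager in its transactionManager property instead of the JTA manager.

```xml
<!-- Assumption: plain (non-XA) ActiveMQ connection factory; bean id is hypothetical -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- Spring's local JMS transaction manager: one JMS session per transaction, no XA -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="activeMqConnectionFactory"/>
</bean>
```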

What is the problem? How can I configure Atomikos so that the message is not counted as dequeued twice?

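My configuration is below. It is almost the same as in the tutorial. By the way, the Atomikos Spring integration example runs fine, but it uses Spring 2.0, whereas I use Spring 3.1.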

<bean id="activeMqXaConnectionFactory" class="org.apache.activemq.spring.ActiveMQXAConnectionFactory"> 
    <property name="brokerURL" value="tcp://localhost:61616"/> 
</bean> 

<bean id="atomikosQueueConnectionFactory" class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init"> 
    <property name="resourceName" value="xaMq"/> 
    <property name="xaQueueConnectionFactory" ref="activeMqXaConnectionFactory"/> 
</bean> 

<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 
    <property name="targetConnectionFactory" ref="atomikosQueueConnectionFactory"/> 
</bean> 

<bean id="jmsDefaultContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
    <property name="connectionFactory" ref="singleConnectionFactory"/> 
    <property name="messageListener" ref="consumer"/> 
    <property name="concurrentConsumers" value="1"/> 
    <property name="destinationName" value="q.jtaxa"/> 
    <property name="receiveTimeout" value="3000"/> 
    <property name="recoveryInterval" value="5000"/> 
    <property name="transactionManager" ref="transactionManager"/> 
    <property name="sessionTransacted" value="true"/> 
</bean> 

<!-- Transactions --> 
<!--Construct Atomikos UserTransactionManager, needed to configure Spring--> 
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" 
     destroy-method="close"> 
    <!-- when close is called, should we force transactions to terminate or not? --> 
    <property name="forceShutdown" value="true"/> 
</bean> 

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring --> 
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> 
    <property name="transactionTimeout" value="300"/> 
</bean> 

<!-- Configure the Spring framework to use JTA transactions from Atomikos --> 
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> 
    <property name="transactionManager" ref="atomikosTransactionManager"/> 
    <property name="userTransaction" ref="atomikosUserTransaction"/> 
    <property name="nestedTransactionAllowed" value="true"/> 
</bean> 

Consumer class:

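import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

// Listener that jmsDefaultContainer invokes for each message received from q.jtaxa.
@Component
public class Consumer implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage textMsg = (TextMessage) message;
                System.out.println("   " + textMsg.getText());
            } catch (JMSException ex) {
                // Logged and swallowed; the listener still returns normally.
                ex.printStackTrace();
            }
        }
    }
}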

Answer

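I found what was misconfigured: it was the "atomikosQueueConnectionFactory" bean. I had followed the tutorial, but the class should simply be AtomikosConnectionFactoryBean rather than QueueConnectionFactoryBean. I removed atomikosQueueConnectionFactory and added atomikosConnectionFactory: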

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init"> 
    <property name="uniqueResourceName" value="amq1"/> 
    <property name="xaConnectionFactory" ref="activeMqXaConnectionFactory"/> 
</bean> 

After that, it works fine. I found the correct configuration here
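
Since the old atomikosQueueConnectionFactory bean is removed, anything that referenced it also has to point at the new bean. The following is a sketch of how the connection-factory chain from the question might look after the change. It assumes the SingleConnectionFactory wrapper from the question is kept and only re-pointed, which the answer does not state explicitly. The xaConnectionFactory ref must match whatever id the XA factory bean actually has; the id from the question, activeMqXaConnectionFactory, is used here.

```xml
<!-- XA-capable ActiveMQ factory, unchanged from the question -->
<bean id="activeMqXaConnectionFactory" class="org.apache.activemq.spring.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- Replacement for the removed atomikosQueueConnectionFactory bean -->
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
    <property name="uniqueResourceName" value="amq1"/>
    <property name="xaConnectionFactory" ref="activeMqXaConnectionFactory"/>
</bean>

<!-- Assumption: the wrapper from the question is kept; only its target changes -->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="atomikosConnectionFactory"/>
</bean>
```

The jmsDefaultContainer bean can stay as it is, because it refers to singleConnectionFactory by id.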