2017-07-03 147 views
1

我嘗試使用Azure服務總線和Apache Qpid和Spring與事務集成。與AMQP的Azure服務總線事務

但似乎Azure服務總線AMQP實現不支持事務。這是真的嗎?我沒有找到相關信息。

這是我的JMS配置

<bean id="serviceBusConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory"> 
    <constructor-arg value="amqps://${serviceBus.host}?amqp.idleTimeout=1200000"/> 
    <property name="clientID" value="${serviceBus.clientId}"/> 
    <property name="username" value="${serviceBus.sharedAccessPolicyName}"/> 
    <property name="password" value="${serviceBus.sharedAccessPolicyKey}"/> 
    <property name="receiveLocalOnly" value="true"/> 
    <property name="receiveNoWaitLocalOnly" value="true"/> 
</bean> 
<bean id="jmsCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="serviceBusConnectionFactory" /> 
</bean> 

這是我的春天整合片段:

<int-jms:inbound-channel-adapter id="resOrdJmsIn" 
           destination-name="${serviceBus.destination-name}" 
           channel="resOrdIncoming" 
           connection-factory="jmsCachingConnectionFactory" 
           acknowledge="client" 
           session-transacted="true" > 
    <int:poller fixed-rate="1000"/> 
</int-jms:inbound-channel-adapter> 

它與會話的事務=「假」 但會議事務處理=「真「它產生錯誤:

2017-07-03 10:06:27.237 ERROR 21575 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error] 
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316) 
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169) 
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487) 
    at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:754) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:138) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:111) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error] 
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:148) 
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:103) 
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167) 
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113) 
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795) 
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:92) 
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699) 
    ... 7 more 

APACHE QPID Trace

2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_FINAL 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_INIT 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_LOCAL_OPEN 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_REMOTE_OPEN 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES : SENT: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Coordinator{capabilities=[amqp:local-transactions]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.q.j.t.netty.NettyTcpTransport  : Attempted write of: 127 bytes 
2017-07-03 10:37:29.328 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport  : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165) 
2017-07-03 10:37:29.329 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl  : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES : RECV: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=18446744073709551615, offeredCapabilities=null, desiredCapabilities=null, properties=null} 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_INIT 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_LOCAL_OPEN 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_REMOTE_OPEN 
2017-07-03 10:37:29.432 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport  : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165) 
2017-07-03 10:37:29.433 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl  : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input 
+0

Azure的服務總線不支持交易:https://docs.microsoft.com/ EN-US /天藍色/服務總線的消息/服務總線交易 –

回答

0

Spring集成片段,即通過JMS與Azure的服務總線的工作原理,通過AMPQ在事務的方式支持:

<int-jms:message-driven-channel-adapter id="jmsIn" 
              destination-name="${serviceBus.destination-name}" 
              connection-factory="jmsCachingConnectionFactory" 
              acknowledge="client" 
              channel="channel-si"/>