2014-12-11 179 views
1

問題駱駝dynamicRouter導致無限循環

  • 「無限」 循環使用dynamicRouter()
  • 使用producerTemplate.send(端點,交換)的時候看起來很不錯時遇到的。但是,由於CountDownLatch,它會在org.apache.camel.processor.UnitOfWorkProducer.process(Exchange)方法中導致阻塞。

我使用不正確的producerTemplate.send方法嗎?我怎樣才能解決dynamicRouter()的「無限」循環問題?下面的配置有什麼問題嗎?

樣品路線定義

private static final String inboundUri = "jms:request"; 
private static final String replyUri = "jms:reply"; 
private static final String subReply = "jms:subReply"; 
private static final String DEFAULT_ENDPOINT = "DEFAULT_ENDPOINT"; 


// Route Configuration 

from(inboundUri).routeId("generic-jms-inbound").setExchangePattern(ExchangePattern.InOnly).threads().bean(dummyProcessor) 
.setHeader(DEFAULT_ENDPOINT, constant("direct:main")) 
.process(exchange -> { 
    ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate(); 
    producerTemplate.send("direct:main", exchange); 
}) 
//--------------------------- 
//.dynamicRouter().header("DEFAULT_ENDPOINT") <== This causes "infinite" loop 
//--------------------------- 
; 

from("direct:main").setExchangePattern(ExchangePattern.InOnly).threads().bean(dummyProcessor).wireTap("direct:wireTap").to(replyUri); 

from("direct:wireTap").setExchangePattern(ExchangePattern.InOnly).threads().bean(MessageHistoryLogger.class).to(subReply); 

在現實情況下,異常日誌看起來是這樣的

Message History 
--------------------------------------------------------------------------------------------------------------------------------------- 
RouteId    ProcessorId   Processor                  Elapsed (ms) 
[generic-jms-inboun] [generic-jms-inboun] [jms://terminalRequest               ] [  94714] 
[generic-jms-inboun] [setExchangePattern] [setExchangePattern[InOnly]             ] [   0] 
[generic-jms-inboun] [threads2   ] [threads                  ] [  94714] 
[generic-jms-inboun] [bean9    ] [bean[[email protected]c0f53b] ] [   0] 
[generic-jms-inboun] [dynamicRouter1 ] [dynamicRouter[header{header(defaultEndPointUri)}]        ] [  94714] 
[sellTicket  ] [setExchangePattern] [setExchangePattern[InOnly]             ] [   0] 
[sellTicket  ] [threads4   ] [threads                  ] [   5] 
[sellTicket  ] [bean14   ] [bean[[email protected]548f] ] [   0] 
[sellTicket  ] [bean17   ] [bean[com.[email protected]7d061] [   0] 
[sellTicket  ] [wireTap1   ] [wireTap[direct:ceRequest]              ] [   5] 
[sellTicket  ] [bean18   ] [bean[com.xxxx[email protected]5] [   0] 
[sellTicket  ] [bean19   ] [bean[com.xxx[email protected]62] [   0] 
[sellTicket  ] [to2    ] [jms:terminalReply                ] [   0] 
[sellTicket  ] [setExchangePattern] [setExchangePattern[InOnly]             ] [   0] 
[sellTicket  ] [threads4   ] [threads                  ] [   1] 
[sellTicket  ] [bean14   ] [bean[[email protected]548f] ] [   0] 
[sellTicket  ] [bean17   ] [bean[com.xxxxxxxxxxxx.integration.camel.p[email protected]] [   0] 
[sellTicket  ] [wireTap1   ] [wireTap[direct:ceRequest]              ] [   0] 
[sellTicket  ] [bean18   ] [bean[com.xxxx[email protected]5] [   0] 
[sellTicket  ] [bean19   ] [bean[com.xxx[email protected]62] [   0] 
[sellTicket  ] [to2    ] [jms:terminalReply                ] [   1] 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> !!!!!!!!!!!!!!! KEEPS Repeating FROM (bean14 ==> to2)! !!!!!!!!!!!!! <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 
[ceRequest   ] [setExchangePattern] [setExchangePattern[InOnly]             ] [   0] 
[ceRequest   ] [threads5   ] [threads                  ] [  74291] 
[ceRequest   ] [bean20   ] [bean[com.x[email protected]4303] [   0] 
[ceRequest   ] [marshal1   ] [marshal[[email protected]]   ] [   0] 
[ceRequest   ] [to3    ] [jms:ceRequest                 ] [  74161] 

Stacktrace 
--------------------------------------------------------------------------------------------------------------------------------------- 
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Error sending message - transport error (streaming response timeout) 
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316) 
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169) 
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496) 
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228) 
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:431) 
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:385) 
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153) 
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120) 
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) 
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118) 
    at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43) 
    at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136) 
    at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: javax.jms.JMSException: Error sending message - transport error (streaming response timeout) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408) 
    at com.solacesystems.jms.impl.JMSExceptionValue.newInstance(JMSExceptionValue.java:32) 
    at com.solacesystems.jms.impl.JCSMPExceptionMapper$ArrayListMapper.get(JCSMPExceptionMapper.java:31) 
    at com.solacesystems.jms.impl.JCSMPExceptionMapper.get(JCSMPExceptionMapper.java:90) 
    at com.solacesystems.jms.impl.Validator.createJMSException(Validator.java:541) 
    at com.solacesystems.jms.SolMessageProducer.sendMessage(SolMessageProducer.java:343) 
    at com.solacesystems.jms.SolMessageProducer.send(SolMessageProducer.java:155) 
    at org.springframework.jms.connection.CachedMessageProducer.send(CachedMessageProducer.java:179) 
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:635) 
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:336) 
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:275) 
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217) 
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231) 
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:493) 
    ... 19 common frames omitted 
Caused by: com.solacesystems.jcsmp.JCSMPTransportException: streaming response timeout 
    at com.solacesystems.jms.impl.FlowMessageProducerAdapter.send(FlowMessageProducerAdapter.java:105) 
    at com.solacesystems.jms.SolMessageProducer.sendMessage(SolMessageProducer.java:336) 
    ... 27 common frames omitted 

回答