2016-09-23 129 views
0

我已經定義了一個JMS入站通道適配器作爲以下中Spring集成3.0.1.RELEASE:Spring集成JMS入站通道適配器在特定的固定速率不輪詢

<int-jms:inbound-channel-adapter channel="inChannel" phase="1000" 
           destination-name="jmsQueue" extract-payload="true" 
           connection-factory="connectionFactory"> 
    <int:poller max-messages-per-poll="1" fixed-rate="1000"/> 
</int-jms:inbound-channel-adapter> 

但是多個消息從消耗JMS代理的隨機不可靠距離,消耗的消息數量可能會從幾秒到幾分鐘。我試過fixed-delay而不是fixed-rate,但它具有相同的行爲。

哪一個其他因素可以使輪詢操作在不同的時間執行,以及如何實現可靠的輪詢時間?

編輯:

我只限於應用到一個默認的輪詢,用一個單一的JMS入站通道適配器(雖然有一些消息驅動通道適配器),它仍然具有相同的行爲。我抽動了等待時間爲fixed-delay的3000和receive-timeout爲5000.

我已經啓動了應用程序,並在JMS隊列中收集了一些消息,日誌顯示這樣的條目,切換線程如下所示後一些回調操作的任務調度的:

2016-09-23 18:48:25,592 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:18:1,started=true} 
2016-09-23 18:48:25,630 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:28,639 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:19:1,started=true} 
2016-09-23 18:48:28,643 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:31,651 | DEBUG | ask-scheduler-3 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:20:1,started=true} 
2016-09-23 18:48:31,657 | DEBUG | ask-scheduler-3 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:34,666 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:21:1,started=true} 
2016-09-23 18:48:34,670 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 

然後,在10分鐘後:

2016-09-23 18:58:10,032 | DEBUG | ask-scheduler-8 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:212:1,started=true} 
2016-09-23 18:58:10,091 | DEBUG | ask-scheduler-8 | ion.endpoint.SourcePollingChannelAdapter | Poll resulted in Message: 

並且該消息被消耗。

我已經拿了多個轉儲,並可能在運行狀態下找到任務執行線程只有一個實例:

"task-scheduler-4" prio=6 tid=0x000000001074f800 nid=0x4364 runnable [0x000000001d4fe000] 
    java.lang.Thread.State: RUNNABLE 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:123) 
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:167) 
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237) 
    - locked <0x00000007dc803080> (a java.util.concurrent.atomic.AtomicBoolean) 
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83) 
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104) 
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) 
    - locked <0x00000007dc8031f8> (a java.lang.Object) 
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) 
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225) 
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219) 
    at org.apache.activemq.ActiveMQSession.doClose(ActiveMQSession.java:590) 
    at org.apache.activemq.ActiveMQSession.close(ActiveMQSession.java:581) 
    at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108) 
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497) 
    at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:761) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:118) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:93) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:111) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:184) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

所有其他線程轉儲表明,任務調度的所有線程要麼是上WAITING或TIMED_WAITING如下(包括完成之後上一次轉儲的線程)。這從最後一個30秒後轉儲:

"task-scheduler-4" prio=6 tid=0x00000000118d3800 nid=0x4abc waiting on condition [0x000000000f8bf000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000007838d74d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

    Locked ownable synchronizers: 
    - None 

"task-scheduler-3" prio=6 tid=0x00000000118d4800 nid=0x4f14 waiting on condition [0x000000001ba0f000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x0000000787c10210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

    Locked ownable synchronizers: 
    - None 

任何線索?

回答

0

如果您的應用程序中有很多輪詢器,您可能會遭受線程不足的困擾; default task scheduler只有10個線程;你可以增加計數。

默認情況下輪詢從QueueChannel s塊的輪詢1秒。

通常,打開org.springframework.integration的DEBUG日誌記錄將有助於解決這類問題。

採取線程轉儲並查看任務計劃程序線程活動也應該有所幫助。

+0

我編輯了一些日誌和線程轉儲文章,仍然無法整理你:/ – gnzlrm

+0

很難說,顯然輪詢者正在與ActiveMQ交互 - 關閉會話。我不知道activemq內部結構,爲什麼這可能需要很長時間。我可以想象如果你有巨大的消息和/或一個糟糕的網絡零星的時間。下一步,我會採取如果調試這將是看着WireShark或類似的網絡跟蹤。 –

+0

對不起,我上週休假了。我無法使用WireShark,因爲我在一個受限制的環境中使用有限的工具箱:/ 有一件事我看起來就是這樣,當我指定'receive-timeout = 5000'時,輪詢操作顯然很少(一些民意調查幾乎立即被報告爲「虛假」),而報告接收消息的操作需要更多時間。是不是應該在回調操作開始和結果報告爲'假'之間等待5秒鐘? – gnzlrm

相關問題