2013-10-31 102 views
4

我有activemq在我的系統中使用,我看到的是以下消息: TopicSubscription:consumer = ...:等待消息遊標[org.apache.activemq.broker.region .cursors.VMPendingMessageCursor @ 1684f89c]已滿,達到臨時使用(0%)或內存使用率(100%)限制,阻止消息添加(),等待釋放資源。activemq緩慢消費者塊生產者,雖然producerFlowControl是false

這是因爲如果我理解正確,我的消費者速度很慢,而我的製片人速度很快。其結果是,最終我的製作人被阻止,直到消費者閱讀消息並釋放一些內存。我想知道的是,我的製作人沒有被封鎖,而且當內存已滿時,舊信息正在被分散。

鑑於我的理解我已閱讀​​以下配置應該做的伎倆(messageEvictionStrategy,pendingMessageLimitStrategy),但它不適合我,我不明白爲什麼。

由於測試的原因,我已經指定低存儲空間限制低(35Mb),以使問題更快,但情況是我最終需要它,當問題發生時,activemq才放棄舊消息。

我發現在ActiveMQConnectionFactory useAsyncSend = true並指定sendTimeout中設置了一個令人不滿意的解決方案。這使得生產者不被阻止,但是通過這種方式,最新的消息被丟棄,而不是舊的消息。

最後,我正在談論非持久性話題。

任何幫助傢伙會完美。下面我ActiveMQ的配置

 <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
       <policyEntry topic=">" producerFlowControl="false" memoryLimit="35 Mb"> 
       <pendingSubscriberPolicy> 
        <vmCursor /> 
       </pendingSubscriberPolicy> 
        <messageEvictionStrategy> 
         <oldestMessageEvictionStrategy/> 
        </messageEvictionStrategy> 
        <pendingMessageLimitStrategy> 
         <constantPendingMessageLimitStrategy limit="10"/> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 

    <systemUsage> 
     <systemUsage sendFailIfNoSpace="true"> 
      <memoryUsage> 
       <memoryUsage limit="35 mb"/> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="1 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="5000 mb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 

的ActiveMQ版本5.7.0

我使用Spring模板來發送郵件:

<bean class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/> 
    <property name="timeToLive" value="100"/> 
</bean> 

我發送javax.jms.ObjectMessage,體積小relativelly。

我在客戶前提中發現問題我在應用程序中有許多應用程序,但設法重現它從1個線程不斷地發送,不停地發送消息總是到同一主題。發送的消息只是一個小字符串。

我只有一個生產者,當我有1個(或更多)慢消費者時,問題似乎就會出現 - 但是一個慢消費者就夠了。如果不存在緩慢的消費者,則不會出現問題。

我不認爲這有什麼差別,但我使用

 <transportConnectors> 
     <transportConnector name="openwire" uri="nio://0.0.0.0:33029?wireFormat.maxInactivityDuration=60000&amp;wireFormat.maxInactivityDurationInitalDelay=60000"/> 
    </transportConnectors> 
+0

難題或我不描述的東西?請告訴我,如果有什麼東西是未知的或需要進一步澄清 – Alexandros

回答

1

事實證明,當使用JmsTemplate爲了使異步發送,然後消息無法傳遞,我們需要啓用explicitQosEnabled並設置deliveryMode = 1(非持久性)。 同樣在客戶端,消費者需要有一個較小的預取限制

服務器

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/> 
    <property name="explicitQosEnabled" value="true"/> 
    <property name="deliveryMode" value="1"/> 
</bean> 

客戶

<jms:listener-container .. prefetch="1000"> 
... 
</jms:listener-container> 

不要問我爲什麼...但是這似乎已經解決了我的問題。 基本上不是100%需要,但如果有人能向我解釋這將是完美

2

如何我可以重新呢?有多少生產者/消費者參與這個主題?這只是一個主題嗎?

您的設置看起來不錯,但您不需要在策略上設置memoryLimit = 35mb。將它設置爲與整個系統使用情況相同是沒有意義的。這個想法是所有主題組合的內存限制等於系統內存限制。例如,如果您有兩個主題,則每個主題都使用35MB(2 * 35 == 70MB),這會超過整個系統內存設置。我不認爲這是你所看到的具體原因,但要記住。

也是什麼版本的ActiveMQ是這樣的?如果你已經寫了一些可以產生這種結果的測試,請告訴我。

+0

在上面的主要問題中添加回答你的問題 – Alexandros

+0

memoryLimit是的,我讀了它並不需要,因爲你上面描述,我正在閱讀activemq配置,並試圖使其工作所以我加了它,當你提到它存在或不存在時它沒有區別。 – Alexandros

+0

我可以通過一種方式進一步調查,我可以下載源代碼並檢查某些內容,但不知道在哪裏放置斷點。我真的堅持這個問題,需要幫助。 也是一個額外的信息,可能是有用的,我使用timeToLive = 100的JMSTemplate。 – Alexandros