2012-01-15 89 views
5

我有一個系統實現了一些服務器之間通信的Camel和ActiveMQ。我想知道是否有辦法在X時間段後自動過期並清除發送到隊列的消息。由於始發服務器(填充隊列)不知道是否有人正在收集消息,我不希望我的隊列增長到其大到某個東西崩潰爲止。 Bonus karma指向可以幫助並提供java dsl方式來實現此功能的人員。Camel中消息的自動到期

解決方案

// expire message after 2 minutes 
long ttl = System.currentTimeMillis() + 120000; 
// send our info one-way to the group topic 
camelTemplate.sendBodyAndHeader("jms:queue:stats", ExchangePattern.InOnly, stats, "JMSExpiration", ttl); 

回答

1

嘛 setJMSExpiration(長過期):

是什麼東西一定不要當你是客戶時打電話。在ActiveMQ論壇上查看我的演講。

http://apache-qpid-developers.2158895.n2.nabble.com/MRG-Java-JMS-Expiration-td7171854.html

+0

在論壇上的留言中,看起來好像你想要10分鐘過期,而是設置了10秒。恩。 (10 *(60 * 1000))vs(10 * 1000) – 2012-01-15 19:45:25

+0

@Mondain nope,我想要10秒鐘。 10分鐘是從隊列中清除過期消息時的Qpid默認設置。例如:你發送了一條消息到隊列過期設置爲10秒,所以在10秒後消息被QPid過期,並且沒有消費者可以使用它,但事實上這個消息是刪除bu QPid,當刪除策略線程s)踢了,默認設置是他們踢了10分鐘後 – Eugene 2012-01-16 07:01:16

+0

好吧,我誤解了你在那裏發佈的內容。 – 2012-01-16 15:44:07

3

還介意客戶端之間的時鐘 - 經紀人需要保持同步,爲到期的正常工作。如果時鐘不同步,則在代理上收到消息時,客戶端設置的失效可能已經過期。或者客戶時間超過經紀人,所以到期時間超過10秒。

它有點擊敗我爲什麼到期是客戶端時間爲基礎。所以AMQ提供了一個插件,通過重新調整時間來解決這個問題,只能以經紀人爲基礎。請參閱http://activemq.apache.org/timestampplugin.html

1

就我們而言,我們選擇使用部署在ActiveMQ服務本身中的駱駝路線向特定目的地添加到期時間。

唯一要做的就是創建一個如下名稱的XML文件,例如: setJMSExpiration.xml

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <camelContext id="camel-set-expiration" xmlns="http://camel.apache.org/schema/spring"> 
    <!-- Copy route for each destination to expire --> 
    <route id="setJMSExpiration.my.queue.dlq"> 
     <from uri="broker:queue:MY.QUEUE.DLQ"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 1 day --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 86400000}</spel> 
     </setHeader> 
     <to uri="broker:queue:MY.QUEUE.DLQ"/> 
    </route> 
    <route id="setJMSExpiration.another.queue"> 
     <from uri="broker:queue:ANOTHER.QUEUE"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 5 days --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 432000000}</spel> 
     </setHeader> 
     <to uri="broker:queue:ANOTHER.QUEUE"/> 
    </route> 
    </camelContext> 
</beans> 

,並導入它在你的activemq.xml配置:

<!-- Add default Expiration (file in the same directory) --> 
<import resource="setJMSExpiration.xml"/> 

或者您也可以提供特定per destination policies,如果你不想讓過期消息到達ActiveMQ.DLQ隊列。

<policyEntry queue="MY.QUEUE.DLQ"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 
<policyEntry queue="ANOTHER.QUEUE"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 

這種方式的唯一的限制就是你不能輕易使用通配符,因爲它是在這裏編碼(你可以,但它會通過在駱駝航線使用JMS目的地標題需要一些調整)。

我們嘗試讓生產者定義timeToLive(並儘可能強制它們),但並不總是可以強制他們更改其代碼,這樣可以最大限度地減少此類路由的數量。