2012-03-26 53 views
6

在下面的測試我試圖模擬以下情形:如何在AUTO_ACKNOWLEDGE JMS會話場景中模擬消息重新傳遞?

  1. 消息隊列開始。
  2. 設計爲在消息處理過程中失敗的消費者已啓動。
  3. 生成消息。
  4. 消費者開始處理消息。
  5. 處理期間拋出異常來模擬消息處理失敗。失敗的消費者停止。
  6. 另一位消費者的意圖是選擇重新發送的消息。

但是我的測試失敗,消息沒有重新發送給新的消費者。我會很感激這方面的任何提示。

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig", 
     loader=JavaConfigContextLoader.class) 
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests { 
    @Autowired 
    private FailureReprocessTestScenario testScenario; 

    @Before 
    public void setUp() { 
     testScenario.start(); 
    } 

    @After 
    public void tearDown() throws Exception { 
     testScenario.stop(); 
    } 

    @Test public void 
    should_reprocess_task_after_processing_failure() { 
     try { 
      Thread.sleep(20*1000); 

      assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{ 
        "task-1", 
      }))); 
     } catch (InterruptedException e) { 
      fail(); 
     } 
    } 

    @Configurable 
    public static class FailureReprocessTestScenario { 
     @Autowired 
     public BrokerService broker; 

     @Autowired 
     public MockTaskProducer mockTaskProducer; 

     @Autowired 
     public FailingWorker failingWorker; 

     @Autowired 
     public SucceedingWorker succeedingWorker; 

     @Autowired 
     public TaskScheduler scheduler; 

     public void start() { 
      Date now = new Date(); 
      scheduler.schedule(new Runnable() { 
       public void run() { failingWorker.start(); } 
      }, now); 

      Date after1Seconds = new Date(now.getTime() + 1*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { mockTaskProducer.produceTask(); } 
      }, after1Seconds); 

      Date after2Seconds = new Date(now.getTime() + 2*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { 
        failingWorker.stop(); 
        succeedingWorker.start(); 
       } 
      }, after2Seconds); 
     } 

     public void stop() throws Exception { 
      succeedingWorker.stop(); 
      broker.stop(); 
     } 
    } 

    @Configuration 
    @ImportResource(value={"classpath:applicationContext-jms.xml", 
      "classpath:applicationContext-task.xml"}) 
    public static class ContextConfig { 
     @Autowired 
     private ConnectionFactory jmsFactory; 

     @Bean 
     public FailureReprocessTestScenario testScenario() { 
      return new FailureReprocessTestScenario(); 
     } 

     @Bean 
     public MockTaskProducer mockTaskProducer() { 
      return new MockTaskProducer(); 
     } 

     @Bean 
     public FailingWorker failingWorker() { 
      TaskListener listener = new TaskListener(); 
      FailingWorker worker = new FailingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     @Bean 
     public SucceedingWorker succeedingWorker() { 
      TaskListener listener = new TaskListener(); 
      SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     private DefaultMessageListenerContainer listenerContainer(TaskListener listener) { 
      DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer(); 
      listenerContainer.setConnectionFactory(jmsFactory); 
      listenerContainer.setDestinationName("tasksQueue"); 
      listenerContainer.setMessageListener(listener); 
      listenerContainer.setAutoStartup(false); 
      listenerContainer.initialize(); 
      return listenerContainer; 
     } 

    } 

    public static class FailingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(FailingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public FailingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
     } 

     public void start() { 
      LOG.info("FailingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("FailingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("FailingWorker.processTask(" + task + ")"); 
      try { 
       Thread.sleep(1*1000); 
       throw Throwables.propagate(new Exception("Simulate task processing failure")); 
      } catch (InterruptedException e) { 
       LOG.log(Level.SEVERE, "Unexpected interruption exception"); 
      } 
     } 
    } 

    public static class SucceedingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public final List<String> processedTasks; 

     public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
      this.processedTasks = new ArrayList<String>(); 
     } 

     public void start() { 
      LOG.info("SucceedingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("SucceedingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("SucceedingWorker.processTask(" + task + ")"); 
      try { 
       TextMessage taskText = (TextMessage) task; 
       processedTasks.add(taskText.getText()); 
      } catch (JMSException e) { 
       LOG.log(Level.SEVERE, "Unexpected exception during task processing"); 
      } 
     } 
    } 

} 

TaskListener.java

public class TaskListener implements MessageListener { 

    private TaskProcessor processor; 

    @Override 
    public void onMessage(Message message) { 
     processor.processTask(message); 
    } 

    public void setProcessor(TaskProcessor processor) { 
     this.processor = processor; 
    } 

} 

MockTaskProducer.java

@Configurable 
public class MockTaskProducer implements ApplicationContextAware { 
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName()); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    private Destination destination; 

    private int taskCounter = 0; 

    public void produceTask() { 
     LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")"); 

     taskCounter++; 

     jmsTemplate.send(destination, new MessageCreator() { 
      @Override 
      public Message createMessage(Session session) throws JMSException { 
       TextMessage message = session.createTextMessage("task-" + taskCounter); 
       return message; 
      } 
     }); 
    } 

    @Override 
    public void setApplicationContext(ApplicationContext applicationContext) 
      throws BeansException { 
     destination = applicationContext.getBean("tasksQueue", Destination.class); 
    } 
} 
+1

當我設置'listenerContainer.setSessionTransacted(true)'我看到消息被重新傳遞,但只傳遞給'FailingWorker'。在停止相應的偵聽器容器之後的事件中,'SucceedingWorker'永遠不會獲得重新發送的消息。 – 2012-03-26 13:57:47

+1

我出現了'listenerContainer.stop()' - 方法不關閉與提供的連接,因此JMS提供程序繼續嘗試將失敗的消息重新傳遞給同一個使用者。爲了避免失敗的使用者在某個時候應該調用listenerContainer.shutdown()。 – 2012-03-27 08:36:02

回答

7

顯然我昨天看到的文檔的來源Creating Robust JMS Applications以某種方式誤導了我(或者我可能錯誤地理解了它)。特別是摘錄如下:

在確認JMS消息之前,它不被認爲是成功使用了 。成功消費 通常發生在三個階段。

  1. 客戶端收到該消息。
  2. 客戶端處理消息。
  3. 該消息被確認。確認由JMS提供商或客戶端啓動,具體取決於會話 確認模式。

我認爲AUTO_ACKNOWLEDGE正是這麼做的 - 確認該消息監聽器方法返回一個結果之後。但是根據JMS規範,它有點不同,而Spring收聽器的容器並不會試圖改變JMS規範的行爲。這是用AbstractMessageListenerContainer的Javadoc,不得不說 - 我已經強調了重要的句子:

監聽器容器提供了以下消息確認 選項:

  • 「SessionAcknowledgeMode來」設爲「 AUTO_ACKNOWLEDGE「(默認):在偵聽器執行之前自動發送消息確認;在拋出異常情況下無法重新投遞。
  • 「sessionAcknowledgeMode」設置爲「CLIENT_ACKNOWLEDGE」:成功偵聽器執行後自動確認消息;沒有 在發生異常情況下重新發送。
  • 「sessionAcknowledgeMode」設置爲「DUPS_OK_ACKNOWLEDGE」:在偵聽器執行期間或之後的惰性消息確認;潛在的 在發生異常情況下重新發送。
  • 「sessionTransacted」設置爲「true」:成功偵聽器執行後的事務確認;保證在發生異常情況下的重新投遞。

所以關鍵要我的解決方案是listenerContainer.setSessionTransacted(true);

我面臨的另一個問題是,JMS提供者保留重新傳送失敗的消息發送回了郵件的處理過程中失敗的相同的消費者。我不知道JMS規範是否給出了處方提供者應該在這種情況下應該做什麼,但是對我來說有效的是使用listenerContainer.shutdown();來斷開發生故障的消費者並允許提供者重新傳遞消息並給予一個機會到另一個消費者。