2016-08-05 163 views
0

我們正在使用Spring Cloud AWS與SQS進行交互。我們使用@SqsListener註釋將消息從隊列中拉出。我們有deletionPolicy = NEVER,這意味着我們手動確認所有我們選中的郵件。春季雲SQS消費阻塞,直到所有消息處理

我們的問題是SimpleMessageListenerContainer(它處理隊列中的消息處理)等待所有工作線程完成,然後再從隊列中選擇更多的消息。

換句話說,我們現在看到的是這樣的:

  • 拉10個關閉消息隊列。
  • 啓動10個線程來完成工作。
  • 其中一個正在執行工作的線程在IO調用緩慢時被阻塞。
  • 現在阻止應用程序從隊列中獲取更多消息,並且阻止應用程序執行更多工作,直到慢速呼叫結束。

我們可以看到在SimpleMessageListenerContainer.AsynchronousMessageListener代碼負責此

@Override 
public void run() { 
    while (isQueueRunning()) { 
     try { 
      ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()); 
      CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size()); 
      for (Message message : receiveMessageResult.getMessages()) { 
       if (isQueueRunning()) { 
        MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes); 
        getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor)); 
       } else { 
        messageBatchLatch.countDown(); 
       } 
      } 
      try { 
       messageBatchLatch.await(); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } catch (Exception e) { 
      getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " + 
        "retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e); 
      try { 
       //noinspection BusyWait 
       Thread.sleep(getBackOffTime()); 
      } catch (InterruptedException ie) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    } 
} 

理想情況下,我們希望對消息監聽器持續回暖的消息從隊列中進行處理。

由於AbstractMessageListenerContainer是本地包,我們似乎無法實現我們自己的MessageListenerContainer

是否有解決此問題的方法?

回答

0

什麼是消息輪詢線程是messageBatchLatch.await()聲明。看來,只是刪除閂鎖將做到這一點。喜歡的東西:

@Override 
public void run() { 
    while (isQueueRunning()) { 
     try { 
      ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()); 
      for (Message message : receiveMessageResult.getMessages()) { 
       if (isQueueRunning()) { 
        MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes); 
        getTaskExecutor().execute(new SignalExecutingRunnable(messageExecutor)); 
       } 
      } 
     } catch (Exception e) { 
      getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " + 
        "retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e); 
      try { 
       //noinspection BusyWait 
       Thread.sleep(getBackOffTime()); 
      } catch (InterruptedException ie) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    } 
} 

這將工作,如果你的TaskExecutor實現: - 有一個固定大小的線程池 - 塊時execute函數被調用,並沒有可用的線程。

這是大多數實現的工作原理,但值得檢查一下。

+0

謝謝,但我們將如何重寫「AsynchronousMessageListener」?這是私人的.. – GBC

+0

對不起,沒有意識到這不是你的代碼: - (...嗯,在這種情況下,假設春天不允許你覆蓋這種行爲,我想它會更容易實現你自己的隊列輪詢代碼... – Filipe

+0

你可以重載'startQueue(String queueName,QueueAttributes queueAttributes)',對吧? – skirsch