1

我正在嘗試設置一個集成流來消耗來自亞馬遜sqs隊列的消息並且其工作正常。但我想每分鐘或幾秒鐘調整消息的數量。例如每分鐘20條消息。如何使用彈簧集成加速sqs隊列的消耗

這裏是我的SQL監聽器bean

@Bean 
    public MessageProducer mySqsMessageDrivenChannelAdapter() { 
     SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName); 
     adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS); 

     adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY); 
     adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT); 
     adapter.setMaxNumberOfMessages(prefetch); 
     adapter.setOutputChannel(processMessageChannel()); 
     return adapter; 
    } 

正如你所看到的,我設置的最大消息數每個輪詢來獲取,但如何設置輪詢之間的延遲的定義是什麼?

在常規JMS隊列我可以用一個JMS.inboundAdapter使用自定義的輪詢,但似乎使用SqsMessageDrivenChannelAdapter我不能設置任何輪詢定時器的值。

也許我可以使用MessageProducer而不是SqsMessageDrivenChannelAdapter,但哪一個?

是否可以使用sqs設置JMS.inboundAdapter?

+0

它seens的解決方案可以在這裏找到https://stackoverflow.com/questions/29667321/polling-interval-for-jms-messagelistener-with-sqs-provider?rq=1。在這種情況下,這個問題可以被認爲是重複的,但這裏的重點是我使用了spring-integration。我會嘗試適應當前存在的解決方案,如果它有效,我會關閉這個解決方案。 –

回答

1

Spring集成SqsMessageDrivenChannelAdapter是一個消息驅動程序活動組件。它基於Springh Cloud AWS項目中的SimpleMessageListenerContainer,該項目長期運行while()循環以致電AmazonSQS.receiveMessage()。在這個循環的邏輯是不是太複雜:

try { 
    ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()); 
    CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size()); 
    for (Message message : receiveMessageResult.getMessages()) { 
     if (isQueueRunning()) { 
      MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes); 
       getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor)); 
     } else { 
      messageBatchLatch.countDown(); 
     } 
    } 
    try { 
     messageBatchLatch.await(); 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } 
} catch (Exception e) { 

正如你看到的,我們創建有messageBatchLatch和循環之後等待它。 每條消息都由它們自己的SignalExecutingRunnable處理,其中countDown()MessageExecutor的末尾。所以,你想要做的或許是通過目標服務方法中的人工Thread.sleep()在SQS輪詢之間獲得更多的時間間隔。

但是我聽到了你們的要求,我們確實需要添加類似:

/** 
* The sleep interval in milliseconds used in the main loop between shards polling cycles. 
* Defaults to {@code 1000} minimum {@code 250}. 
* @param idleBetweenPolls the interval to sleep between shards polling cycles. 
*/ 
public void setIdleBetweenPolls(int idleBetweenPolls) { 
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls); 
} 

我這樣做是爲KinesisMessageDrivenChannelAdapter,但在這裏我們不得不要求春雲AWS這樣做的SimpleMessageListenerContainer