2016-07-28 76 views
1

我想只允許在給定的時間正在處理的sqs隊列中的一個項目。目前它只會提取一個隊列中的單個消息,但它會繼續在每次調查時都會這樣做。Aws整合彈簧:保證只有一個項目從平方

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(2); 
     executor.setMaxPoolSize(2); 
     executor.setQueueCapacity(10); 
     executor.setThreadNamePrefix("test-"); 
     executor.initialize(); 
     return executor; 

     new SqsMessageDrivenChannelAdapter(amazon)); 
     adapter.setMaxNumberOfMessages(1); 
     adapter.setSendTimeout(2000); 
     adapter.setVisibilityTimeout(1200); 
     adapter.setWaitTimeOut(20); 
     adapter.setTaskExecutor(this.asyncTaskExecutor()); 

這個問題似乎是在ThreadPoolTask​​Executor和我的理解。由於隊列大小爲10,它每次都會提升,直到滿滿爲止?

設置maxPoolSize到1導致的

Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.[email protected]406354e5 rejected from [email protected][Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) 
    ... 6 common frames omitted 

回答

1

的問題是,你的ThreadPoolExecutor被設定爲具有使用2個線程消耗掉這個隊列中的消息10的BlockingQueue大小。所以在任何時候,你都可以有兩個線程同時處理消息。如果將PoolSize設置爲1,則可以保證在給定時間只能處理一條消息。

從源代碼:

/* 
* Proceed in 3 steps: 
* 
* 1. If fewer than corePoolSize threads are running, try to 
* start a new thread with the given command as its first 
* task. The call to addWorker atomically checks runState and 
* workerCount, and so prevents false alarms that would add 
* threads when it shouldn't, by returning false. 
* 
* 2. If a task can be successfully queued, then we still need 
* to double-check whether we should have added a thread 
* (because existing ones died since last checking) or that 
* the pool shut down since entry into this method. So we 
* recheck state and if necessary roll back the enqueuing if 
* stopped, or start a new thread if there are none. 
* 
* 3. If we cannot queue task, then we try to add a new 
* thread. If it fails, we know we are shut down or saturated 
* and so reject the task. 

你打第三種情況。

+0

這是我的第一個想法,這不起作用見上文。 @Gandalf – user101010101

+0

可能需要更多的代碼來解決這個問題。 GitHub的? – Gandalf