我想做一個有界的檢查器,只有一個固定數量的線程可以並行執行。當添加更多任務時,執行程序將阻塞,直到其他線程完成。executorService拋出與SynchronousQueue的RejectException
這裏是執行者,我發現在其他問題。
public class BoundedExecutor extends ThreadPoolExecutor {
private final Logger logger = LogManager.getLogger(BoundedExecutor.class);
private final Semaphore semaphore;
public BoundedExecutor(int bound){
super(bound, bound, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
this.semaphore = new Semaphore(bound);
}
@Override
public void execute(Runnable task) {
try {
semaphore.acquire();
super.execute(task);
} catch (InterruptedException e) {
logger.error("interruptedException while acquiring semaphore");
}
}
protected void afterExecute(final Runnable task, final Throwable t){
super.afterExecute(task, t);
semaphore.release();
}
}
和
public static void main(String[] args) throws Exception {
Runnable task =() -> {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " complete.");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
BoundedExecutor pool = new BoundedExecutor(1);
for(int i = 0; i < 10; i++){
pool.execute(task);
}
pool.shutdown();
}
我以爲代碼所做的一個單獨的線程,並會按順序執行任務,但實際上,當第一個任務完成後,執行拋出java.util.concurrent.RejectedExecutionException主代碼。
因爲我不理解,semaphore.acquire()會阻塞線程,直到第一個任務完成並釋放信號量,代碼有什麼問題?
?你通過這樣做想達到什麼目的? –
信號量用於在池溢出時阻塞ThreadExecutor添加新任務。 同步隊列被使用,因爲我不想隊列任務,當池被充滿時。 – iceshi
因此,使用SynchronousQueue會阻塞,只要沒有空閒線程來拾取它,那麼Semaphore會爲此添加什麼? –