2016-04-03 149 views
1

我想做一個有界的檢查器,只有一個固定數量的線程可以並行執行。當添加更多任務時,執行程序將阻塞,直到其他線程完成。executorService拋出與SynchronousQueue的RejectException

這裏是執行者,我發現在其他問題。

public class BoundedExecutor extends ThreadPoolExecutor { 

    private final Logger logger = LogManager.getLogger(BoundedExecutor.class); 
    private final Semaphore semaphore; 

    public BoundedExecutor(int bound){ 
     super(bound, bound, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); 
     this.semaphore = new Semaphore(bound); 
    } 

    @Override 
    public void execute(Runnable task) { 
     try { 
      semaphore.acquire(); 
      super.execute(task); 
     } catch (InterruptedException e) { 
      logger.error("interruptedException while acquiring semaphore"); 
     } 
    } 

    protected void afterExecute(final Runnable task, final Throwable t){ 
     super.afterExecute(task, t); 
     semaphore.release(); 
    } 
} 

public static void main(String[] args) throws Exception { 

     Runnable task =() -> { 
      try { 
       Thread.sleep(1000); 
       System.out.println(Thread.currentThread().getName() + " complete."); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }; 
     BoundedExecutor pool = new BoundedExecutor(1); 
     for(int i = 0; i < 10; i++){ 
      pool.execute(task); 
     } 
     pool.shutdown(); 
    } 

我以爲代碼所做的一個單獨的線程,並會按順序執行任務,但實際上,當第一個任務完成後,執行拋出java.util.concurrent.RejectedExecutionException主代碼。

因爲我不理解,semaphore.acquire()會阻塞線程,直到第一個任務完成並釋放信號量,代碼有什麼問題?

+0

?你通過這樣做想達到什麼目的? –

+0

信號量用於在池溢出時阻塞ThreadExecutor添加新任務。 同步隊列被使用,因爲我不想隊列任務,當池被充滿時。 – iceshi

+0

因此,使用SynchronousQueue會阻塞,只要沒有空閒線程來拾取它,那麼Semaphore會爲此添加什麼? –

回答

2

我會做的隊列塊,而不是使用信號燈

public static void main(String[] args) { 
    SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>() { 
     @Override 
     public boolean offer(Runnable runnable) { 
      try { 
       return super.offer(runnable, 1, TimeUnit.MINUTES); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       return false; 
      } 
     } 
    }; 
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, queue); 
    for (int i = 0; i < 10; i++) { 
     final int finalI = i; 
     pool.execute(() -> { 
      try { 
       Thread.sleep(1000); 
       System.out.println(LocalTime.now() + " - " + Thread.currentThread().getName() + " " + finalI + " complete"); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }); 
    } 
    pool.shutdown(); 
} 

打印你爲什麼要使用一個信號量用的SynchronousQueue

13:24:14.241 - pool-1-thread-1 0 complete 
13:24:15.247 - pool-1-thread-1 1 complete 
13:24:16.247 - pool-1-thread-1 2 complete 
13:24:17.247 - pool-1-thread-1 3 complete 
13:24:18.248 - pool-1-thread-1 4 complete 
13:24:19.248 - pool-1-thread-1 5 complete 
13:24:20.248 - pool-1-thread-1 6 complete 
13:24:21.248 - pool-1-thread-1 7 complete 
13:24:22.249 - pool-1-thread-1 8 complete 
13:24:23.249 - pool-1-thread-1 9 complete 
+0

注意:這個解決方案會讓工作線程的數量不超過'corePoolSize' – louxiu