2017-01-09 79 views
1

我開發了一段多線程的代碼。此代碼在Web應用程序中調用,因此可能由多個線程(請求)並行執行。爲了保持這個代碼將要創建的線程數量(通過被幾個並行請求調用),我使用一個靜態共享的ThreadPoolExecutor(Executors.newFixedThreadPool(nbOfThreads))。所以我確信這段代碼永遠不會創建超過nbOfThreads的線程。要跟蹤給定請求中涉及的任務並等待它們完成,我會爲每個請求使用一個CompletionService。Java - 平衡ThreadPoolExecutor公平地給線程並行請求

現在,我想通過池中的線程向請求發送請求的方式來獲得一些「公平性」(不確定是否是好詞)。 使用默認的固定ThreadPoolExecutor,等待隊列是LinkedBlockingQueue。它根據它們的到達順序(FIFO)向執行器提供任務。想象一下,池核心大小是100個線程。第一個請求很大,涉及創建150個任務。所以它會使池滿,並將50個任務放入等待隊列中。如果第二個微小請求在1秒後到達,即使它只需要池中的2個線程,它也必須等待第一個大請求創建的所有150個任務在處理之前完成。

如何使池公平均勻地爲每個請求提供線程?如何使第二個請求的2個任務在第一個查詢的所有50個等待任務之後不等待?

我的想法是開發一個BlockingQueue的個人實現給予ThreadPoolExecutor。這個BlockingQueue將存儲由創建它們的請求分類的等待任務(在具有key中的請求的id的背景Map中,以及LinkedBlockingQueue將請求的任務存儲在值中)。然後當ThreadPoolExecutor takepoll一個新的任務從隊列中排隊時,隊列會每次從一個不同的請求中提交一個任務......這是否是正確的方法?用例對我來說似乎很常見。我很驚訝我必須自己實施這種習慣和乏味的東西。這就是爲什麼我認爲我可能是錯的,並且存在這樣做的最佳實踐。

這是我做的代碼。它的工作原理,但仍然想知道這是否是正確的方法。

public class TestThreadPoolExecutorWithTurningQueue { 

    private final static Logger logger = LogManager.getLogger(); 

    private static ThreadPoolExecutor executorService; 

    int nbRequest = 4; 

    int nbThreadPerRequest = 8; 

    int threadPoolSize = 5; 

    private void init() { 
     executorService = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, TimeUnit.MILLISECONDS, 
       new CategoryBlockingQueue<Runnable>()// my custom blocking queue storing waiting tasks per request 
       //new LinkedBlockingQueue<Runnable>() 
     ); 
    } 

    @Test 
    public void test() throws Exception { 
     init(); 
     // Parallel requests arriving 
     ExecutorService tomcat = Executors.newFixedThreadPool(nbRequest); 
     for (int i = 0; i < nbRequest; i++) { 
      Thread.sleep(10); 
      final int finalI = i; 
      tomcat.execute(new Runnable() { 
       @Override 
       public void run() { 
        request(finalI); 
       } 
      }); 
     } 
     tomcat.shutdown(); 
     tomcat.awaitTermination(1, TimeUnit.DAYS); 
    } 

    // Code executed by each request 
    // Code multi-threaded using a single shared ThreadPoolExecutor to keep the 
    // number of threads under control 
    public void request(final int requestId) { 
     final List<Future<Object>> futures = new ArrayList<>(); 
     CustomCompletionService<Object> completionService = new CustomCompletionService<>(executorService); 
     for (int j = 0; j < nbThreadPerRequest; j++) { 
      final int finalJ = j; 
      futures.add(completionService.submit(new CategoryRunnable(requestId) { 
       @Override 
       public void run() { 
        logger.debug("thread " + finalJ + " of request " + requestId); 
        try { 
         // here should come the useful things to be done 
         Thread.sleep(2000); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
       } 
      }, null)); 
     } 
     // Wait fot completion of all the tasks of the request 
     // If a task threw an exception, cancel the other tasks of the request 
     for (int j = 0; j < nbThreadPerRequest; j++) { 
      try { 
       completionService.take().get(); 
      } catch (Exception e) { 
       // Cancel the remaining tasks 
       for (Future<Object> future : futures) { 
        future.cancel(true); 
       } 

       // Get the underlying exception 
       Exception toThrow = e; 
       if (e instanceof ExecutionException) { 
        ExecutionException ex = (ExecutionException) e; 
        toThrow = (Exception) ex.getCause(); 
       } 
       throw new RuntimeException(toThrow); 
      } 
     } 
    } 

    public class CustomCompletionService<V> implements CompletionService<V> { 

     private final Executor executor; 

     private final BlockingQueue<Future<V>> completionQueue; 

     public CustomCompletionService(Executor executor) { 
      if (executor == null) 
       throw new NullPointerException(); 
      this.executor = executor; 
      this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 
     } 

     private RunnableFuture<V> newTaskFor(Callable<V> task) { 
      return new FutureTask<V>(task); 
     } 

     private RunnableFuture<V> newTaskFor(Runnable task, V result) { 
      return new FutureTask<V>(task, result); 
     } 

     public Future<V> submit(CategoryCallable<V> task) { 
      if (task == null) throw new NullPointerException(); 
      RunnableFuture<V> f = newTaskFor(task); 
      executor.execute(new CategorizedQueueingFuture(f, task.getCategory())); 
      return f; 
     } 

     public Future<V> submit(CategoryRunnable task, V result) { 
      if (task == null) throw new NullPointerException(); 
      RunnableFuture<V> f = newTaskFor(task, result); 
      executor.execute(new CategorizedQueueingFuture(f, task.getCategory())); 
      return f; 
     } 

     public Future<V> submit(CategoryRunnable task) { 
      return submit(task, null); 
     } 

     @Override 
     public Future<V> submit(Callable<V> task) { 
      throw new IllegalArgumentException("Must use a 'CategoryCallable'"); 
     } 

     @Override 
     public Future<V> submit(Runnable task, V result) { 
      throw new IllegalArgumentException("Must use a 'CategoryRunnable'"); 
     } 

     public Future<V> take() throws InterruptedException { 
      return completionQueue.take(); 
     } 

     public Future<V> poll() { 
      return completionQueue.poll(); 
     } 

     public Future<V> poll(long timeout, TimeUnit unit) 
       throws InterruptedException { 
      return completionQueue.poll(timeout, unit); 
     } 

     /** 
     * FutureTask extension to enqueue upon completion + Category 
     */ 
     public class CategorizedQueueingFuture extends FutureTask<Void> { 

      private final Future<V> task; 

      private int category; 

      CategorizedQueueingFuture(RunnableFuture<V> task, int category) { 
       super(task, null); 
       this.task = task; 
       this.category = category; 
      } 

      protected void done() { 
       completionQueue.add(task); 
      } 

      public int getCategory() { 
       return category; 
      } 
     } 
    } 

    public abstract class CategoryRunnable implements Runnable { 

     private int category; 

     public CategoryRunnable(int category) { 
      this.category = category; 
     } 

     public int getCategory() { 
      return category; 
     } 
    } 

    public abstract class CategoryCallable<V> implements Callable<V> { 

     private int category; 

     public CategoryCallable(int category) { 
      this.category = category; 
     } 

     public int getCategory() { 
      return category; 
     } 
    } 

    public class CategoryBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { 

     private Map<Integer, LinkedBlockingQueue<E>> map = new HashMap<>(); 

     private AtomicInteger count = new AtomicInteger(0); 

     private ReentrantLock lock = new ReentrantLock(); 

     private LinkedBlockingQueue<Integer> nextCategories = new LinkedBlockingQueue<>(); 

     @Override 
     public boolean offer(E e) { 
      CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) e; 
      lock.lock(); 
      try { 
       int category = item.getCategory(); 
       if (!map.containsKey(category)) { 
        map.put(category, new LinkedBlockingQueue<E>()); 
        nextCategories.offer(category); 
       } 
       boolean b = map.get(category).offer(e); 
       if (b) { 
        count.incrementAndGet(); 
       } 
       return b; 
      } finally { 
       lock.unlock(); 
      } 
     } 

     @Override 
     public E poll() { 
      return null; 
     } 

     @Override 
     public E peek() { 
      return null; 
     } 

     @Override 
     public void put(E e) throws InterruptedException { 

     } 

     @Override 
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 
      return false; 
     } 

     @Override 
     public E take() throws InterruptedException { 
      lock.lockInterruptibly(); 
      try { 
       Integer nextCategory = nextCategories.take(); 
       LinkedBlockingQueue<E> categoryElements = map.get(nextCategory); 
       E e = categoryElements.take(); 
       count.decrementAndGet(); 
       if (categoryElements.isEmpty()) { 
        map.remove(nextCategory); 
       } else { 
        nextCategories.offer(nextCategory); 
       } 
       return e; 
      } finally { 
       lock.unlock(); 
      } 
     } 

     @Override 
     public boolean remove(Object o) { 
      CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) o; 
      lock.lock(); 
      try { 
       int category = item.getCategory(); 
       LinkedBlockingQueue<E> categoryElements = map.get(category); 
       boolean b = categoryElements.remove(item); 
       if (categoryElements.isEmpty()) { 
        map.remove(category); 
       } 
       if (b) { 
        count.decrementAndGet(); 
       } 
       return b; 
      } finally { 
       lock.unlock(); 
      } 
     } 

     @Override 
     public int drainTo(Collection<? super E> c) { 
      return 0; 
     } 

     @Override 
     public int drainTo(Collection<? super E> c, int maxElements) { 
      return 0; 
     } 

     @Override 
     public Iterator<E> iterator() { 
      return null; 
     } 

     @Override 
     public int size() { 
      return count.get(); 
     } 

     @Override 
     public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
      // TODO 
      return null; 
     } 

     @Override 
     public int remainingCapacity() { 
      return 0; 
     } 

    } 
} 

輸出與傳統的LinkedBlockingQueue

2017-01-09 14:56:13,061 [pool-2-thread-1] DEBUG - thread 0 of request 0 
2017-01-09 14:56:13,061 [pool-2-thread-4] DEBUG - thread 3 of request 0 
2017-01-09 14:56:13,061 [pool-2-thread-2] DEBUG - thread 1 of request 0 
2017-01-09 14:56:13,061 [pool-2-thread-3] DEBUG - thread 2 of request 0 
2017-01-09 14:56:13,061 [pool-2-thread-5] DEBUG - thread 4 of request 0 
2017-01-09 14:56:15,063 [pool-2-thread-2] DEBUG - thread 5 of request 0 
2017-01-09 14:56:15,063 [pool-2-thread-1] DEBUG - thread 6 of request 0 
2017-01-09 14:56:15,063 [pool-2-thread-4] DEBUG - thread 7 of request 0 
2017-01-09 14:56:15,063 [pool-2-thread-3] DEBUG - thread 0 of request 1 
2017-01-09 14:56:15,063 [pool-2-thread-5] DEBUG - thread 1 of request 1 
2017-01-09 14:56:17,064 [pool-2-thread-2] DEBUG - thread 2 of request 1 
2017-01-09 14:56:17,064 [pool-2-thread-4] DEBUG - thread 3 of request 1 
2017-01-09 14:56:17,064 [pool-2-thread-1] DEBUG - thread 5 of request 1 
2017-01-09 14:56:17,064 [pool-2-thread-3] DEBUG - thread 4 of request 1 
2017-01-09 14:56:17,064 [pool-2-thread-5] DEBUG - thread 6 of request 1 
2017-01-09 14:56:19,064 [pool-2-thread-4] DEBUG - thread 7 of request 1 
2017-01-09 14:56:19,064 [pool-2-thread-1] DEBUG - thread 0 of request 2 
2017-01-09 14:56:19,064 [pool-2-thread-3] DEBUG - thread 1 of request 2 
2017-01-09 14:56:19,064 [pool-2-thread-5] DEBUG - thread 2 of request 2 
2017-01-09 14:56:19,064 [pool-2-thread-2] DEBUG - thread 3 of request 2 
2017-01-09 14:56:21,064 [pool-2-thread-4] DEBUG - thread 4 of request 2 
2017-01-09 14:56:21,064 [pool-2-thread-3] DEBUG - thread 5 of request 2 
2017-01-09 14:56:21,064 [pool-2-thread-5] DEBUG - thread 6 of request 2 
2017-01-09 14:56:21,064 [pool-2-thread-2] DEBUG - thread 7 of request 2 
2017-01-09 14:56:21,064 [pool-2-thread-1] DEBUG - thread 0 of request 3 
2017-01-09 14:56:23,064 [pool-2-thread-4] DEBUG - thread 2 of request 3 
2017-01-09 14:56:23,064 [pool-2-thread-3] DEBUG - thread 1 of request 3 
2017-01-09 14:56:23,064 [pool-2-thread-2] DEBUG - thread 3 of request 3 
2017-01-09 14:56:23,064 [pool-2-thread-1] DEBUG - thread 4 of request 3 
2017-01-09 14:56:23,064 [pool-2-thread-5] DEBUG - thread 5 of request 3 
2017-01-09 14:56:25,064 [pool-2-thread-2] DEBUG - thread 7 of request 3 
2017-01-09 14:56:25,064 [pool-2-thread-1] DEBUG - thread 6 of request 3 

輸出,帶我自定義CategoryBlockingQueue

2017-01-09 14:54:54,765 [pool-2-thread-3] DEBUG - thread 2 of request 0 
2017-01-09 14:54:54,765 [pool-2-thread-2] DEBUG - thread 1 of request 0 
2017-01-09 14:54:54,765 [pool-2-thread-5] DEBUG - thread 4 of request 0 
2017-01-09 14:54:54,765 [pool-2-thread-1] DEBUG - thread 0 of request 0 
2017-01-09 14:54:54,765 [pool-2-thread-4] DEBUG - thread 3 of request 0 
2017-01-09 14:54:56,767 [pool-2-thread-1] DEBUG - thread 0 of request 1 
2017-01-09 14:54:56,767 [pool-2-thread-4] DEBUG - thread 0 of request 3 
2017-01-09 14:54:56,767 [pool-2-thread-3] DEBUG - thread 5 of request 0 
2017-01-09 14:54:56,767 [pool-2-thread-5] DEBUG - thread 0 of request 2 
2017-01-09 14:54:56,767 [pool-2-thread-2] DEBUG - thread 6 of request 0 
2017-01-09 14:54:58,767 [pool-2-thread-1] DEBUG - thread 1 of request 1 
2017-01-09 14:54:58,767 [pool-2-thread-5] DEBUG - thread 1 of request 2 
2017-01-09 14:54:58,767 [pool-2-thread-2] DEBUG - thread 7 of request 0 
2017-01-09 14:54:58,767 [pool-2-thread-4] DEBUG - thread 1 of request 3 
2017-01-09 14:54:58,767 [pool-2-thread-3] DEBUG - thread 2 of request 1 
2017-01-09 14:55:00,767 [pool-2-thread-1] DEBUG - thread 2 of request 2 
2017-01-09 14:55:00,767 [pool-2-thread-5] DEBUG - thread 2 of request 3 
2017-01-09 14:55:00,767 [pool-2-thread-2] DEBUG - thread 3 of request 1 
2017-01-09 14:55:00,767 [pool-2-thread-4] DEBUG - thread 3 of request 2 
2017-01-09 14:55:00,767 [pool-2-thread-3] DEBUG - thread 3 of request 3 
2017-01-09 14:55:02,767 [pool-2-thread-5] DEBUG - thread 4 of request 1 
2017-01-09 14:55:02,767 [pool-2-thread-3] DEBUG - thread 4 of request 2 
2017-01-09 14:55:02,767 [pool-2-thread-2] DEBUG - thread 4 of request 3 
2017-01-09 14:55:02,767 [pool-2-thread-1] DEBUG - thread 5 of request 1 
2017-01-09 14:55:02,767 [pool-2-thread-4] DEBUG - thread 5 of request 2 
2017-01-09 14:55:04,767 [pool-2-thread-2] DEBUG - thread 5 of request 3 
2017-01-09 14:55:04,767 [pool-2-thread-1] DEBUG - thread 6 of request 1 
2017-01-09 14:55:04,767 [pool-2-thread-5] DEBUG - thread 6 of request 2 
2017-01-09 14:55:04,767 [pool-2-thread-3] DEBUG - thread 6 of request 3 
2017-01-09 14:55:04,768 [pool-2-thread-4] DEBUG - thread 7 of request 1 
2017-01-09 14:55:06,768 [pool-2-thread-2] DEBUG - thread 7 of request 3 
2017-01-09 14:55:06,768 [pool-2-thread-1] DEBUG - thread 7 of request 2 
+0

是否需要並行執行一個請求的所有「任務」?有100個線程(這是很多)?或者是否足以讓每個請求都有一個更小但專用的池? – Fildor

+0

@Fildor事實上,我的第一個實現使用每個請求較小的專用池(核心大小= 20)。但是,當我試圖通過並行啓動幾個大的請求來「壓力測試」時,它啓動了太多的線程。例如,10個併發請求讓應用程序創建200個線程(10個線程池的核心大小爲20)。這就是爲什麼我認爲有一個共享的ThreadPoolExecutor可以真正允許「控制」可能存在的最大線程,即使在高工作負載下也是如此。 – Comencau

+0

您可以使用PriorityQueue:https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html請參閱http://stackoverflow.com/questions/3198660/java-executors-how -can-i-set-task-priority –

回答

0

最後這裏是我做過什麼給池中的線程在一個「公平」反過來每個並行請求, 「均衡」的態度。這工作。請,如果出現問題或者有更好的方法,請告訴我。

總之,我創建了一個BlockingQueue供池使用。該隊列將請求的任務存儲在一個Map中,該Map根據它們相關的請求對它們進行分類。然後,由池調用的獲取或提供方法獲得要執行的新任務,每次都會從新請求中提供一項任務。

我需要調整CompletionService以處理Runnable和Callable,其中有一個附加字段作爲請求的ID。

public class TestThreadPoolExecutorWithTurningQueue { 

    private final static Logger logger = LogManager.getLogger(); 

    private static ThreadPoolExecutor executorService; 

    int nbRequest = 4; 

    int nbThreadPerRequest = 8; 

    int threadPoolSize = 5; 

    private void init() { 
     executorService = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 10L, TimeUnit.SECONDS, 
       new CategoryBlockingQueue<Runnable>()// my custom blocking queue storing waiting tasks per request 
       //new LinkedBlockingQueue<Runnable>() 
     ); 
     executorService.allowCoreThreadTimeOut(true); 
    } 

    @Test 
    public void test() throws Exception { 
     init(); 
     ExecutorService tomcat = Executors.newFixedThreadPool(nbRequest); 
     for (int i = 0; i < nbRequest; i++) { 
      Thread.sleep(10); 
      final int finalI = i; 
      tomcat.execute(new Runnable() { 
       @Override 
       public void run() { 
        request(finalI); 
       } 
      }); 
     } 

     for (int i = 0; i < 100; i++) { 
      Thread.sleep(1000); 
      logger.debug("TPE = " + executorService); 
     } 

     tomcat.shutdown(); 
     tomcat.awaitTermination(1, TimeUnit.DAYS); 
    } 

    public void request(final int requestId) { 
     CustomCompletionService<Object> completionService = new CustomCompletionService<>(executorService); 
     for (int j = 0; j < nbThreadPerRequest; j++) { 
      final int finalJ = j; 
      completionService.submit(new CategoryRunnable(requestId) { 
       @Override 
       public void go() throws Exception { 
        logger.debug("thread " + finalJ + " of request " + requestId + " " + executorService); 
        Thread.sleep(2000);// here should come the useful things to be done 
       } 
      }); 
     } 
     completionService.awaitCompletion(); 
    } 

    public class CustomCompletionService<V> implements CompletionService<V> { 

     private final Executor executor; 

     private final BlockingQueue<Future<V>> completionQueue; 

     private List<RunnableFuture<V>> submittedTasks = new ArrayList<>(); 

     public CustomCompletionService(Executor executor) { 
      if (executor == null) 
       throw new NullPointerException(); 
      this.executor = executor; 
      this.completionQueue = new LinkedBlockingQueue<>(); 
     } 

     public void awaitCompletion() { 
      for (int i = 0; i < submittedTasks.size(); i++) { 
       try { 
        take().get(); 
       } catch (Exception e) { 
        // Cancel the remaining tasks 
        for (RunnableFuture<V> f : submittedTasks) { 
         f.cancel(true); 
        } 

        // Get the underlying exception 
        Exception toThrow = e; 
        if (e instanceof ExecutionException) { 
         ExecutionException ex = (ExecutionException) e; 
         toThrow = (Exception) ex.getCause(); 
        } 
        throw new RuntimeException(toThrow); 
       } 
      } 
     } 

     private RunnableFuture<V> newTaskFor(Callable<V> task) { 
      return new FutureTask<V>(task); 
     } 

     private RunnableFuture<V> newTaskFor(Runnable task, V result) { 
      return new FutureTask<V>(task, result); 
     } 

     public Future<V> submit(CategoryCallable<V> task) { 
      if (task == null) throw new NullPointerException(); 
      RunnableFuture<V> f = newTaskFor(task); 
      executor.execute(new CategorizedQueueingFuture(f, task.getCategory())); 
      submittedTasks.add(f); 
      return f; 
     } 

     public Future<V> submit(CategoryRunnable task, V result) { 
      if (task == null) throw new NullPointerException(); 
      RunnableFuture<V> f = newTaskFor(task, result); 
      executor.execute(new CategorizedQueueingFuture(f, task.getCategory())); 
      submittedTasks.add(f); 
      return f; 
     } 

     public Future<V> submit(CategoryRunnable task) { 
      return submit(task, null); 
     } 

     @Override 
     public Future<V> submit(Callable<V> task) { 
      throw new IllegalArgumentException("Must use a 'CategoryCallable'"); 
     } 

     @Override 
     public Future<V> submit(Runnable task, V result) { 
      throw new IllegalArgumentException("Must use a 'CategoryRunnable'"); 
     } 

     public Future<V> take() throws InterruptedException { 
      return completionQueue.take(); 
     } 

     public Future<V> poll() { 
      return completionQueue.poll(); 
     } 

     public Future<V> poll(long timeout, TimeUnit unit) 
       throws InterruptedException { 
      return completionQueue.poll(timeout, unit); 
     } 

     /** 
     * FutureTask extension to enqueue upon completion + Category 
     */ 
     public class CategorizedQueueingFuture extends FutureTask<Void> { 

      private final Future<V> task; 

      private int category; 

      CategorizedQueueingFuture(RunnableFuture<V> task, int category) { 
       super(task, null); 
       this.task = task; 
       this.category = category; 
      } 

      protected void done() { 
       completionQueue.add(task); 
      } 

      public int getCategory() { 
       return category; 
      } 
     } 
    } 

    public abstract class CategoryRunnable implements Runnable { 

     private int category; 

     public CategoryRunnable(int category) { 
      this.category = category; 
     } 

     public int getCategory() { 
      return category; 
     } 

     void go() throws Exception { 
      // To be implemented. Do nothing by default. 
      logger.warn("Implement go method !"); 
     } 

     @Override 
     public void run() { 
      try { 
       go(); 
      } catch (Exception e) { 
       throw new RuntimeException(e); 
      } 
     } 
    } 

    public abstract class CategoryCallable<V> implements Callable<V> { 

     private int category; 

     public CategoryCallable(int category) { 
      this.category = category; 
     } 

     public int getCategory() { 
      return category; 
     } 
    } 

    public class CategoryBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { 

     /** 
     * Lock held by take, poll, etc 
     */ 
     private final ReentrantLock takeLock = new ReentrantLock(); 

     /** 
     * Wait queue for waiting takes 
     */ 
     private final Condition notEmpty = takeLock.newCondition(); 

     /** 
     * Lock held by put, offer, etc 
     */ 
     private final ReentrantLock putLock = new ReentrantLock(); 

     private Map<Integer, LinkedBlockingQueue<E>> map = new ConcurrentHashMap<>(); 

     private AtomicInteger count = new AtomicInteger(0); 

     private LinkedBlockingQueue<Integer> nextCategories = new LinkedBlockingQueue<>(); 

     @Override 
     public boolean offer(E e) { 
      CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) e; 
      putLock.lock(); 
      try { 
       int category = item.getCategory(); 
       if (!map.containsKey(category)) { 
        map.put(category, new LinkedBlockingQueue<E>()); 
        if (!nextCategories.offer(category)) return false; 
       } 
       if (!map.get(category).offer(e)) return false; 
       int c = count.getAndIncrement(); 
       if (c == 0) signalNotEmpty();// if we passed from 0 element (empty queue) to 1 element, signal potentially waiting threads on take 
       return true; 
      } finally { 
       putLock.unlock(); 
      } 
     } 

     private void signalNotEmpty() { 
      takeLock.lock(); 
      try { 
       notEmpty.signal(); 
      } finally { 
       takeLock.unlock(); 
      } 
     } 

     @Override 
     public E take() throws InterruptedException { 
      takeLock.lockInterruptibly(); 
      try { 
       while (count.get() == 0) { 
        notEmpty.await(); 
       } 
       E e = dequeue(); 
       int c = count.decrementAndGet(); 
       if (c > 0) notEmpty.signal(); 
       return e; 
      } finally { 
       takeLock.unlock(); 
      } 
     } 

     private E dequeue() throws InterruptedException { 
      Integer nextCategory = nextCategories.take(); 
      LinkedBlockingQueue<E> categoryElements = map.get(nextCategory); 
      E e = categoryElements.take(); 
      if (categoryElements.isEmpty()) { 
       map.remove(nextCategory); 
      } else { 
       nextCategories.offer(nextCategory); 
      } 
      return e; 
     } 

     @Override 
     public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
      E x = null; 
      long nanos = unit.toNanos(timeout); 
      takeLock.lockInterruptibly(); 
      try { 
       while (count.get() == 0) { 
        if (nanos <= 0) return null; 
        nanos = notEmpty.awaitNanos(nanos); 
       } 
       x = dequeue(); 
       int c = count.decrementAndGet(); 
       if (c > 0) notEmpty.signal(); 
      } finally { 
       takeLock.unlock(); 
      } 
      return x; 
     } 

     @Override 
     public boolean remove(Object o) { 
      if (o == null) return false; 
      CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) o; 
      putLock.lock(); 
      takeLock.lock(); 
      try { 
       int category = item.getCategory(); 
       LinkedBlockingQueue<E> categoryElements = map.get(category); 
       boolean b = categoryElements.remove(item); 
       if (categoryElements.isEmpty()) { 
        map.remove(category); 
       } 
       if (b) { 
        count.decrementAndGet(); 
       } 
       return b; 
      } finally { 
       takeLock.unlock(); 
       putLock.unlock(); 
      } 
     } 

     @Override 
     public E poll() { 
      return null; 
     } 

     @Override 
     public E peek() { 
      return null; 
     } 

     @Override 
     public void put(E e) throws InterruptedException { 

     } 

     @Override 
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 
      return false; 
     } 

     @Override 
     public int drainTo(Collection<? super E> c) { 
      return 0; 
     } 

     @Override 
     public int drainTo(Collection<? super E> c, int maxElements) { 
      return 0; 
     } 

     @Override 
     public Iterator<E> iterator() { 
      return null; 
     } 

     @Override 
     public int size() { 
      return count.get(); 
     } 

     @Override 
     public int remainingCapacity() { 
      return 0; 
     } 

    } 

} 
0

保持簡單。

  1. 有兩個專用線程池用於較小和較長的任務。
  2. 最好使用Executors.html#newFixedThreadPoolExecutors.html#newWorkStealingPool

工作竊取線程池有效地利用可用的CPU內核。

看一看下面的相關SE問題的更多細節:

Java: How to scale threads according to cpu cores?

+0

感謝您的幫助。你的建議也可以。但是,我看到2個小缺點。在應用程序不太繁忙的情況下,大請求將總是利用比單個池可用的線程數少的線程。其次,使用2池方法會產生新的問題:如何對請求進行分類是大還是小?什麼是極限?這就是爲什麼我更願意繼續使用最初的方法來讓線程按順序發送並行請求。 – Comencau

+0

這是特定於應用程序和開發人員。如果他使用全局隊列處理所有類型的消息,則可以根據類名添加到兩個不同的執行程序中。在這種情況下,開發人員知道一個特定的類需要更長的時間來執行。 –