Java - Balancing a ThreadPoolExecutor to give threads fairly to parallel requests

I have developed some multi-threaded code. This code is called from a web application, so it may be executed by several threads (requests) in parallel. To keep control over the number of threads this code can create (when called by several parallel requests), I use a static, shared ThreadPoolExecutor (Executors.newFixedThreadPool(nbOfThreads)). So I am sure this code will never create more than nbOfThreads threads. To keep track of the tasks involved in a given request and wait for their completion, I use one CompletionService per request.
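The shared-pool-plus-per-request-CompletionService pattern described above can be sketched as follows. This is a minimal illustration, not the actual code (the class and method names `SharedPoolPerRequestTracking` and `handleRequest` are made up for the example):

```java
import java.util.concurrent.*;

public class SharedPoolPerRequestTracking {
    static final int nbOfThreads = 100;
    // Shared across the whole application: no more than nbOfThreads worker
    // threads exist, no matter how many requests run in parallel.
    static final ExecutorService pool = Executors.newFixedThreadPool(nbOfThreads);

    // Handles one request: submits its tasks to the shared pool and waits for
    // them through a CompletionService dedicated to this request.
    static int handleRequest(int nbTasks) throws InterruptedException, ExecutionException {
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < nbTasks; i++) {
            final int taskId = i;
            cs.submit(() -> taskId); // placeholder for the real work
        }
        int sum = 0;
        // Waits only for this request's own tasks, in completion order.
        for (int i = 0; i < nbTasks; i++) {
            sum += cs.take().get();
        }
        return sum;
    }
}
```

Each request gets its own `ExecutorCompletionService`, so waiting on `take()` only ever yields that request's futures, while the underlying threads all come from the one shared pool.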
Now, I would like to get some "fairness" (not sure that is the right word) in the way the pool's threads are given to the requests. With the default fixed ThreadPoolExecutor, the waiting queue is a LinkedBlockingQueue: it feeds tasks to the executor in their arrival order (FIFO). Imagine the pool's core size is 100 threads. The first request is big and involves creating 150 tasks, so it fills the pool and puts 50 tasks in the waiting queue. If a second, tiny request arrives 1 second later, then even if it only needs 2 threads from the pool, it has to wait until all 150 tasks created by the first big request have been processed.

How can I make the pool give its threads fairly/evenly to each request? How can I make the 2 tasks of the second request not wait behind all 50 waiting tasks of the first one?
My idea is to develop my own implementation of BlockingQueue to give to the ThreadPoolExecutor. This BlockingQueue would store the waiting tasks classified by the request that created them (in a backing Map with the request's id as key and a LinkedBlockingQueue holding that request's tasks as value). Then, each time the ThreadPoolExecutor takes or polls a new task from the queue, the queue would hand out a task from a different request each time... Is this the right approach? The use case seems quite common to me, and I am surprised that I have to implement this kind of tricky and tedious thing myself. That is why I think I may be wrong and there may be a best practice for doing this.
Here is the code I made. It works, but I am still wondering whether this is the right approach.
public class TestThreadPoolExecutorWithTurningQueue {

    private final static Logger logger = LogManager.getLogger();

    private static ThreadPoolExecutor executorService;

    int nbRequest = 4;
    int nbThreadPerRequest = 8;
    int threadPoolSize = 5;

    private void init() {
        executorService = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, TimeUnit.MILLISECONDS,
                new CategoryBlockingQueue<Runnable>() // my custom blocking queue storing waiting tasks per request
                //new LinkedBlockingQueue<Runnable>()
        );
    }
    @Test
    public void test() throws Exception {
        init();

        // Parallel requests arriving
        ExecutorService tomcat = Executors.newFixedThreadPool(nbRequest);
        for (int i = 0; i < nbRequest; i++) {
            Thread.sleep(10);
            final int finalI = i;
            tomcat.execute(new Runnable() {
                @Override
                public void run() {
                    request(finalI);
                }
            });
        }

        tomcat.shutdown();
        tomcat.awaitTermination(1, TimeUnit.DAYS);
    }
    // Code executed by each request.
    // Multi-threaded code using a single shared ThreadPoolExecutor to keep the
    // number of threads under control.
    public void request(final int requestId) {
        final List<Future<Object>> futures = new ArrayList<>();
        CustomCompletionService<Object> completionService = new CustomCompletionService<>(executorService);
        for (int j = 0; j < nbThreadPerRequest; j++) {
            final int finalJ = j;
            futures.add(completionService.submit(new CategoryRunnable(requestId) {
                @Override
                public void run() {
                    logger.debug("thread " + finalJ + " of request " + requestId);
                    try {
                        // here should come the useful things to be done
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, null));
        }

        // Wait for completion of all the tasks of the request.
        // If a task threw an exception, cancel the other tasks of the request.
        for (int j = 0; j < nbThreadPerRequest; j++) {
            try {
                completionService.take().get();
            } catch (Exception e) {
                // Cancel the remaining tasks
                for (Future<Object> future : futures) {
                    future.cancel(true);
                }
                // Get the underlying exception
                Exception toThrow = e;
                if (e instanceof ExecutionException) {
                    ExecutionException ex = (ExecutionException) e;
                    toThrow = (Exception) ex.getCause();
                }
                throw new RuntimeException(toThrow);
            }
        }
    }
    public class CustomCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final BlockingQueue<Future<V>> completionQueue;

        public CustomCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }

        private RunnableFuture<V> newTaskFor(Callable<V> task) {
            return new FutureTask<V>(task);
        }

        private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            return new FutureTask<V>(task, result);
        }

        public Future<V> submit(CategoryCallable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
            return f;
        }

        public Future<V> submit(CategoryRunnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
            return f;
        }

        public Future<V> submit(CategoryRunnable task) {
            return submit(task, null);
        }

        @Override
        public Future<V> submit(Callable<V> task) {
            throw new IllegalArgumentException("Must use a 'CategoryCallable'");
        }

        @Override
        public Future<V> submit(Runnable task, V result) {
            throw new IllegalArgumentException("Must use a 'CategoryRunnable'");
        }

        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }

        public Future<V> poll() {
            return completionQueue.poll();
        }

        public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }

        /**
         * FutureTask extension to enqueue upon completion + category
         */
        public class CategorizedQueueingFuture extends FutureTask<Void> {
            private final Future<V> task;
            private int category;

            CategorizedQueueingFuture(RunnableFuture<V> task, int category) {
                super(task, null);
                this.task = task;
                this.category = category;
            }

            protected void done() {
                completionQueue.add(task);
            }

            public int getCategory() {
                return category;
            }
        }
    }
    public abstract class CategoryRunnable implements Runnable {
        private int category;

        public CategoryRunnable(int category) {
            this.category = category;
        }

        public int getCategory() {
            return category;
        }
    }

    public abstract class CategoryCallable<V> implements Callable<V> {
        private int category;

        public CategoryCallable(int category) {
            this.category = category;
        }

        public int getCategory() {
            return category;
        }
    }
    public class CategoryBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
        private Map<Integer, LinkedBlockingQueue<E>> map = new HashMap<>();
        private AtomicInteger count = new AtomicInteger(0);
        private ReentrantLock lock = new ReentrantLock();
        private LinkedBlockingQueue<Integer> nextCategories = new LinkedBlockingQueue<>();

        @Override
        public boolean offer(E e) {
            CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) e;
            lock.lock();
            try {
                int category = item.getCategory();
                if (!map.containsKey(category)) {
                    map.put(category, new LinkedBlockingQueue<E>());
                    nextCategories.offer(category);
                }
                boolean b = map.get(category).offer(e);
                if (b) {
                    count.incrementAndGet();
                }
                return b;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public E poll() {
            return null; // not implemented
        }

        @Override
        public E peek() {
            return null; // not implemented
        }

        @Override
        public void put(E e) throws InterruptedException {
            // not implemented
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return false; // not implemented
        }

        @Override
        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                Integer nextCategory = nextCategories.take();
                LinkedBlockingQueue<E> categoryElements = map.get(nextCategory);
                E e = categoryElements.take();
                count.decrementAndGet();
                if (categoryElements.isEmpty()) {
                    map.remove(nextCategory);
                } else {
                    nextCategories.offer(nextCategory);
                }
                return e;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public boolean remove(Object o) {
            CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) o;
            lock.lock();
            try {
                int category = item.getCategory();
                LinkedBlockingQueue<E> categoryElements = map.get(category);
                boolean b = categoryElements.remove(item);
                if (categoryElements.isEmpty()) {
                    map.remove(category);
                }
                if (b) {
                    count.decrementAndGet();
                }
                return b;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public int drainTo(Collection<? super E> c) {
            return 0; // not implemented
        }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            return 0; // not implemented
        }

        @Override
        public Iterator<E> iterator() {
            return null; // not implemented
        }

        @Override
        public int size() {
            return count.get();
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            // TODO
            return null;
        }

        @Override
        public int remainingCapacity() {
            return 0;
        }
    }
}
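One detail worth flagging in the take() above: it calls nextCategories.take() while holding the ReentrantLock. If a worker blocks there on an empty queue, a concurrent offer() can never acquire the lock, so the submitting thread can hang. A sketch of the same round-robin idea that avoids this by counting available elements with a Semaphore and only locking around non-blocking bookkeeping (class and method names are illustrative, and this is not a full BlockingQueue implementation):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class RoundRobinQueue<E> {
    private final Map<Integer, Deque<E>> perCategory = new HashMap<>();
    private final Deque<Integer> rotation = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Semaphore available = new Semaphore(0);

    public void offer(int category, E e) {
        lock.lock();
        try {
            Deque<E> q = perCategory.get(category);
            if (q == null) {
                q = new ArrayDeque<>();
                perCategory.put(category, q);
                rotation.addLast(category); // new category joins the rotation
            }
            q.addLast(e);
        } finally {
            lock.unlock();
        }
        available.release(); // signal a waiting taker, outside the lock
    }

    public E take() throws InterruptedException {
        available.acquire(); // block here, not while holding the lock
        lock.lock();
        try {
            int category = rotation.removeFirst();
            Deque<E> q = perCategory.get(category);
            E e = q.removeFirst();
            if (q.isEmpty()) {
                perCategory.remove(category);
            } else {
                rotation.addLast(category); // move category to the back: round-robin
            }
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```

With three elements queued for category 0 and one for category 1, successive takes alternate between the categories instead of draining category 0 first.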
Output with the classic LinkedBlockingQueue:
2017-01-09 14:56:13,061 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-2] DEBUG - thread 5 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-1] DEBUG - thread 6 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-4] DEBUG - thread 7 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-3] DEBUG - thread 0 of request 1
2017-01-09 14:56:15,063 [pool-2-thread-5] DEBUG - thread 1 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-2] DEBUG - thread 2 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-4] DEBUG - thread 3 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-3] DEBUG - thread 4 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-5] DEBUG - thread 6 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-1] DEBUG - thread 0 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-3] DEBUG - thread 1 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-5] DEBUG - thread 2 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-2] DEBUG - thread 3 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-4] DEBUG - thread 4 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-3] DEBUG - thread 5 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-2] DEBUG - thread 7 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-1] DEBUG - thread 0 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-4] DEBUG - thread 2 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-3] DEBUG - thread 1 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-2] DEBUG - thread 3 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-1] DEBUG - thread 4 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-5] DEBUG - thread 5 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-1] DEBUG - thread 6 of request 3
Output with my custom CategoryBlockingQueue:
2017-01-09 14:54:54,765 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-1] DEBUG - thread 0 of request 1
2017-01-09 14:54:56,767 [pool-2-thread-4] DEBUG - thread 0 of request 3
2017-01-09 14:54:56,767 [pool-2-thread-3] DEBUG - thread 5 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-5] DEBUG - thread 0 of request 2
2017-01-09 14:54:56,767 [pool-2-thread-2] DEBUG - thread 6 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-1] DEBUG - thread 1 of request 1
2017-01-09 14:54:58,767 [pool-2-thread-5] DEBUG - thread 1 of request 2
2017-01-09 14:54:58,767 [pool-2-thread-2] DEBUG - thread 7 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-4] DEBUG - thread 1 of request 3
2017-01-09 14:54:58,767 [pool-2-thread-3] DEBUG - thread 2 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-1] DEBUG - thread 2 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-5] DEBUG - thread 2 of request 3
2017-01-09 14:55:00,767 [pool-2-thread-2] DEBUG - thread 3 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-4] DEBUG - thread 3 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-3] DEBUG - thread 3 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-5] DEBUG - thread 4 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-3] DEBUG - thread 4 of request 2
2017-01-09 14:55:02,767 [pool-2-thread-2] DEBUG - thread 4 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-4] DEBUG - thread 5 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-2] DEBUG - thread 5 of request 3
2017-01-09 14:55:04,767 [pool-2-thread-1] DEBUG - thread 6 of request 1
2017-01-09 14:55:04,767 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-3] DEBUG - thread 6 of request 3
2017-01-09 14:55:04,768 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:55:06,768 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:55:06,768 [pool-2-thread-1] DEBUG - thread 7 of request 2
Do you need all the "tasks" of one request to be executed in parallel? With 100 threads (which is a lot)? Or would it be enough to have a smaller but dedicated pool per request? – Fildor
@Fildor Indeed, my first implementation used a smaller dedicated pool per request (core size = 20). But when I tried to "stress test" it by launching several big requests in parallel, it spawned too many threads. For example, 10 concurrent requests made the application create 200 threads (10 thread pools with a core size of 20). That is why I think a single shared ThreadPoolExecutor really allows keeping the maximum possible number of threads under control, even under heavy workload. – Comencau
You could use a PriorityQueue: https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html See http://stackoverflow.com/questions/3198660/java-executors-how-can-i-set-task-priority –
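For a ThreadPoolExecutor the queue would actually need to be a PriorityBlockingQueue, with tasks that are Comparable. One way to approximate per-request fairness with priorities is to order queued tasks by their index within their own request, so the first task of a late-arriving request jumps ahead of the 50th queued task of an earlier one. A hypothetical sketch (FairTask and newFairPool are made-up names); note that tasks must be passed via execute(), because submit() would wrap them in non-comparable FutureTasks and fail at runtime:

```java
import java.util.concurrent.*;

public class FairPriorityPool {

    // Runnable carrying (requestId, indexWithinRequest) for priority ordering.
    static class FairTask implements Runnable, Comparable<FairTask> {
        final int requestId;
        final int indexWithinRequest;
        final Runnable body;

        FairTask(int requestId, int indexWithinRequest, Runnable body) {
            this.requestId = requestId;
            this.indexWithinRequest = indexWithinRequest;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        // Lower index within the request runs first; tie-break on request id
        // so the ordering is stable across requests.
        @Override
        public int compareTo(FairTask o) {
            int c = Integer.compare(indexWithinRequest, o.indexWithinRequest);
            return c != 0 ? c : Integer.compare(requestId, o.requestId);
        }
    }

    static ThreadPoolExecutor newFairPool(int nbOfThreads) {
        return new ThreadPoolExecutor(nbOfThreads, nbOfThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
    }
}
```

This only reorders tasks that are still waiting in the queue; unlike the CategoryBlockingQueue approach it cannot strictly round-robin between requests, but it prevents one big request from monopolizing the whole backlog.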