對於我的用例,我需要一個可以根據優先級執行任務的執行程序。實現此目的的簡單方法是使用帶有PriorityBlockingQueue的線程池並重寫newTaskFor()以根據任務的優先級返回可比較的自定義未來任務。具有優先任務並避免飢餓的線程池執行程序
//Define priorities
public enum Priority {
HIGH, MEDIUM, LOW, VERYLOW;
}
一項優先任務
//A Callable tasks that has priority. Concrete implementation will implement
//call() to do actual work and getPriority() to return priority
public abstract class PriorityTask<V> implements Callable<V> {
public abstract Priority getPriority();
}
實際執行實施
public class PriorityTaskThreadPoolExecutor <V> {
int _poolSize;
private PriorityBlockingQueue<Runnable> _poolQueue =
new PriorityBlockingQueue<Runnable>(500);
private ThreadPoolExecutor _pool;
public PriorityTaskThreadPoolExecutor (int poolSize) {
_poolSize = poolSize;
_pool = new ThreadPoolExecutor(_poolSize, _poolSize, 5, TimeUnit.MINUTES,
_poolQueue) {
//Override newTaskFor() to return wrap PriorityTask
//with a PriorityFutureTaskWrapper.
@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
return new PriorityFutureTaskWrapper<V>((PriorityTask<V>) c);
}
};
_pool.allowCoreThreadTimeOut(true);
}
public Future<V> submit (PriorityTask<V> task) {
return _pool.submit(task);
}
}
//A future task that wraps around the priority task to be used in the queue
class PriorityFutureTaskWrapper<V> extends FutureTask<V>
implements Comparable <PriorityFutureTaskWrapper<V>> {
PriorityTask<V> _priorityTask;
public PriorityFutureTaskWrapper (PriorityTask<V> priorityTask) {
super(priorityTask);
_priorityTask = priorityTask;
}
public PriorityTask<V> getPriorityTask() {
return _priorityTask;
}
@Override
public int compareTo(PriorityFutureTaskWrapper<V> o) {
return _priorityTask.getPriority().ordinal() -
o.getPriorityTask().getPriority().ordinal();
}
}
問題的,這是在我的用例,有一個潛在的低優先級的任務可能會永遠捱餓。我想避免這種情況。我無法找到一個乾淨的方式來使用java中提供的執行程序/池來完成此操作。所以我想寫我自己的執行者。我有兩種不同的方法。
1)具有PriorityBlockingQueue的自定義線程池。將會有一個單獨的線程,檢查隊列中的任務年齡。較舊的任務將被刪除並重新添加升級優先級。
2)我的用例只有有限數量的優先級,比如1-4。每個優先級我將有4個不同的隊列。現在,自定義池中的線程(而不是隊列上的阻塞)將在必須接受下一個任務時按以下順序掃描隊列。
40%線程 - Q1,Q2,Q3,Q4
30%線程 - Q2,Q1,Q3,Q4
20%線程 - Q3,Q1,Q2,Q4
10 %線程 - Q4,Q1,Q2,Q3
掃描將由線程完成,當它被通知有關隊列中的新增加或該線程執行的當前任務完成時。其他時候,線程將會等待。但是,與阻塞隊列相比,掃描效率會更低。
Apprach 2更適合我的用例。
有沒有人嘗試過任何這些方法或類似用例的不同方法?任何想法/建議?
難道你不能只是改變你現有的'PriorityTask'實現添加升級? OR:在你的「compareTo」中添加一種方法來考慮任務的年齡。所以你的「有效」優先權不僅是優先領域的價值,也是任務的時代。 – Fildor
爲了升級,PriorityTask必須獲得CPU週期......所以線程或單獨的線程之一必須檢查並升級它 - 定期或基於某個觸發器。也可以將其從隊列中移除並在升級後重新添加,以便將其插入到基於新優先級的位置。這是我在帖子中提到的第一種方法。但我不太相信這種方法...... – Rajesh
我曾經使用的另一種方法是我們實施了「計劃外流」。也就是說:每天一次低負載概率很高時,我們將整個隊列抽取到另一個隊列中,這個隊列在單獨的SingleThreadPoolExecutor上處理。 – Fildor