2016-08-15 201 views
1

對於我的用例,我需要一個可以根據優先級執行任務的執行程序。實現此目的的簡單方法是使用帶有PriorityBlockingQueue的線程池並重寫newTaskFor()以根據任務的優先級返回可比較的自定義未來任務。具有優先任務並避免飢餓的線程池執行程序

//Define priorities 
public enum Priority { 
    HIGH, MEDIUM, LOW, VERYLOW; 
} 

一項優先任務

//A Callable tasks that has priority. Concrete implementation will implement 
//call() to do actual work and getPriority() to return priority 
public abstract class PriorityTask<V> implements Callable<V> { 
    public abstract Priority getPriority(); 
} 

實際執行實施

public class PriorityTaskThreadPoolExecutor <V> { 
    int _poolSize; 
    private PriorityBlockingQueue<Runnable> _poolQueue = 
             new PriorityBlockingQueue<Runnable>(500); 
    private ThreadPoolExecutor _pool; 

    public PriorityTaskThreadPoolExecutor (int poolSize) { 
     _poolSize = poolSize; 

     _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 5, TimeUnit.MINUTES, 
             _poolQueue) { 
         //Override newTaskFor() to return wrap PriorityTask 
         //with a PriorityFutureTaskWrapper. 
         @Override 
         protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 
          return new PriorityFutureTaskWrapper<V>((PriorityTask<V>) c); 
         } 
       }; 

     _pool.allowCoreThreadTimeOut(true); 
    } 

    public Future<V> submit (PriorityTask<V> task) { 
     return _pool.submit(task); 
    } 

} 

//A future task that wraps around the priority task to be used in the queue 
class PriorityFutureTaskWrapper<V> extends FutureTask<V> 
          implements Comparable <PriorityFutureTaskWrapper<V>> { 
    PriorityTask<V> _priorityTask; 

    public PriorityFutureTaskWrapper (PriorityTask<V> priorityTask) { 
     super(priorityTask); 
     _priorityTask = priorityTask; 
    } 

    public PriorityTask<V> getPriorityTask() { 
     return _priorityTask; 
    } 

    @Override 
    public int compareTo(PriorityFutureTaskWrapper<V> o) { 
     return _priorityTask.getPriority().ordinal() - 
       o.getPriorityTask().getPriority().ordinal(); 
    } 
} 

問題的,這是在我的用例,有一個潛在的低優先級的任務可能會永遠捱餓。我想避免這種情況。我無法找到一個乾淨的方式來使用java中提供的執行程序/池來完成此操作。所以我想寫我自己的執行者。我有兩種不同的方法。

1)具有PriorityBlockingQueue的自定義線程池。將會有一個單獨的線程,檢查隊列中的任務年齡。較舊的任務將被刪除並重新添加升級優先級。

2)我的用例只有有限數量的優先級,比如1-4。每個優先級我將有4個不同的隊列。現在,自定義池中的線程(而不是隊列上的阻塞)將在必須接受下一個任務時按以下順序掃描隊列。

40%線程 - Q1,Q2,Q3,Q4

30%線程 - Q2,Q1,Q3,Q4

20%線程 - Q3,Q1,Q2,Q4

10 %線程 - Q4,Q1,Q2,Q3

掃描將由線程完成,當它被通知有關隊列中的新增加或該線程執行的當前任務完成時。其他時候,線程將會等待。但是,與阻塞隊列相比,掃描效率會更低。

Apprach 2更適合我的用例。

有沒有人嘗試過任何這些方法或類似用例的不同方法?任何想法/建議?

+1

難道你不能只是改變你現有的'PriorityTask'實現添加升級? OR:在你的「compareTo」中添加一種方法來考慮任務的年齡。所以你的「有效」優先權不僅是優先領域的價值,也是任務的時代。 – Fildor

+0

爲了升級,PriorityTask必須獲得CPU週期......所以線程或單獨的線程之一必須檢查並升級它 - 定期或基於某個觸發器。也可以將其從隊列中移除並在升級後重新添加,以便將其插入到基於新優先級的位置。這是我在帖子中提到的第一種方法。但我不太相信這種方法...... – Rajesh

+0

我曾經使用的另一種方法是我們實施了「計劃外流」。也就是說:每天一次低負載概率很高時,我們將整個隊列抽取到另一個隊列中,這個隊列在單獨的SingleThreadPoolExecutor上處理。 – Fildor

回答

1

有沒有簡單的方法時Queue改變已插入的元素的優先級,它已經討論here

你的第二個選項應該是很容易實現,像比低高優先級隊列處理更多的任務優先級隊列

您還可以考慮爲每個優先級和每個池中的線程數取決於任務的優先級,以便擁有不同的ThreadPool。

相關問題