2015-04-17 38 views
2

我有3種類型的任務:A, B, C定製多線程:限制了一些類型的任務的數目,以並行執行,而不限制其它類型的任務

而且我想在N個線程並行運行這些任務。讓我們假設,即任務列表如下:

A, B, C, B, C, A, B, C 

當然,我可以實現使用ExecutorService多線程執行,但問題是,我需要C最多一個任務同時被執行。類型爲C的其他任務必須按順序執行,但與任務A和/或B並行執行。

例如3線程執行主體可以是任何以下狀態:

A B C 
A A A 
A C B 
B B C 
B B B 
B C 
A C 
A B 
C 
... 

(它被允許在一個時間執行類型A或B的多個任務,但至多一個任務 C型必須一次)

是否有某種方式在Java中實現這一目標被執行?

更新

這是我有川方有了這個做這個 在這裏,我通過ExecutorService執行所有任務的正確方法雖然執行我會檢查是否有任何其他C任務是Running.If不是我會執行不然我會添加到隊列,將任何其他任務的順利完成出隊,也是我檢查任何C任務正在運行或不

public class Test { 

public void startExecution() { 
    Queue<String> runQ = new LinkedList<>(); 
    ThreadPool exec = (ThreadPool) Executors.newFixedThreadPool(RunSettings.getRunSettings().getThreadCount()); 
    while (!runQ.isEmpty() && !SystemDefaults.stopExecution.get()) { 
     String TaskName = runQ.remove(); 
     Task t = new Task(TaskName); 
     exec.execute(t, TaskName); 

    } 
    exec.shutdown(); 
    if (exec.awaitTermination(RunSettings.getRunSettings().getExecutionTimeOut(), TimeUnit.MINUTES)) { 
     System.out.println("[CONTROL: ALL TEST TASKS COMPLETED SUCCESSFULLY.]"); 
    } else { 
     System.out.println("[CONTROL: ALL THE TEST TASKS DID NOT COMPLETE SUCCESSFULLY IN STIPULATED TIME. FORCEFULLY FINALIZING.]"); 
     exec.shutdownNow(); 

    } 
} 
} 

線程池什麼我創建了

public class ThreadPool extends ThreadPoolExecutor { 

public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
} 

final String CTask = "TaskC"; 
Map<Runnable, String> TaskPool = new HashMap<>(); 
Queue<Runnable> TaskCList = new LinkedList<>(); 

@Override 
protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    if (TaskPool.containsKey(r)) { 
     TaskPool.remove(r); 
    } 
    if (!TaskPool.containsValue(CTask) && !TaskCList.isEmpty()) { 
     Runnable ieRun = TaskCList.remove(); 
     super.execute(ieRun); 
     TaskPool.put(ieRun, CTask); 
    } 
} 

public void execute(Runnable command, String TaskType) { 
    if (TaskPool.containsValue(TaskType) 
      && TaskType.equalsIgnoreCase(CTask)) { 
     System.out.println("Another Instance of " + CTask + " Running"); 
     TaskCList.add(command); 
    } else { 
     super.execute(command); 
     TaskPool.put(command, TaskType); 
    } 
} 

} 
+1

你怎麼有'AAA', AB和'BBB'在這裏沒有'C'? – TheLostMind

+0

@TheLostMind列表中也可以沒有C任務以及..在那種情況下,這是正確的事 – Madhan

+0

@SashaSalauyou如果我創建兩個單獨的例如說,我有50個總計任務23 A 23 B 4 C我想讓他們在3個線程,然後根據你的情況,A和B將總是運行在兩個線程中,並且分配給C的線程不能被重用。或者可以在執行後增加池大小 – Madhan

回答

1

感謝所有爲您support.Anyhow我想出了一個解決方案,它是工作的罰款截至目前

public class Test { 

public void startExecution() { 
Queue<String> runQ = new LinkedList<>(); 
ThreadPool threadPool = new ThreadPool(threadCount,timeOut); 
while (!runQ.isEmpty()) { 
    String TaskName = runQ.remove(); 
    Task t = new Task(TaskName); 
    threadPool.execute(t, TaskName); 

} 
if (threadPool.awaitTermination(timeOut, TimeUnit.MINUTES)) { 
    System.out.println("[CONTROL: ALL TEST TASKS COMPLETED SUCCESSFULLY.]"); 
} else { 
    System.out.println("[CONTROL: ALL THE TEST TASKS DID NOT COMPLETE SUCCESSFULLY IN STIPULATED TIME. FORCEFULLY FINALIZING.]"); 
    threadPool.shutdownNow(); 

} 
} 
} 

線程池實現

public class ThreadPool extends ThreadPoolExecutor { 

public ThreadPool(int threadCount, long keepAliveTime) { 
    super(threadCount, threadCount, keepAliveTime, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
} 

final String CTask = "TaskC"; 
Map<Runnable, String> TaskPool = new HashMap<>(); 
Queue<Runnable> TaskCList = new LinkedList<>(); 

@Override 
protected synchronized void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    System.out.println(TaskPool.get(r) + "Finished"); 
    if (TaskPool.containsKey(r)) { 
     TaskPool.remove(r); 
    } 
    if (TaskCList.isEmpty()) { 
     super.shutdown(); 
    } 
    if (!TaskPool.containsValue(CTask) && !TaskCList.isEmpty()) { 
     if (super.getActiveCount() < super.getCorePoolSize()) { 
      System.out.println("Trying to execute Other C Tasks"); 
      Runnable ieRun = TaskCList.remove(); 
      super.execute(ieRun); 
      TaskPool.put(ieRun, CTask); 
     } 
    } 
} 

public synchronized void execute(Runnable command, String TaskType) { 
    if (TaskPool.containsValue(TaskType) 
      && TaskType.equalsIgnoreCase(CTask)) { 
     System.out.println("Another Instance of TaskC Running"); 
     System.out.println("Added for future Execution"); 
     TaskCList.add(command); 
    } else { 
     System.out.println("Adding " + TaskType + " to execution"); 
     TaskPool.put(command, TaskType); 
     super.execute(command); 
    } 
} 
2

的最簡單的方法是創建2個執行人:一個單線程C型的任務,和另一個多線程爲另一類型的任務:

class ExecutorWrapper { 
    private ExecutorService forC = Executors.newSingleThreadExecutor(); 
    private ExecutorService forAnother = Executors.newFixedThreadPool(THREAD_NUMBER); 

    public void acceptTask(Runnable r) { 
     if (r instanceof TaskC) { 
      forC.execute(r); 
     } else { 
      forAnother.execute(r); 
     } 
    } 
} 

現在,類型C的任何任務將在forC執行者內部隊列中等待,直到完成此類型的另一個任務。

如果您不希望創建另一個執行者,你需要實現某種形式的併發控制,這是更爲複雜和困難的,因爲可能發生的競爭條件來調試。我可以提出解決方案草案,但沒有代碼:

  1. 創建一個標誌,表示如果另一個任務C已經在執行,和隊列,其中另一個任務C的溫度將等待
  2. 當C型的任務到達,檢查如果另一個任務C是執行,如果是,將其添加到隊列中提到
  3. 在任務C完成後,發送某種形式的通知任務C已經完成 - 以自提隊列採取下一個任務C,並將其發送進入執行者。如果隊列爲空,請清除該標誌以指示現在不執行任何C任務。這種通知可以通過將任務C打包到Callable並調用Future#get方法來實現,該方法將阻塞直到任務完成。
+0

我正在研究其他一些方法,已經發布了更新。你可以通過它看看它是否正確。同時我也會檢查你的建議 – Madhan

+0

我認爲除了一時之外它是可以的:你應該在那裏實現更好的同步。 G。使'execute()'和'afterExecute()'方法同步。否則,這些方法可能會出現競爭狀況。 –

+0

好的,我會盡力去做 – Madhan

-1

看起來你所需要做的就是製作任務C​​。這裏有一些測試代碼似乎證明了這一點 - 儘管沒有失敗並不總是意味着成功。

結果清楚地表明A和B的並行運行,但從來沒有C的運行。

static enum Task implements Callable<Void> { 

    A, 
    B, 
    C { 
       @Override 
       public synchronized Void call() throws Exception { 
        if (running.get(this).get() != 0) { 
         System.out.println("FAIL!"); 
        } 
        return super.call(); 
       } 

      }; 

    // How many of each are running. 
    static Map<Task, AtomicInteger> running = Stream.of(Task.values()) 
      .collect(
        Collectors.toMap(
          (t) -> t, 
          (t) -> new AtomicInteger(0), 
          (x, y) -> x, 
          () -> new EnumMap<Task, AtomicInteger>(Task.class))); 

    // List all running tasks. 
    private String runningList() { 
     StringBuilder s = new StringBuilder(); 
     running.entrySet().stream().forEach((r) -> { 
      if (r.getValue().get() != 0) { 
       s.append(r.getKey()).append("=").append(r.getValue()).append(","); 
      } 
     }); 
     return s.toString(); 
    } 

    static final Random random = new Random(); 

    @Override 
    public Void call() throws Exception { 
     System.out.println("Running " + name() + " with " + runningList()); 
     // Mark me running. 
     running.get(this).getAndIncrement(); 
     // Hang around for a bit. 
     Thread.sleep(random.nextInt(1000)); 
     // Mark me not running. 
     running.get(this).getAndDecrement(); 
     return null; 
    } 

} 

// The pool. 
static ExecutorService pool = Executors.newFixedThreadPool(5); 
// The tasks. 
static Task[] tasks = new Task[]{Task.A, Task.B, Task.C, Task.B, Task.C, Task.A, Task.B, Task.C,}; 

public void test() throws InterruptedException { 
    // Run 10 times. 
    for (int i = 0; i < 10; i++) { 
     pool.invokeAll(Arrays.asList(tasks)); 
    } 
    pool.shutdown(); 
    pool.awaitTermination(10, TimeUnit.SECONDS); 
} 
+1

如果A,C,C通過執行,該怎麼辦? A將被放入線程1並開始,首先C將被放入線程2並啓動,但另一個將被放入可用線程3的C將被阻塞,從而使線程3被鎖定並且不能被其他A或B任務訪問直到第一個C完成。 –

+1

@SashaSalauyou - 正確 - 線程將被阻塞並停放,直到解除阻塞。這是這個選項的缺點,儘管它比使用多個執行器簡單得多。我並不是說這是比你更好的解決方案 - 只是不同而已。 – OldCurmudgeon