2016-06-21 53 views
2

我是Java中多線程的新手。Java如何實現並行性,動態分配工作?

我的目標是讓一個線程讀取文件,然後將工作塊傳遞給工作線程並行處理。

這裏有一個非常好的例子。 Concurrency Tutorial

此代碼片段獲取工作列表(ArrayList<String> URLs)並將其轉儲到具有Task.call()方法中指定的函數的工作線程塊上。

void pingAndReportEachWhenKnown() throws InterruptedException, ExecutionException { 
    int numThreads = URLs.size() > 4 ? 4 : URLs.size(); //max 4 threads 
    ExecutorService executor = Executors.newFixedThreadPool(numThreads); 
    CompletionService<PingResult> compService = new ExecutorCompletionService<>(executor); 
    for(String url : URLs){ 
    Task task = new Task(url); 
    compService.submit(task); 
    } 
    for(String url : URLs){ 
    Future<PingResult> future = compService.take(); 
    log(future.get()); 
    } 
    executor.shutdown(); //always reclaim resources 
} 

這正是我想要做的,但我需要改變。 我的工作隊列的大小不適合工作內存(巨大文件),所以我需要緩衝讀取線。我可以用ArrayBlockingQueue實現Blocking我需要的功能。但是,我還需要將任務分配緩衝到CompletionService。工作塊大小會有所不同,因此完成時間也會有所不同。

我怎麼不把太多的compService工作隊列?下面的代碼會一次放置一個項目,因爲它會在嘗試從隊列中獲取另一個任務之前等待完成。所以這是不夠的。處理這個問題的正確或最好的方法是什麼?

for(;;){ 
    Task task = arrayBlockingQueue.take(); //Blocking operation 
    compService.submit(task); 
    Future<PingResult> future = compService.take(); //Blocking operation 
    log(future.get()); 
} 

回答

2

代替撥打Executors.newFixedThreadPool(numThreads),您可以直接創建ThreadPoolExecutor。該類的構造函數之一允許您提供線程池將使用的隊列。

因此,爲它提供一個有界的隊列(例如,具有固定容量的ArrayBlockingQueue):當隊列滿時,您的製作者線程將被阻止,並且它將停止讀取文件,直到某些工作已經完成完成。


約翰的Vint說,

,遺憾將無法正常工作。一旦隊列已滿並且所有線程繁忙時都會拋出RejectedExecutionException異常。

如果你使用這個構造函數呢?

ExecutorService executorService = new ThreadPoolExecutor(
    CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit..., 
    new ArrayBlockingQueue<>(TASK_BACKLOG_LIMIT), 
    new ThreadPoolExecutor.CallerRunsPolicy() 
    ); 

約當任務拒絕會發生什麼ThreadPoolExecutor會談的Javadoc(例如,因爲隊列已滿)。它說,默認的行爲是拋出RejectedExecutionException,但...

...提供四種預定義的處理程序策略(2)在ThreadPoolExecutor.CallerRunsPolicy,調用execute本身線程運行的任務。這提供了一個簡單的反饋控制機制,將減慢提交新任務的速度...

+0

這不幸的是不會工作。一旦隊列已滿並且所有線程忙於拋出「RejectedExecutionException」。 –

+0

@JohnVint,看起來像是一種無需自己構建就能節流的方法。看到我更新的答案。 –

+0

我喜歡它,即使隊列飽和,你仍然可以取得進步。 –

1

我做了一些類似的之前,你可以使用一個BufferedReader和後線的讀設定的閾值發送一個StringBuffer來的線程進行處理。

另一種選擇是將文件拆分爲多個較小的文件。每個文件一旦創建就發送給一個線程。

以下是將大文件拆分爲多個小文件進行處理的示例。 Splitter是一個類,它只需從較大的文件中抓取一定數量的行以放入較小的文件中。

private void execute() { 

    File segFile; 
    Splitter split = new Splitter(maxLines, file); 

    while ((segFile = split.nextSegment()) != null) { 

     String seg = segFile.getName().substring(segFile.getName().lastIndexOf("_")+1); 

     Runnable thread; 

     if (workflow.equals("Account")) { 
      thread = new AccountThread(segFile); 
     } 
     else { 
      thread = new CustomerThread(segFile); 
     } 

     pool.execute(thread); 
    } 
    pool.shutdown(); 

    while (!pool.isTerminated()) {} 

} 
0

我認爲你真正想要的是一個信號量。您可以獲取資源並確保任務在完成時釋放它。這應該會給你想要的節流。

如果將此與CompletableFuture一起使用,則應該有更簡潔的代碼。

Semaphore semaphore = new Semaphore(NUMBER_OF_QUEUED_TASKS); 
ExecutorService executor = Executors.newFixedThreadPool(numThreads); 

for (String url : URLs) { 
    semaphore.acquire(); // if there have been too many requests  
         // queued wait until one is released 

    CompletableFuture 
     .supplyAsync(new Task(url), e) 
     .thenAccept(this::log) 
     .thenAccept((t) -> semaphore.relase(1)); 
} 
e.shutdown(); 
... 
public static class Task implements Supplier<PingResult> { 
    @Override 
    public PingResult get() { 
    } 
} 
+0

我看到NUMBER_OF_QUEUED_TASKS - 這是否意味着在開始閱讀文件之前我需要知道文件中有多少個塊?編輯:我看着Javadocs - 我認爲這是限制在同時運行的任務數量?我希望這是否等於numThreads? – BAMF4bacon

+0

根據我對您問題的理解,您希望能夠排列一些任務,但不是所有任務,因爲您的內存可能會用完。 'NUMBER_OF_QUEUED_TASKS'是可以填充隊列的任務數+線程數。如果你不想讓它只是線程的數量,那麼你可以將值改爲numThreads –

1

據我所知,沒有令人信服的理由來區分工作生產者和工作者線程。您可以簡單地使讀操作線程安全,而不是使用ArrayBlockingQueue。

public static class SynchronizedBufferedReader extends BufferedReader { 
    public SynchronizedBufferedReader(final Reader in) { 
     super(in); 
    } 

    @Override 
    public synchronized String readLine() throws IOException { 
     return super.readLine(); 
    } 
} 

而不是產生每一個結果的任務,每個任務可以採取同樣的閱讀器,並做一個for循環工作,直到readLine返回null。這樣,您可以創建與線程一樣多的任務,並且所有任務都將保持忙碌狀態。

+1

這是一個非常引人注目的方法。我認爲一個細節可能並不清楚 - 一個塊可能長達40萬行,並且不能分開。不過,我認爲這個想法是有效的。我只需寫一個readChunk()方法,而不用readLine()方法。 – BAMF4bacon