2014-09-04 41 views
0

爲了簡化我的情況,我們假設我正在使用Java的Fork-Join框架實現二進制搜索。我的目標是在整數數組中找到一個特定的整數值(目標整數)。這可以通過將數組分成一半來完成,直到足夠小以執行串行搜索。算法的結果需要是一個布爾值,指示目標整數是否在數組中找到。向fork-join遞歸添加停止條件

在幻燈片28以後的Klaus Kreft's presentation中探索到類似的問題。但是,Kreft的目標是找到陣列中最大的數字,因此所有條目都必須進行掃描。在我的情況下,沒有必要掃描整個數組,因爲一旦找到目標整數,就可以停止搜索。

我的問題是,一旦我遇到目標整數,許多任務已經被插入到線程池中,並且我需要取消它們,因爲沒有繼續搜索的要點。我嘗試從RecursiveTask中調用getPool()。terminate(),但這並沒有什麼幫助,因爲許多任務已經排隊,我甚至注意到即使在關閉後調用新的隊列也是排隊的。

My目前的解決方案是使用一個靜態易失性布爾值,該值以'false'開始,並在任務開始時檢查其值。如果它仍然是'假',那麼任務開始工作,如果'真',任務立即返回。我實際上可以使用RecursiveAction。

所以我認爲這個解決方案應該可以工作,但是我想知道框架是否提供了一些處理這種情況的標準方式 - 即爲遞歸定義一個停止條件,從而取消所有排隊的任務。

請注意,如果我想要在找到目標整數(通過其中一個正在運行的任務)時立即停止所有正在運行的任務,則必須檢查這些任務中每行之後的布爾值,並且可能會影響性能,因爲值該布爾值不能被緩存(它被定義爲volatile)。

所以的確如此,我認爲需要一些標準的解決方案,並且可以通過清除隊列和插入正在運行的任務來提供。但我還沒有找到這樣的解決方案,我想知道是否有其他人知道它或有一個更好的主意。

謝謝您的時間, 阿薩夫

編輯:這裏是我的測試代碼:

package xxx; 

import java.util.Arrays; 
import java.util.Random; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ForkJoinPool; 
import java.util.concurrent.RecursiveAction; 

public class ForkJoinTest { 

    static final int ARRAY_SIZE = 1000; 
    static final int THRESHOLD = 10; 

    static final int MIN_VALUE = 0; 
    static final int MAX_VALUE = 100; 

    static Random rand = new Random(); 


    // a function for retrieving a random int in a specific range 
    public static int randInt(int min, int max) { 
     return rand.nextInt((max - min) + 1) + min; 
    } 

    static volatile boolean result = false; 
    static int[] array = new int[ARRAY_SIZE]; 
    static int target; 

    @SuppressWarnings("serial") 
    static class MyAction extends RecursiveAction { 

     int startIndex, endIndex; 

     public MyAction(int startIndex, int endIndex) { 
      this.startIndex = startIndex; 
      this.endIndex = endIndex; 
     } 

     // if the target integer was not found yet: we first check whether 
     // the entries to search are too few. In that case, we perform a 
     // sequential search and update the result if the target was found. 
     // Otherwise, we break the search into two parts and invoke the 
     // search in these two tasks. 
     @Override 
     protected void compute() { 
      if (!result) { 
       if (endIndex-startIndex<THRESHOLD) { 
        // 
        for (int i=startIndex ; i<endIndex ; i++) { 
         if (array[i]==target) { 
          result = true; 
         } 
        } 
       } else { 
        int middleIndex = (startIndex + endIndex)/2; 
        RecursiveAction action1 = new MyAction(startIndex, middleIndex); 
        RecursiveAction action2 = new MyAction(middleIndex+1, endIndex); 
        invokeAll(Arrays.asList(action1,action2)); 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     for (int i=0 ; i<ARRAY_SIZE ; i++) { 
      array[i] = randInt(MIN_VALUE, MAX_VALUE); 
     } 
     target = randInt(MIN_VALUE, MAX_VALUE); 
     ForkJoinPool pool = new ForkJoinPool(); 
     pool.invoke(new MyAction(0,ARRAY_SIZE)); 
     System.out.println(result); 
    } 

} 
+0

你能發表一些代碼嗎?您可以使用可以清除的特定隊列,也可以中斷正在運行的線程,但查看代碼更容易爲您提供適當的建議。 – 2014-09-04 12:36:49

+0

我維護一個開源的fork/join框架,它提供了一個並行的順序搜索來處理您對「find first」的需求。您可以按原樣使用它,也可以使用代碼作爲如何自行完成的示例。 sourceForge鏈接是:http://sourceforge.net/projects/tymeacdse/?source=navbar – edharned 2014-09-04 13:58:18

+0

謝謝@edharned,我會看看。它依賴於Java的fork/join框架嗎?你也使用volatile boolean/AtomicBoolean來停止搜索嗎? – Assaf 2014-09-04 14:25:36

回答

0

我想你可能會發明一種屏障,正確的解決方案。

你說你boolean stop標誌必須是volatile,因此將與解決方案的速度干擾 - 好,yes和no - 訪問volatile確實做緩存刷新,但是你有沒有考慮一個AtomicBoolean

我相信正確的解決方案是使用AtomicBoolean標誌來讓所有進程停止。您應該檢查是否有合理的細節,以便讓系統快速停止。

嘗試清除所有隊列並中斷所有線程將是一個錯誤 - 這會導致可怕的混亂。

static AtomicBoolean finished = new AtomicBoolean(); 
    .... 

     protected void compute() { 
      if (!finished.get()) { 
       if (endIndex - startIndex < THRESHOLD) { 
        // 
        for (int i = startIndex; i < endIndex && !finished.get(); i++) { 
         if (array[i] == target) { 
          finished.set(true); 
          System.out.print("Found at " + i); 
         } 
        } 
       } else { 
        ... 
       } 
      } 
     } 
+0

謝謝,所以你建議切換到AtomicBoolean並添加一個檢查它的值作爲循環的一部分。我應該把這種類型的檢查也添加到我的volatile代碼中。但是你能解釋爲什麼在這種情況下AtomicBoolean比volatile更可取嗎?在性能方面,我認爲它們幾乎是相同的,因爲它們都是無鎖的並且沒有被緩存。 – Assaf 2014-09-04 14:23:23

+0

@Assaf - 幾乎沒有什麼區別 - 訪問'volatile'時刷新所有緩存,而訪問'AtomicBoolean' *應該更不容易侵入 - 它並不總是更好,但不會更糟。在你的情況下,沒有什麼區別[易揮發布爾vs AtomicBoolean](http://stackoverflow.com/questions/3786825/volatile-boolean-vs-atomicboolean) - 我的觀點是,你應該*不*採取其他清理路線隊列和中斷線程。 – OldCurmudgeon 2014-09-04 14:40:27

0

我在上面留言評論瞭如何通過查看在許多內置函數中執行此操作的開源產品來實現此目的。讓我在這裏提一些細節。

如果您想取消正在開始或正在執行的任務,那麼每個任務都需要了解其他任務。當一個任務找到它想要的任務時,該任務需要通知其他每個任務停止。你不能用二元遞歸除法(RecursiveTask等)來做到這一點,因爲你遞歸地創建新任務,舊任務永遠不會知道新任務。我相信你可以通過一個參考到每個新任務的stop-me字段,但它會變得非常混亂,調試將是「有趣的」。

你可以用Java8 CountedCompleter()來做到這一點。爲了支持這個類,該框架被屠殺,所以框架應該完成的事情需要手動完成,但它可以工作。

每個任務都需要一個易失性布爾值和一個將其設置爲true的方法。每個任務都需要一個對所有其他任務的引用數組。預先創建所有任務,每個任務都有一個空的數組,用於引用其他任務。填入每個其他任務的引用數組。現在提交每個任務(請參閱此類的文檔fork()addPendingCount()等)

當一個任務找到了它想要的內容時,它使用其他任務的引用數組將它們的布爾值設置爲true。如果多線程存在競爭條件,則所有線程都設置爲「true」並不重要。您還需要處理tryComplete(),onCompletion()等。這個類非常混亂。它用於Java8流處理,這本身就是一個故事。

你不能做的就是在deques開始之前清除未完成的任務。您需要等到任務啓動並檢查布爾值爲true。如果執行時間很長,那麼您可能還需要定期檢查布爾值是否爲true。易失性讀取的開銷並不是那麼糟糕,實際上沒有其他辦法。

+0

再次感謝您的參考和長時間的解釋。我注意到Java 8有了一些新的發展,但是要了解什麼時候更喜歡什麼,有點難以理解。需要做更多的閱讀。有一件事我不明白在你的描述中是爲什麼使用一個標誌數組(這意味着所有任務必須提前創建)而不是一個單一的全局標誌(volatile或AtomicBoolean,如上面的@OldCurmudgeon建議)。數組和單個標誌解決方案將會使將要開始的任務無效,並且如果標誌被檢查,兩者都可以在中間「停止」任務。 – Assaf 2014-09-06 00:30:30

+0

@Assaf你做這件事的確切方式取決於你。如果你在一個對象中有一個volatile布爾值,那麼你需要一個指向該引用的指針:pointer.isTrue();這是更多的開銷,然後只是檢查自己的本地變量:if(stop-me)...當你只在任務開始時檢查,誰在乎。但是,如果您定期檢查,那麼開銷很重要。 – edharned 2014-09-06 13:58:14

+0

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountedCompleter.html上的搜索示例顯示了創建AtomicReference/Atomic 的示例,然後將此原子傳遞給子任務。子任務必須調用atomic.get(),如果結果不爲null,則不執行任何操作。我不明白的是爲什麼你需要CountedCompleter,因爲RecursiveAction會做同樣的事情。 – snaran 2017-07-27 20:59:20