2011-05-07 57 views
1

我嘗試parallelizemerge sort實現:http://pastebin.com/2uMGjTxr。 我想創建儘可能多的Java-VM可以提供的線程。我想確定使用java.lang.Runtime的最大可能線程數。出並行合併時內存不足的錯誤排序

於是我想出了一個名爲MergeThread類:

public class MergeThread implements Runnable{ 

    public int[] list; 
    int sIndex, eIndex; 

    public MergeThread(int[] pArray, int pStartIndex, int pEndIndex){ 
     list = pArray; 
     sIndex = pStartIndex; 
     eIndex = pEndIndex; 
    } 

    public void run(){ 
     list = mergeSort(list, sIndex, eIndex); 
    } 

    /** 
    * Merges two sorted int array into one new sorted array. 
    * @param lhs 
    * @param rhs 
    * @return 
    */ 
    private static int[] merge(int[] lhs, int[] rhs) { 
     int[] result = new int[lhs.length + rhs.length]; 

     int leftIndex = 0; 
     int rightIndex = 0; 
     while(leftIndex < lhs.length && rightIndex < rhs.length) { 
      if(lhs[leftIndex] <= rhs[rightIndex]) { 
       result[leftIndex + rightIndex] = lhs[leftIndex]; 
       leftIndex++; 
      } else { 
       result[leftIndex + rightIndex] = rhs[rightIndex]; 
       rightIndex++; 
      } 
     } 

     while(leftIndex < lhs.length) { 
      result[leftIndex + rightIndex] = lhs[leftIndex]; 
      leftIndex++; 
     } 

     while(rightIndex < rhs.length) { 
      result[leftIndex + rightIndex] = rhs[rightIndex]; 
      rightIndex++; 
     } 

     return result; 
    } 

    /** 
    * Sorts an array from index <code>startIndex</code> (inclusive) to <code>endIndex</code> (exclusive). 
    * @param array 
    * @param startIndex 
    * @param endIndex 
    * @return new array that is sorted 
    */ 
    private static int[] mergeSort(int[] array, int startIndex, int endIndex) { 
     int length = endIndex - startIndex; 
     if(length == 0) { 
      return new int[]{}; 
     } 
     if(length == 1) { 
      return new int[]{array[startIndex]}; 
     } 

     int halfLength = length/2; 
     //int[] sortedLeftPart = mergeSort(array, startIndex, startIndex + halfLength); 
     MergeThread m1 = new MergeThread(array, startIndex, startIndex + halfLength); 
     Thread t1 = new Thread(m1); 
     t1.start(); 
     //int[] sortedRightPart = mergeSort(array, startIndex + halfLength, endIndex); 
     MergeThread m2 = new MergeThread(array, startIndex + halfLength, endIndex); 
     Thread t2 = new Thread(m2); 
     t2.start(); 
     try{ 
     t1.join(); 
     t2.join(); 
     }catch(InterruptedException e){} 
     return merge(m1.list, m2.list);  
    } 
} 

而實際上啓動過程

import java.util.Random; 

public class Aufg2 { 
    public static Random random = new Random(100); 

    public static void main(String[] args) { 
     int[] array = createRandomArray(10000000); 

     long time = System.currentTimeMillis(); 

     int[] sortedArray = sort(array); 

     if(sortedArray.length != array.length || !isSorted(sortedArray)) { 
      System.err.println("Failed to sort given array! :-("); 
      return; 
     }  
     System.out.println("Success! Sorting took " + (System.currentTimeMillis() - time) + "ms.");  
    } 

    /** 
    * Creates a randomly filled array of given length 
    * @param length 
    * @return 
    */ 
    private static int[] createRandomArray(int length) { 
     int[] result = new int[length]; 
     for(int i = 0; i < length; i++) { 
      result[i] = random.nextInt(); 
     } 
     return result; 
    } 

    /** 
    * Checks whether a given int array is sorted in ascending order 
    * @param array 
    * @return <code>true</code> if the given int array is sorted; <code>false</code> otherwise. 
    */ 
    private static boolean isSorted(int[] array) { 
     for(int i = 1; i < array.length; i++) { 
      if(array[i] < array[i-1]) { 
       return false; 
      } 
     } 
     return true; 
    } 

    /** 
    * Sorts a given array (ascending order) 
    * @param array 
    * @return 
    */ 
    private static int[] sort(int[] array){ 
     //TODO: use multiple threads to speed up the sorting 
     MergeThread m = new MergeThread(array, 0, array.length); 

     try{ 

     Thread t1 = new Thread(m); 
     t1.start(); 
     t1.join(); 
     }catch(InterruptedException e){ 

     } 
     return m.list; 
    } 
} 

類然而,這歸併排序不起作用。控制檯打印很多java.lang.OutOfMemmoryError's unable to create new native thread

後來則消息變爲類似java heap

我有什麼改變來排序得到的合併工作,我怎麼使用java.lang.Runtime中的是什麼?

+0

將* exact *堆棧跟蹤複製到您的發佈中,然後突出顯示與該跟蹤對應的代碼行。 – Anon 2011-05-07 19:40:06

+1

順便說一句,即使你已經正確地實現排序,如果你想並行的每一個分區,你會嘗試創建* O(NlogN)*線程。除了一個非常小的數組之外,哪個會失敗。線程是有限的資源。 – Anon 2011-05-07 19:44:05

回答

6

分而治之的機制有你想要創建類似5000000級的線程 - 每個那些需要看成堆棧內存的默認256KB(IIRC)的。仍然爲什麼你會得到一個OutOfMemmoryError

通過使用fixed size thread pool限制線程數量 - 對池中的線程數量進行一點測試,但是遠超過系統中的內核數量的任何內容都不可能提高性能(並且可能確實會降低它)。

+0

很明顯,<內核數量只會影響性能。但使用> nr_cores通常可以提高性能。根據我的經驗,一個好的猜測是1.5,但這是你必須嘗試的。太大的數字顯然也不會做好..雖然.. – Voo 2011-05-07 19:52:18

1

首先使用的ExecutorService和它排隊,而不是創造數以百萬計的線程的新任務(這應該擺脫的第一個問題,你用完了資源越早或更高版本,如果你創造數百萬個線程)。核心數量的1.5倍通常是一個很好的猜測(通常比使用可用數量的內核提供更好的結果 - 但這是你必須玩的東西)。

然後 - 如果您希望此算法具有任何性能,則絕​​對重要 - 在合理的閾值下對葉片情況使用QuickSort,如果您想要較低的閾值,則使用InsertionSort(如果使用插入排序葉節點大小16左右應該可以正常工作)。

0

讓一個線程做陣列下半年,同時調用線程處理上半年

int halfLength = length/2; 
    MergeThread m2 = new MergeThread(array, startIndex + halfLength, endIndex); 
    Thread t2 = new Thread(m2); 
    t2.start();//let new thread handle the second half 
    array = mergeSort(array, startIndex, startIndex + halfLength);//do first half ourselves 
    try{ 
    t2.join(); 
    }catch(InterruptedException e){} 
    return merge(array, m2.list); 

這減輕由你有

但快速排序就是半創造下來的線程的數量並行化好多了,因爲它不需要遞歸遞歸步驟,允許線程(可以在excecutors中運行的任務)在委派後立即返回

調用者然後只需要觀察所有作業何時完成