2010-06-06 75 views
6

我有一個算法,將通過一個大型的數據集讀取一些文本文件,並在這些行中搜索特定的術語。我已經用Java實現了它,但我不想發佈代碼,以便它看起來不在我正在尋找某人爲我實現它,但這確實是我真的需要很多幫助!這不是我的項目計劃,但數據集是巨大的,所以老師告訴我,我必須這樣做。需要幫助實現這個算法與地圖Hadoop MapReduce

編輯(我沒有澄清我previos版本)的數據集我是Hadoop集羣上,我應該做出的MapReduce實現

我讀到關於MapReduce和thaught,我第一次做標準的實現,然後用mapreduce做起來會更容易/不那麼容易。但沒有發生,因爲算法是相當愚蠢的,沒有什麼特別的,地圖減少...我不能籠絡它。

所以這裏不久僞我的算法的代碼

LIST termList (there is method that creates this list from lucene index) 
FOLDER topFolder 

INPUT topFolder 
IF it is folder and not empty 
    list files (there are 30 sub folders inside) 
    FOR EACH sub folder 
     GET file "CheckedFile.txt" 
     analyze(CheckedFile) 
    ENDFOR 
END IF 


Method ANALYZE(CheckedFile) 

read CheckedFile 
WHILE CheckedFile has next line 
    GET line 
    FOR(loops through termList) 
      GET third word from line 
      IF third word = term from list 
     append whole line to string buffer 
    ENDIF 
ENDFOR 
END WHILE 
OUTPUT string buffer to file 

而且,正如你所看到的,每次當「分析」之稱,新文件被創建時,我明白了地圖減輕困難寫入許多輸出?

我明白mapreduce的直覺,我的例子似乎完全適合mapreduce,但是當涉及到這樣做時,顯然我不知道足夠多,而且我很餓!

請幫忙。

回答

3

您可以使用空的縮減器,並對作業進行分區,以便爲每個文件運行一個映射器。每個映射器將在您的輸出文件夾中創建它自己的輸出文件。

+0

嗨! Thanx的答案!但我不確定我明白:/你能給我更多的信息嗎?你可能有這樣的例子嗎? – Julia 2010-06-08 15:30:26

2

Map Reduce使用一些很好的Java 6併發功能,特別是Future,Callable和ExecutorService,很容易實現。

我創建了一個可贖回,將在路分析文件指定

public class FileAnalyser implements Callable<String> { 

    private Scanner scanner; 
    private List<String> termList; 

    public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException { 
    this.termList = termList; 
    scanner = new Scanner(new File(filename)); 
    } 

    @Override 
    public String call() throws Exception { 
    StringBuilder buffer = new StringBuilder(); 
    while (scanner.hasNextLine()) { 
     String line = scanner.nextLine(); 
     String[] tokens = line.split(" "); 
     if ((tokens.length >= 3) && (inTermList(tokens[2]))) 
     buffer.append(line); 
    } 
    return buffer.toString(); 
    } 

    private boolean inTermList(String term) { 
    return termList.contains(term); 
    } 
} 

我們需要創建一個新的可調用找到的每個文件,並提交該執行人的服務。提交的結果是一個Future,我們稍後可以使用它來獲取文件解析的結果。

public class Analayser { 

    private static final int THREAD_COUNT = 10; 

    public static void main(String[] args) { 

    //All callables will be submitted to this executor service 
    //Play around with THREAD_COUNT for optimum performance 
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); 

    //Store all futures in this list so we can refer to them easily 
    List<Future<String>> futureList = new ArrayList<Future<String>>(); 

    //Some random term list, I don't know what you're using. 
    List<String> termList = new ArrayList<String>(); 
    termList.add("terma"); 
    termList.add("termb"); 

    //For each file you find, create a new FileAnalyser callable and submit 
    //this to the executor service. Add the future to the list 
    //so we can check back on the result later 
    for each filename in all files { 
     try { 
     Callable<String> worker = new FileAnalyser(filename, termList); 
     Future<String> future = executor.submit(worker); 
     futureList.add(future); 
     } 
     catch (FileNotFoundException fnfe) { 
     //If the file doesn't exist at this point we can probably ignore, 
     //but I'll leave that for you to decide. 
     System.err.println("Unable to create future for " + filename); 
     fnfe.printStackTrace(System.err); 
     } 
    } 

    //You may want to wait at this point, until all threads have finished 
    //You could maybe loop through each future until allDone() holds true 
    //for each of them. 

    //Loop over all finished futures and do something with the result 
    //from each 
    for (Future<String> current : futureList) { 
     String result = current.get(); 
     //Do something with the result from this future 
    } 
    } 
} 

我的例子很不完整,遠沒有效率。我還沒有考慮樣本的大小,如果它真的很大,你可以不斷循環在futureList,除去已完成的元素,類似於:

while (futureList.size() > 0) { 
     for (Future<String> current : futureList) { 
     if (current.isDone()) { 
      String result = current.get(); 
      //Do something with result 
      futureList.remove(current); 
      break; //We have modified the list during iteration, best break out of for-loop 
     } 
     } 
} 

或者你可以實現一個生產者 - 消費者類型設置,其中的製片人向執行者服務器提交可調用卡併產生未來,消費者將獲得未來的結果,然後丟棄未來。

這可能需要生產者和消費者自行創建線索,並且需要同步添加/刪除期貨列表。

有任何問題,請提出。

+0

嗨!非常感謝您提出的解決方案!我很抱歉,我可能沒有明確指出問題,儘管我嘗試了。我的錯誤是,我剛剛在標題中提到了Hadoop,但我的數據集位於運行hadoop的羣集上,所以我應該根據Hadoop MaPreduce框架來實現它...現在編輯我的帖子。我分析的數據集是6GB :/太多的併發來應付它? – Julia 2010-06-07 17:10:20

+0

哎呀,我是一個小菜在這裏:D 爲了讓我自己稍微兌現,我在100個文件上運行我的代碼,每個約61MB,總共約6GB。我不完全確定你的文件解析器是幹什麼的,所以把這些血腥的細節排除在外,只是掃描每一行並返回一個空字符串。我知道有點做作。 性能不是太糟糕,線程池大小爲100,因此所有100個文件都被解析,而不會被執行程序服務排隊。我的Atom處理器的總運行時間爲17分鐘。 對不起,我無法正確回答你的問題。我沒有使用Hadoop的經驗,但在閱讀SquareCog的答案之後纔有意義。 – 2010-06-08 05:57:10

+0

嗨!非常感謝你,你幫了很多忙,因爲我無法應付hadoop先生和大腦的時間。我將有幾個更類似的算法來實現,所以我必須以我能夠做到的方式來嘗試它。無法在任何地方獲得hadoop幫助:/ 因此,我的代碼已採用,並且在我的英特爾2Ghz上帶有線程池42花了大約20分鐘時間解析並將結果輸出到新文件中,但只有200Mb數據(42個文件)。再次,我必須對解析器做一些修改,它必須做一些更嚴格的匹配,而不是純粹的「包含」術語,所以當我運行它時,我讓你知道結果:) – Julia 2010-06-09 21:15:54