2016-02-29 101 views
0

我有一個包含2億HTML文件的目錄(不要看着我,我沒有創建這個混亂,我只需要處理它)。我需要將該目錄中的每個HTML文件編入索引。我一直在閱讀關於完成工作的指南,而且我現在有一些事情要做。大約一個小時之後,我已經獲得了大約100k的索引,這意味着大概需要85天。如何用Solr Cell批量索引HTML文件?

我將這些文件編入索引到獨立的Solr服務器,該服務器運行在c4.8xlarge AWS EC2實例上。下面是free -m輸出與Solr的服務器上運行,而我寫的索引運行,以及:

   total  used  free  shared buffers  cached 
Mem:   60387  12981  47405   0   19  4732 
-/+ buffers/cache:  8229  52157 
Swap:   0   0   0 

正如你所看到的,我在做資源相當不錯。我在Solr的配置增加maxWarmingSearchers的數量爲200,因爲我得到的錯誤:

Exceeded limit of maxWarmingSearchers=2, try again later

好吧,但我不認爲增加了限制確實是正確的做法。我認爲問題在於,對於每個文件,我正在做一個提交,並且我應該批量執行此操作(例如50k個文件/提交),但我不完全確定如何修改此代碼,以及每個示例我一次只看到一個文件。我真的需要盡我所能來儘可能快地運行,因爲我真的沒有85天的時間來獲取Solr中的數據。

這裏是我的代碼:

Index.java

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 


public class Index { 

    public static void main(String[] args) { 
     String directory = "/opt/html"; 
     String solrUrl = "URL"; 
     final int QUEUE_SIZE = 250000; 
     final int MAX_THREADS = 300; 

     BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_SIZE); 


     SolrProducer producer = new SolrProducer(queue, directory); 
     new Thread(producer).start(); 

     for (int i = 1; i <= MAX_THREADS; i++) 
      new Thread(new SolrConsumer(queue, solrUrl)).start(); 
    } 
} 

Producer.java

import java.io.IOException; 
import java.nio.file.*; 
import java.nio.file.attribute.BasicFileAttributes; 
import java.util.concurrent.BlockingQueue; 

public class SolrProducer implements Runnable { 
    private BlockingQueue<String> queue; 
    private String directory; 

    public SolrProducer(BlockingQueue<String> queue, String directory) { 
     this.queue = queue; 
     this.directory = directory; 
    } 

    @Override 
    public void run() { 
     try { 
      Path path = Paths.get(directory); 
      Files.walkFileTree(path, new SimpleFileVisitor<Path>() { 
       @Override 
       public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { 
        if (!attrs.isDirectory()) { 
         try { 
          queue.put(file.toString()); 
         } catch (InterruptedException e) { 
         } 
        } 
        return FileVisitResult.CONTINUE; 
       } 
      }); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

    } 

} 

Consumer.java

import co.talentiq.common.net.SolrManager; 
import org.apache.solr.client.solrj.SolrServerException; 

import java.io.IOException; 
import java.util.concurrent.BlockingQueue; 

public class SolrConsumer implements Runnable { 
    private BlockingQueue<String> queue; 
    private static SolrManager sm; 

    public SolrConsumer(BlockingQueue<String> queue, String url) { 
     this.queue = queue; 
     if (sm == null) 
      this.sm = new SolrManager(url); 
    } 

    @Override 
    public void run() { 
     try { 
      while (true) { 
       String file = queue.take(); 
       sm.indexFile(file); 
      } 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } catch (SolrServerException e) { 
      e.printStackTrace(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

SolrManager.java

import org.apache.solr.client.solrj.SolrClient; 
import org.apache.solr.client.solrj.SolrServerException; 
import org.apache.solr.client.solrj.impl.HttpSolrClient; 
import org.apache.solr.client.solrj.request.AbstractUpdateRequest; 
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; 

import java.io.File; 
import java.io.IOException; 
import java.util.UUID; 

public class SolrManager { 
    private static String urlString; 
    private static SolrClient solr; 

    public SolrManager(String url) { 
     urlString = url; 
     if (solr == null) 
      solr = new HttpSolrClient(url); 
    } 

    public void indexFile(String fileName) throws IOException, SolrServerException { 
     ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract"); 
     String solrId = UUID.randomUUID().toString(); 
     up.addFile(new File(fileName), solrId); 
     up.setParam("literal.id", solrId); 
     up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true); 
     solr.request(up); 
    } 

} 

回答

0

您可以使用up.setCommitWithin(10000);,使Solr的只是犯自動的,至少每十秒鐘。增加值以使Solr每分鐘(60000)或每十分鐘(600000)提交一次。刪除顯式提交(setAction(..))。

另一種選擇是configure autoCommit in your configuration file

您也可以通過將Solr中的HTML提取流程從Solr中移出(並僅提交要編入索引的文本),或者擴展要發佈的服務器數量(集羣中的更多節點) 。

0

我猜你不會在文檔索引時並行搜索索引。所以這裏是你可以做的事情。

  • 您可以在solrconfig.xml中配置自動提交選項。它可以根據文件數/時間間隔來完成。對你來說,文件數量選項會更有意義。

  • 移除對ContentStreamUpdateRequest對象中的setAction()方法的調用。您可以維護對indexFile()方法進行調用的次數。假如它達到25000/10000(基於你的堆,你可以限制計數),那麼對於那個索引調用,你可以使用solr.commit()這樣的SolrClient對象來執行提交。以便爲指定的計數進行一次提交。

讓我知道結果。祝你好運!