I have a directory containing 200 million HTML files (don't look at me, I didn't create this mess, I just have to deal with it). I need to index every HTML file in that directory. I've been reading guides on getting this done, and I have something running now. After about an hour, I have roughly 100k documents indexed, which means this is going to take about 85 days. How do I bulk-index HTML files with Solr Cell?
I'm indexing these files into a standalone Solr server running on a c4.8xlarge AWS EC2 instance. Here is the output of free -m on the server running Solr, captured while my indexing code was running:
             total       used       free     shared    buffers     cached
Mem:         60387      12981      47405          0         19       4732
-/+ buffers/cache:       8229      52157
Swap:            0          0          0
As you can see, I'm doing fairly well on resources. I increased maxWarmingSearchers to 200 in my Solr config, because I was getting this error:
Exceeded limit of maxWarmingSearchers=2, try again later
Okay, but I don't think raising that limit was really the right move. I believe the problem is that I'm issuing a commit for every single file, when I should be committing in batches (say, 50k files per commit). I'm just not entirely sure how to adapt this code to do that, since every example I've seen handles one file at a time. I really need to make this run as fast as I possibly can, because I don't have 85 days to spare to get the data into Solr.
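From the SolrJ javadoc, it looks like AbstractUpdateRequest has a setCommitWithin(int) that could replace the explicit commit, letting Solr fold documents into commits on its own schedule. This is roughly what I'm imagining for indexFile() in my SolrManager class below; the 300000 ms window is just a guess on my part:

public void indexFile(String fileName) throws IOException, SolrServerException {
    ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract");
    String solrId = UUID.randomUUID().toString();
    up.addFile(new File(fileName), "text/html");
    up.setParam("literal.id", solrId);
    // no ACTION.COMMIT here: ask Solr to commit within 5 minutes instead
    up.setCommitWithin(300000);
    solr.request(up);
}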
Here is my current code:
Index.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Index {
    public static void main(String[] args) {
        String directory = "/opt/html";
        String solrUrl = "URL";
        final int QUEUE_SIZE = 250000;
        final int MAX_THREADS = 300;
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
        SolrProducer producer = new SolrProducer(queue, directory);
        new Thread(producer).start();
        for (int i = 1; i <= MAX_THREADS; i++)
            new Thread(new SolrConsumer(queue, solrUrl)).start();
    }
}
SolrProducer.java
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;

public class SolrProducer implements Runnable {
    private BlockingQueue<String> queue;
    private String directory;

    public SolrProducer(BlockingQueue<String> queue, String directory) {
        this.queue = queue;
        this.directory = directory;
    }

    @Override
    public void run() {
        try {
            Path path = Paths.get(directory);
            Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    if (!attrs.isDirectory()) {
                        try {
                            queue.put(file.toString());
                        } catch (InterruptedException e) {
                            // restore the interrupt flag and stop walking instead of swallowing it
                            Thread.currentThread().interrupt();
                            return FileVisitResult.TERMINATE;
                        }
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
SolrConsumer.java
import co.talentiq.common.net.SolrManager;
import org.apache.solr.client.solrj.SolrServerException;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;

public class SolrConsumer implements Runnable {
    private BlockingQueue<String> queue;
    private static SolrManager sm;

    public SolrConsumer(BlockingQueue<String> queue, String url) {
        this.queue = queue;
        if (sm == null)
            sm = new SolrManager(url);   // static field, so no "this." here
    }

    @Override
    public void run() {
        try {
            while (true) {
                String file = queue.take();
                sm.indexFile(file);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (SolrServerException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
SolrManager.java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import java.io.File;
import java.io.IOException;
import java.util.UUID;

public class SolrManager {
    private static String urlString;
    private static SolrClient solr;

    public SolrManager(String url) {
        urlString = url;
        if (solr == null)
            solr = new HttpSolrClient(url);
    }

    public void indexFile(String fileName) throws IOException, SolrServerException {
        ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract");
        String solrId = UUID.randomUUID().toString();
        // second argument of addFile() is the content type, not the document id
        up.addFile(new File(fileName), "text/html");
        up.setParam("literal.id", solrId);
        // this commits on every single document, which is what I suspect is killing throughput
        up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
        solr.request(up);
    }
}
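Alternatively, I've been thinking about keeping explicit commits but issuing them every N documents from the consumer side. This is just a sketch of the idea; BatchingSolrConsumer, BATCH_SIZE, and the commit() helper (a thin wrapper around SolrClient.commit()) are names I made up, and it assumes indexFile() no longer sets ACTION.COMMIT:

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrServerException;

public class BatchingSolrConsumer implements Runnable {
    private static final int BATCH_SIZE = 50000;            // hypothetical: commit every 50k docs
    private static final AtomicLong indexed = new AtomicLong();
    private final BlockingQueue<String> queue;
    private final SolrManager sm;

    public BatchingSolrConsumer(BlockingQueue<String> queue, SolrManager sm) {
        this.queue = queue;
        this.sm = sm;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String file = queue.take();
                sm.indexFile(file);                          // assumes no per-document commit inside
                // exactly one thread hits each multiple of BATCH_SIZE and triggers the commit
                if (indexed.incrementAndGet() % BATCH_SIZE == 0) {
                    sm.commit();                             // hypothetical helper calling solr.commit()
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (SolrServerException | IOException e) {
            e.printStackTrace();
        }
    }
}

I'm not sure whether explicit commits every 50k documents would still trip the maxWarmingSearchers limit with 300 threads running, though.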