就像標題所說...如何在ElasticSearch通過Java API重新索引
我看了這篇文章(https://www.elastic.co/blog/changing-mapping-with-zero-downtime),這一概念是偉大的,但我在努力尋找如何通過JAVA API做體面的參考。
我發現這個插件:https://github.com/karussell/elasticsearch-reindex,但似乎像什麼,我試圖做
就像標題所說...如何在ElasticSearch通過Java API重新索引
我看了這篇文章(https://www.elastic.co/blog/changing-mapping-with-zero-downtime),這一概念是偉大的,但我在努力尋找如何通過JAVA API做體面的參考。
我發現這個插件:https://github.com/karussell/elasticsearch-reindex,但似乎像什麼,我試圖做
在當地的星巴克在這裏經過一番研究,矯枉過正是我想出了:
假設我們有我們的索引已經存在(「old_index」)並且它有數據...現在讓我們將這些數據移動到我們創建的新索引(「new_index」)(可能對於某個字段使用不同的模式STRING和INT,或者現在您決定你不再希望分析或存儲某個領域等)。
這裏的基本思路是檢索已存在索引(「old_index」)中的所有數據並將其攝入新索引(「new_index」)。然而,也有你所要做的幾件事情:
步驟1.您需要執行搜索滾動 https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
所做的只是結果,更有效地檢索VS常規搜索。沒有評分等。這裏是文檔必須說的:「滾動不是用於實時用戶請求,而是用於處理大量數據,例如爲了將一個索引的內容重新索引到新索引中採用不同的配置。「
這裏是Java API的一個鏈接,如何使用它:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html
第2步:在做刀片,你必須使用批量採集。再一次,這是出於性能原因完成的。這裏是一個鏈接到批量攝取的Java API:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor
現在到浩真正做到這一點...
步驟1.設置滾動搜索那會「加載」數據從舊索引
SearchResponse scrollResp = client.prepareSearch("old_index") // Specify index
.setSearchType(SearchType.SCAN)
.setScroll(new TimeValue(60000))
.setQuery(QueryBuilders.matchAllQuery()) // Match all query
.setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll
第2步。設置批量處理器。
int BULK_ACTIONS_THRESHOLD = 1000;
int BULK_CONCURRENT_REQUESTS = 1;
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Bulk Going to execute new bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("Executed bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.warn("Error executing bulk", failure);
}
}).setBulkActions(BULK_ACTIONS_THRESHOLD).setConcurrentRequests(BULK_CONCURRENT_REQUESTS).setFlushInterval(TimeValue.timeValueMillis(5)).build();
第三步:從通過創建滾動搜索在步驟1中舊的索引讀取,直到有莫記錄左,插入新的索引
//Scroll until no hits are returned
while (true) {
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
//Break condition: No hits are returned
if (scrollResp.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break;
}
// Get results from a scan search and add it to bulk ingest
for (SearchHit hit: scrollResp.getHits()) {
IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
Map source = ((Map) ((Map) hit.getSource()));
request.source(source);
bulkProcessor.add(request);
}
}
第4步:現在是時候來分配現有的別名,指向舊索引,新索引。然後刪除對舊索引的別名引用,然後刪除舊索引本身。要了解如何確定被分配到已有老指數看到這個帖子該別名:ElasticSeach JAVA API to find aliases given index
從舊索引指定別名,以新的索引
client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();
刪除別名,然後刪除舊索引
client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();
自ES 2.0以來,您可以使用reindex API。 由於是如何與這裏的Java API這樣做沒有文件的步驟是:
:
client = TransportClient.builder().settings(elaSettings).addPlugin(ReindexPlugin.class).build();
ReindexRequestBuilder builder = ReindexAction.INSTANCE.newRequestBuilder(client).source(oldIndex).destination(newIndex); builder.destination().setOpType(opType); builder.abortOnVersionConflict(false); builder.get();
如果使用Jest,可以使用Reindex.Builder(io.searchbox.indices.reindex.Reindex)。截至本文最新的Jest 5.3.2已經發布。
downvote對於我試圖分享和幫助?當真? – krinker