2015-02-05 65 views
1

我正嘗試使用Python API將大量文檔批量插入到彈性搜索中。如何在使用Python API的彈性搜索中進行批量索引

import elasticsearch 
from pymongo import MongoClient 

es = elasticsearch.Elasticsearch() 

def index_collection(db, collection, fields, host='localhost', port=27017): 
    conn = MongoClient(host, port) 
    coll = conn[db][collection] 
    cursor = coll.find({}, fields=fields, timeout=False) 
    print "Starting Bulk index of {} documents".format(cursor.count()) 

    def action_gen(): 
     """ 
     Generator to use for bulk inserts 
     """ 
     for n, doc in enumerate(cursor): 

      op_dict = { 
       '_index': db.lower(), 
       '_type': collection, 
       '_id': int('0x' + str(doc['_id']), 16), 
      } 
      doc.pop('_id') 
      op_dict['_source'] = doc 
      yield op_dict 

    res = bulk(es, action_gen(), stats_only=True) 
    print res 

的文件來自MongoDB的收集和我會哄上面的函數根據該文檔的方式解釋來做大量的索引。

批量索引繼續填充數千個空文檔的彈性搜索。誰能告訴我我做錯了什麼?

+0

你的索引已經存在於ES嗎?如果是這樣,是否有爲其定義的映射(是否所有可能的來自Mongo的文檔都不適合映射)? – rchang 2015-02-05 23:37:14

+0

你的代碼適合我。也許你的bug是數據特定的。謹慎舉一個最小的例子? – thorwhalen 2015-03-23 16:03:53

回答

2

我從來沒有見過把這些數據放在一起,特別是你在用"_source"做什麼。可能有辦法讓這個工作,我不知道副手,但是當我嘗試它時,我得到了奇怪的結果。

如果你看看bulk api,ES正期望一個元數據文檔,然後是要索引的文檔。因此,您需要在批量數據列表中爲每個文檔輸入兩個條目。所以可能是這樣的:

import elasticsearch 
from pymongo import MongoClient 

es = elasticsearch.Elasticsearch() 

def index_collection(db, collection, fields, host='localhost', port=27017): 
    conn = MongoClient(host, port) 
    coll = conn[db][collection] 
    cursor = coll.find({}, fields=fields, timeout=False) 
    print "Starting Bulk index of {} documents".format(cursor.count()) 

    bulk_data = [] 

    for n, doc in enumerate(cursor): 

     bulk_data.append({ 
      '_index': db.lower(), 
      '_type': collection, 
      '_id': int('0x' + str(doc['_id']), 16), 
     }) 
     bulk_data.append(doc) 

    es.bulk(index=index_name,body=bulk_data,refresh=True) 

雖然我沒有嘗試運行該代碼。這裏是我知道的一個腳本,如果有幫助,你可以玩:

from elasticsearch import Elasticsearch 

es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }]) 

index_name = "test_index" 

if es_client.indices.exists(index_name): 
    print("deleting '%s' index..." % (index_name)) 
    print(es_client.indices.delete(index = index_name, ignore=[400, 404])) 

print("creating '%s' index..." % (index_name)) 
print(es_client.indices.create(index = index_name)) 

bulk_data = [] 

for i in range(4): 
    bulk_data.append({ 
     "index": { 
      "_index": index_name, 
      "_type": 'doc', 
      "_id": i 
     } 
    }) 
    bulk_data.append({ "idx": i }) 

print("bulk indexing...") 
res = es_client.bulk(index=index_name,body=bulk_data,refresh=True) 
print(res) 

print("results:") 
for doc in es_client.search(index=index_name)['hits']['hits']: 
    print(doc) 
+0

它可能工作,但對我的用例,我不想將所有數據加載到內存中的列表中,然後我索引它,因此生成器函數。這些文檔讓我看到了我發佈的代碼:https://elasticsearch-py.readthedocs.org/en/master/helpers.html#helpers – fccoelho 2015-02-06 00:28:13

+0

@SloanAhrens OP正在使用'bulk'的幫助器版本,它需要一些東西不同於標準批量API。 – rchang 2015-02-06 02:16:09