2016-02-12 237 views
0

我是ElasticSearch的新手,並嘗試使用來自ElasticSearch ConnectionPool [通過Transport class]的併發連接將條目添加到ElasticSearch中的索引。使用elasticsearch-py庫的ElasticSearch ConnectionPool

這裏是我的代碼:

import elasticsearch 
from elasticsearch.transport import Transport 

def init_connection(): 
    transport = Transport([{'host':SERVER_URL}], port=SERVER_PORT, randomize_hosts=False) 
    transport.add_connection(host=SERVER_URL+SERVER_PORT) 
    return transport 

def add_entries_to_es(id, name): 
    transport = init_connection() 
    doc = { 
      'name': name, 
      'postDate': datetime.datetime.now(), 
      'valid': "true", 
      'suggest': { 
       "input": name, 
       'output': name, 
       'payload': {'domain_id': id} 
       } 
      } 
    conn = transport.getConnection() 
    es = elasticsearch.Elasticsearch(connection_class=conn) 
    res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc) 
    ... 

而且我得到下面的錯誤:

File "/my_project/elastichelper.py", line 23, in init_connection 
transport.add_connection(host=SERVER_URL+SERVER_PORT) 
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 139, in add_connection 
self.set_connections(self.hosts) 
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 169, in set_connections 
connections = map(_create_connection, hosts) 
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 161, in _create_connection 
kwargs.update(host) 
ValueError: dictionary update sequence element #0 has length 1; 2 is required 

我不知道,如果是Transport class實例在ElasticSearch一個ConnectionPool的正確途徑。但是,我從文檔中看到Transport class處理各個連接的實例,並創建一個連接池來保存它們。

我沒有得到正確的方式來實例化ConnectionPool並從池中有效地使用連接。閱讀和谷歌搜索沒有對我有利。

我也知道關於helpers.bulk() API,但我很困惑使用它,因爲隨着添加條目到索引,我也刪除無效條目。

回答

0

我發現只需使用ElasticSearch類實例與設置適當的timeout對我來說,timeout=30足夠好]爲index方法處理。就像這樣:

doc = { 
     'name': name, 
     'postDate': datetime.datetime.now(), 
     'valid': "true", 
     'suggest': { 
      "input": name, 
      'output': name, 
      'payload': {'domain_id': id} 
      } 
     } 
es = elasticsearch.Elasticsearch() 
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc, timeout=30) 

我最初面臨timeout問題用簡單的ElasticSearch類的實例,這是由固定的上述變化。

我不需要明確地使用TransportConnection類實例。