2016-09-27 113 views
0

我使用elasticsearch滾動API來返回大量的文件。根據documentation,使用ElasticSearch Scroll API時,如何在原位優化時間參數?

「滾動到期時間刷新每次我們運行一個滾動請求,所以它只需要足夠長的時間來處理當前的一批結果,而不是所有與查詢匹配的文檔。超時非常重要,因爲保持滾動窗口處於打開狀態會消耗資源,並且我們希望在不再需要時立即釋放它們。設置超時將使Elasticsearch在一段時間不活動後自動釋放資源。「

我的問題是如何優化時間參數?我有一些需要處理大約600頁的實例,並且它會在第300頁失敗(很長一段時間!)。我懷疑如果我可以優化傳遞的時間參數,那麼使用ES資源會更有效率,而且不會失敗。此代碼正在羣集中進行測試,但將被移植到其他許多羣集中,因此我希望優化時間參數以適應羣集。另外,我不想在ES集羣上使用比我需要的其他用戶更多的資源,但也可以使用其他用戶。

這是我的想法。在最初的滾動請求中,傳遞一個慷慨的時間參數,例如5m,然後返回結果的第一頁需要多長時間。然後在第二個滾動請求中,我們傳遞一個時間參數,它比第一個請求花費的觀察時間稍大一點。從感應上來說,每個頁面請求的時間會比先前觀察到的頁面完成時間稍長。這假設由於每個頁面都返回相同數量的文檔(在我的情況下幾乎相同大小),因此返回該頁面所需的時間與之前觀察到的大致相同。這個假設是否成立?

是否有更智能的方法來適應時間參數?對於這個問題,尺寸參數(在上面的想法中,尺寸參數保持不變)。

回答

0

好的,我做了一些數據分析,發現了一些經驗。對於許多不同的尺寸,我跑了10-20頁的滾動API查詢。對於一個固定的大小,返回頁面花費的時間大致爲高斯,下面給出了方法。

means = {1000: 6.0284869194030763, 
1500: 7.9487858772277828, 
2000: 12.139444923400879, 
2500: 18.494202852249146, 
3000: 22.169868159294129, 
3500: 28.091009926795959, 
4000: 36.068559408187866, 
5000: 53.229292035102844} 

下我想過,這可能取決於是否有其他疑問正在機器上運行,所以我跑了實驗頁面的一半是從ES唯一的請求和一半,而第二渦旋查詢正在運行。時機似乎沒有改變。

最後,因爲時間取決於給定的ES配置和帶寬等。我提出這個解決方案。

  1. 爲初始頁面設置了大量的頁面時間。
  2. 時間每個頁面
  3. 使用觀察時間+一點點,和初始時間之間的加權移動平均(這樣你的時間參數總是比需要的大一點,但下降到平均+一點點)。這裏有一個例子:

    嘗試= 0 size = 3000 WAIT_TIME = 2 ##慷慨開始時間
    returned_hits = {} ##頁,命中 的名單,同時嘗試< 3: 嘗試: 打印「\ n \ t運行大小=%s的警報滾動查詢...「%(大小) 頁= client.search(指數=指數,DOC_TYPE = DOC_TYPE,身體= Q,滾動= '1米',SEARCH_TYPE = '掃描',大小=)

    sid = page['_scroll_id'] ## scroll id 
        total_hits = page['hits']['total'] ## how many results there are. 
        print "\t\t There are %s hits total." %(total_hits) 
    
        p = 0 ## page count 
        doc_count = 0 ## document count 
        # Start scrolling 
        while (scroll_size > 0): 
         p += 1 
         print "\t\t Scrolling to page %s ..." % p 
         start = time.time() 
         page = client.scroll(scroll_id = sid, scroll = str(wait_time) + 'm') 
         end = time.time() 
    
         ## update wait_time using a weighted running average. 
         wait_time = ((end - start + 10) + float(wait_time * p))/(p+1) 
         print "\t\t Page %s took %s seconds. We change the time to %s" %(p, end - start, wait_time) 
    
         sid = page['_scroll_id'] # Update the scroll ID 
         scroll_size = len(page["hits"]["hits"]) ## no. of hits returned on this page 
    
         print "\t\t Page %s has returned %s hits. Storing .." %(p, scroll_size) 
         returned_hits[p] = page['hits']['hits'] 
    
         doc_count += scroll_size ## update the total count of docs processed 
         print "\t\t Returned and stored %s docs of %s \n" %(doc_count, total_hits) 
    
        tries = 3 ## set tries to three so we exit the while loop! 
    
    except: 
        e = sys.exc_info()[0] 
        print "\t\t ---- Error on try %s\n\t\t size was %s, wait_time was %s min, \n\t\terror message = %s" %(tries , _size, wait_time, e) 
    
        tries += 1 ## increment tries, and do it again until 3 tries. 
        # wait_time *= 2 ## double the time interval for the next go round 
        size = int(.8 * size) ## lower size of docs per shard returned. 
    
        if tries == 3: 
         print "\t\t three strikes and you're out! (failed three times in a row to execute the alert query). Exiting. " 
    
        else: 
         print '\t\t ---- trying again for the %s-th time ...' %(tries + 1)