2017-08-31 91 views
0

我在Kubernetes和AWS和I上測試自動縮放Dask分佈式實現時創建了一個演示問題我不確定我是否正確解決了該問題。使用Dask進行大規模並行搜索操作,分佈式

我的場景是一個字符串(表示密碼)的md5散列找到原始字符串。我遇到了三個主要問題。

A)參數空間很大,試圖用2.8211099e + 12個成員創建一個dask包導致了內存問題(因此您將在下面的示例代碼中看到'explode'函數)。 B)在早期發現時清理出口。我認爲使用take(1, npartitions=-1)會實現這一點,但我不確定。本來我提出了一個例外raise Exception("%s is your answer' % test_str),但工作,但感覺「髒」

C)鑑於這是長時間運行,有時工人或AWS盒死亡,最好如何存儲進度?

示例代碼:

import distributed 
import math 
import dask.bag as db 
import hashlib 
import dask 
import os 

if os.environ.get('SCHED_URL', False): 
    sched_url = os.environ['SCHED_URL'] 
    client = distributed.Client(sched_url) 
    versions = client.get_versions(True) 
    dask.set_options(get=client.get) 

difficulty = 'easy' 

settings = { 
    'hard': (hashlib.md5('welcome1'.encode('utf-8')).hexdigest(),'abcdefghijklmnopqrstuvwxyz1234567890', 8), 
    'mid-hard': (hashlib.md5('032abgh'.encode('utf-8')).hexdigest(),'abcdefghijklmnop1234567890', 7), 
    'mid': (hashlib.md5('b08acd'.encode('utf-8')).hexdigest(),'abcdef', 6), 
    'easy': (hashlib.md5('0812'.encode('utf-8')).hexdigest(),'', 4) 
} 

hashed_pw, keyspace, max_guess_length = settings[difficulty] 

def is_pw(guess): 
    return hashlib.md5(guess.encode('utf-8')).hexdigest() == hashed_pw 

def guess(n): 
    guess = '' 
    size = len(keyspace) 
    while n>0 : 
     n -= 1 
     guess += keyspace[n % size]; 
     n = math.floor(n/size); 
    return guess 

def make_exploder(num_partitions, max_val): 
    """Creates a function that maps a int to a range based on the number maximum value aimed for 
     and the number of partitions that are expected. 
     Used in this code used with map and flattent to take a short list 
     i.e 1->1e6 to a large one 1->1e20 in dask rather than on the host machine.""" 
    steps = math.ceil(max_val/num_partitions) 
    def explode(partition): 
     return range(partition * steps, partition * steps + steps) 
    return explode 


max_val = len(keyspace) ** max_guess_length # How many possiable password permutation 
partitions = math.floor(max_val/100) 
partitions = partitions if partitions < 100000 else 100000 # split in to a maximum of 10000 partitions. Too many partitions caused issues, memory I think. 
exploder = make_exploder(partitions, max_val) # Sort of the opposite of a reduce. make_exploder(10, 100)(3) => [30, 31, ..., 39]. Expands the problem back in to the full problem space. 

print("max val: %s, partitions:%s" % (max_val, partitions)) 

search = db.from_sequence(range(partitions), npartitions=partitions).map(exploder).flatten().filter(lambda i: i <= max_val).map(guess).filter(is_pw) 

search.take(1,npartitions=-1) 

我覺得 '輕鬆' 效果很好本地, '中硬' 效果很好我們6至8 * m4.2xlarge AWS集羣上。但到目前爲止還沒有hard工作。

回答

2

A)參數空間很大並且試圖用2.8211099e + 12個成員創建一個dask包導致了內存問題(因此您會在下面的示例代碼中看到'explode'函數)。

這很大程度上取決於如何將您的元素放入包中。如果每個元素都在自己的分區,那麼是的,這肯定會殺死所有的東西。 1e12分區非常昂貴。我建議保持數千或數萬個分區的數量。

B)早期發現清理出口。我認爲使用take(1,npartitions = -1)會實現這一點,但我不確定。本來我引發異常引發異常(「%s是你的答案」%test_str),它的工作,但覺得‘髒’

如果你想這個話,我建議不要使用dask.bag,而是使用concurrent.futures interface和特別是as_completed迭代器。

C)鑑於這是長時間運行,有時工人或AWS框死了,怎麼會是最好保存進度?

DASK應該是彈性的,以這個爲長因爲你可以保證調度uler倖存下來。如果您使用併發期貨界面而不是dask bag,那麼您還可以跟蹤客戶端進程的中間結果。