Massively parallel search operation with Dask, distributed

While testing auto-scaling Dask Distributed implementations on Kubernetes and AWS, I created a demo problem, and I'm not sure I'm tackling it correctly. My scenario: given the md5 hash of a string (representing a password), find the original string. I've hit three main problems.
A) The parameter space is huge: trying to create a dask bag with 2.8211099e+12 members caused memory problems (hence the 'explode' function you'll see in the sample code below).

B) Exiting cleanly on early discovery. I think take(1, npartitions=-1) achieves this, but I'm not sure. Originally I raised an exception, raise Exception("%s is your answer" % test_str), which worked but felt "dirty".

C) Given that this is long-running and workers or AWS boxes sometimes die, what is the best way to store progress?
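For (B), one common pattern is to submit each partition as its own future and stop as soon as any future completes with a hit, cancelling the rest. Here is a minimal sketch of that shape using the stdlib concurrent.futures API rather than Dask itself (dask.distributed exposes the same pattern via client.submit, distributed.as_completed and Future.cancel); the 4-digit keyspace and chunk sizes are illustrative only:

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed

hashed_pw = hashlib.md5('0812'.encode('utf-8')).hexdigest()

def search_chunk(candidates):
    """Return the matching candidate from this chunk, or None."""
    for c in candidates:
        if hashlib.md5(c.encode('utf-8')).hexdigest() == hashed_pw:
            return c
    return None

# Split the 4-digit search space into 10 chunks of 1000 candidates each.
chunks = [['%04d' % i for i in range(start, start + 1000)]
          for start in range(0, 10000, 1000)]

answer = None
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(search_chunk, chunk) for chunk in chunks]
    for fut in as_completed(futures):
        result = fut.result()
        if result is not None:
            answer = result
            # Cancel work that has not started yet; running tasks finish out.
            for other in futures:
                other.cancel()
            break

print(answer)  # '0812'
```

This avoids raising an exception to escape the computation: the driver simply stops consuming results and cancels the outstanding work.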
Example code:
import distributed
import math
import dask.bag as db
import hashlib
import dask
import os

if os.environ.get('SCHED_URL', False):
    sched_url = os.environ['SCHED_URL']
    client = distributed.Client(sched_url)
    versions = client.get_versions(True)
    dask.set_options(get=client.get)
difficulty = 'easy'

settings = {
    'hard': (hashlib.md5('welcome1'.encode('utf-8')).hexdigest(), 'abcdefghijklmnopqrstuvwxyz1234567890', 8),
    'mid-hard': (hashlib.md5('032abgh'.encode('utf-8')).hexdigest(), 'abcdefghijklmnop1234567890', 7),
    'mid': (hashlib.md5('b08acd'.encode('utf-8')).hexdigest(), 'abcdef', 6),
    'easy': (hashlib.md5('0812'.encode('utf-8')).hexdigest(), '0123456789', 4)
}

hashed_pw, keyspace, max_guess_length = settings[difficulty]
def is_pw(guess):
    return hashlib.md5(guess.encode('utf-8')).hexdigest() == hashed_pw

def guess(n):
    guess = ''
    size = len(keyspace)
    while n > 0:
        n -= 1
        guess += keyspace[n % size]
        n = math.floor(n / size)
    return guess
def make_exploder(num_partitions, max_val):
    """Creates a function that maps an int to a range, based on the maximum
    value aimed for and the number of partitions that are expected.
    Used in this code with map and flatten to take a short list,
    i.e. 1->1e6, to a large one, 1->1e20, in dask rather than on the host machine."""
    steps = math.ceil(max_val / num_partitions)

    def explode(partition):
        return range(partition * steps, partition * steps + steps)
    return explode
max_val = len(keyspace) ** max_guess_length  # How many possible password permutations
partitions = math.floor(max_val / 100)
partitions = partitions if partitions < 100000 else 100000  # split into a maximum of 100,000 partitions. Too many partitions caused issues, memory I think.
exploder = make_exploder(partitions, max_val)  # Sort of the opposite of a reduce. make_exploder(10, 100)(3) => [30, 31, ..., 39]. Expands the problem back into the full problem space.

print("max val: %s, partitions: %s" % (max_val, partitions))

search = db.from_sequence(range(partitions), npartitions=partitions).map(exploder).flatten().filter(lambda i: i <= max_val).map(guess).filter(is_pw)

search.take(1, npartitions=-1)
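As a quick sanity check of the explode trick above, the function can be exercised on its own: each partition index expands to its own slice of the index space on a worker, so the full 0..max_val sequence is never materialised on the driver.

```python
import math

def make_exploder(num_partitions, max_val):
    """Map a partition index to its slice of 0..max_val (as in the code above)."""
    steps = math.ceil(max_val / num_partitions)

    def explode(partition):
        return range(partition * steps, partition * steps + steps)
    return explode

explode = make_exploder(10, 100)
print(list(explode(3)))  # [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
```

The final partition can overshoot max_val, which is why the pipeline filters with `i <= max_val` after flattening.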
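For (C), one simple approach is to record each fully searched partition index durably and skip completed partitions on restart. A stdlib-only sketch of the idea, where the checkpoint file location and JSON format are placeholders (a real run on AWS would want shared storage such as S3 rather than a local temp dir):

```python
import json
import os
import tempfile

# Illustrative checkpoint location; fresh temp dir so the demo is repeatable.
checkpoint_path = os.path.join(tempfile.mkdtemp(), 'search_progress.json')

def load_done():
    """Partition indices already searched in a previous run."""
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            return set(json.load(f))
    return set()

def mark_done(done, partition):
    """Durably record that `partition` has been fully searched."""
    done.add(partition)
    with open(checkpoint_path, 'w') as f:
        json.dump(sorted(done), f)

done = load_done()
for partition in range(10):  # stands in for range(partitions)
    if partition in done:
        continue             # already covered before a crash/restart
    # ... search this partition (explode -> guess -> is_pw) here ...
    mark_done(done, partition)
```

If a worker or box dies mid-partition, the worst case is re-searching that one partition, which bounds the lost work to a single partition's keyspace slice.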
I find that 'easy' works well locally and 'mid-hard' works well on our 6-to-8 * m4.2xlarge AWS cluster, but so far 'hard' hasn't worked.