2017-10-16 78 views
11

我試圖並行for循環(previously asked here)一個尷尬的並行,並且適合我的參數定居在this implementation爲什麼我沒有看到通過Python中的多處理加速?

with Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc, 
              total_test_samples, num_classes, num_features, label_set, 
              method_names, pos_class_index, out_results_dir, exhaustive_search]) 
     partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs) 

     with Pool(processes=num_procs) as pool: 
      cv_results = pool.map(partial_func_holdout, range(num_repetitions)) 

我之所以需要在使用proxy object(進程間共享)是第一要素共享代理列表datasets這是一個大型對象(每個約200-300MB)的列表。這個datasets列表通常有5-25個元素。我通常需要在HPC羣集上運行此程序。

這是一個問題,當我使用32個進程和50GB的內存(num_repetitions = 200,數據集是10個對象的列表,每個250MB)運行此程序時,我甚至沒有看到加速因子達到16 (有32個並行進程)。我不明白爲什麼 - 任何線索?任何明顯的錯誤,或不好的選擇?我在哪裏可以改進這種實施?任何替代品?

我確信這已經在之前討論過了,原因可以是多種多樣的,並且非常具體實施 - 因此我要求您爲我提供2美分。謝謝。

更新:我做了一些與cProfile分析得到一個更好的主意 - 這裏有一些信息,按累計時間排序。

In [19]: p.sort_stats('cumulative').print_stats(50) 
Mon Oct 16 16:43:59 2017 profiling_log.txt 

     555404 function calls (543552 primitive calls) in 662.201 seconds 

    Ordered by: cumulative time 
    List reduced from 4510 to 50 due to restriction <50> 

    ncalls tottime percall cumtime percall filename:lineno(function) 
    897/1 0.044 0.000 662.202 662.202 {built-in method builtins.exec} 
     1 0.000 0.000 662.202 662.202 test_rhst.py:2(<module>) 
     1 0.001 0.001 661.341 661.341 test_rhst.py:70(test_chance_classifier_binary) 
     1 0.000 0.000 661.336 661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run) 
     4 0.000 0.000 661.233 165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait) 
     4 0.000 0.000 661.233 165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait) 
     23 661.233 28.749 661.233 28.749 {method 'acquire' of '_thread.lock' objects} 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map) 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get) 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait) 
    866/8 0.004 0.000 0.868 0.108 <frozen importlib._bootstrap>:958(_find_and_load) 
    866/8 0.003 0.000 0.867 0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked) 
    720/8 0.003 0.000 0.865 0.108 <frozen importlib._bootstrap>:641(_load_unlocked) 
    596/8 0.002 0.000 0.865 0.108 <frozen importlib._bootstrap_external>:672(exec_module) 
    1017/8 0.001 0.000 0.863 0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed) 
    522/51 0.001 0.000 0.765 0.015 {built-in method builtins.__import__} 

的剖析信息現在time

In [20]: p.sort_stats('time').print_stats(20) 
Mon Oct 16 16:43:59 2017 profiling_log.txt 

     555404 function calls (543552 primitive calls) in 662.201 seconds 

    Ordered by: internal time 
    List reduced from 4510 to 20 due to restriction <20> 

    ncalls tottime percall cumtime percall filename:lineno(function) 
     23 661.233 28.749 661.233 28.749 {method 'acquire' of '_thread.lock' objects} 
    115/80 0.177 0.002 0.211 0.003 {built-in method _imp.create_dynamic} 
     595 0.072 0.000 0.072 0.000 {built-in method marshal.loads} 
     1 0.045 0.045 0.045 0.045 {method 'acquire' of '_multiprocessing.SemLock' objects} 
    897/1 0.044 0.000 662.202 662.202 {built-in method builtins.exec} 
     3 0.042 0.014 0.042 0.014 {method 'read' of '_io.BufferedReader' objects} 
2037/1974 0.037 0.000 0.082 0.000 {built-in method builtins.__build_class__} 
     286 0.022 0.000 0.061 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat) 
    2886 0.021 0.000 0.021 0.000 {built-in method posix.stat} 
     79 0.016 0.000 0.016 0.000 {built-in method posix.read} 
     597 0.013 0.000 0.021 0.000 <frozen importlib._bootstrap_external>:830(get_data) 
     276 0.011 0.000 0.013 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset) 
     108 0.011 0.000 0.038 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser) 
    1225 0.011 0.000 0.050 0.000 <frozen importlib._bootstrap_external>:1233(find_spec) 
    7179 0.009 0.000 0.009 0.000 {method 'splitlines' of 'str' objects} 
     33 0.008 0.000 0.008 0.000 {built-in method posix.waitpid} 
     283 0.008 0.000 0.015 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines) 
     3 0.008 0.003 0.008 0.003 {method 'poll' of 'select.poll' objects} 
    7178 0.008 0.000 0.008 0.000 {method 'expandtabs' of 'str' objects} 
     597 0.007 0.000 0.007 0.000 {method 'read' of '_io.FileIO' objects} 

更多分類剖析信息由percall信息分類: profiling info sorted by percall

更新2

元素在大名單datasets我提到以前版本通常不會那麼大 - 它們通常是每個10-25MB。但取決於所使用的浮點精度,樣本數量和功能,這也可以輕鬆地增長到每個元素500MB-1GB。因此我更喜歡可以擴展的解決方案。

更新3:

內部holdout_trial_compare_datasets代碼使用的方法GridSearchCV scikit學習,其內部使用JOBLIB庫如果我們設置n_jobs> 1(或每當我們甚至設置)。這可能會導致多處理和joblib之間的一些不良交互。所以嘗試另一個配置,我根本沒有設置n_jobs(這應該在scikit-learn中默認沒有並行性)。會及時向大家發佈。

+0

你做任何分析? – georgexsh

+0

還沒有,因爲我想測試它的參數(16-32個過程,有10-15個數據集)需要我在羣集上運行它,我不知道如何在命令行上配置python程序。我會盡快研究它。 –

+0

我的2¢:如果你的大數據對象只從父對象傳遞給子對象,'Manager'看起來有點矯枉過正,你可以把它加載到父對象中的一個全局變量中,然後它將在'fork()'之後與子對象共享。 – georgexsh

回答

0
{method 'acquire' of '_thread.lock' objects} 

看看你的分析器輸出我會說共享對象鎖定/解鎖開銷壓倒多線程的速度收益。

重構使得工作不再需要彼此交談。

具體來說,如果可能的話,每個數據堆得到一個答案,然後對累計結果進行操作。

這就是爲什麼隊列看起來如此之快:它們涉及一種工作類型,不需要必須「管理」並鎖定/解鎖的對象。

只能'管理'絕對需要在進程之間共享的內容。你管理列表中包含了一些非常複雜的對象,來...

更快的範例是:

allwork = manager.list([a, b,c]) 
theresult = manager.list() 

然後

while mywork: 
    unitofwork = allwork.pop() 
    theresult = myfunction(unitofwork) 
+0

感謝您的建議和回覆。事實上,不同進程中的工作不需要彼此交談,即一旦他們訪問共享數據列表'shared_inputs',工作單元內的所有內容都是獨立於他人的。 –

+0

我不確定我的實施建議與您的建議有何不同。你可以試着詳細闡述一下,或者寫一個更加詳細的實現,可以替換頂部顯示的內容來替換它。那太棒了,謝謝。 –

0

如果你並不需要一個複雜的共享對象,則僅使用可以想象的最簡單對象的列表。

然後告訴工人獲取他們可以在自己的小世界中處理的複雜數據。

嘗試:

allwork = manager.list([datasetid1, datasetid2 ,...]) 
theresult = manager.list() 

while mywork: 
    unitofworkid = allwork.pop() 
    theresult = myfunction(unitofworkid) 

def myfunction(unitofworkid): 
    thework = acquiredataset(unitofworkid) 
    result = holdout_trial_compare_datasets(thework, ...) 

我希望這是有道理的。在這個方向上重構應該不需要太多時間。你應該可以看到,當你配置文件時,{方法'獲取''_thread.lock'對象}數字就像一塊石頭一樣掉落。

+0

謝謝,我需要考慮一下。這會在我的情況下導致巨大的I/O - 但需要考慮與實際的並行計算相比,它是否只是非常小的一部分時間。 –

2

基於在評論中討論,我做了一個小實驗,比較了三個版本實現的:

  • V1:基本上是一樣的做法,實際上,作爲partial(f1, *shared_inputs)將解壓proxy_manager.list立即Manager.List不涉及到這裏,數據傳遞給內部隊列爲Pool的工作人員。
  • v2:v2使用Manager.List,工作函數將收到一個ListProxy對象,它通過內部連接將共享數據提取到服務器進程。
  • v3:子進程從父進程共享數據,利用fork(2)系統調用。

def f1(*args): 
    for e in args[0]: pow(e, 2) 

def f2(*args): 
    for e in args[0][0]: pow(e, 2) 

def f3(n): 
    for i in datasets: pow(i, 2) 

def v1(np): 
    with mp.Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets,]) 
     pf = partial(f1, *shared_inputs) 
     with mp.Pool(processes=np) as pool: 
      r = pool.map(pf, range(16)) 

def v2(np): 
    with mp.Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets,]) 
     pf = partial(f2, shared_inputs) 
     with mp.Pool(processes=np) as pool: 
      r = pool.map(pf, range(16)) 

def v3(np): 
    with mp.Pool(processes=np) as pool: 
     r = pool.map(f3, range(16)) 

datasets = [2.0 for _ in range(10 * 1000 * 1000)] 
for f in (v1, v2, v3): 
    print(f.__code__.co_name) 
    for np in (2, 4, 8, 16): 
     s = time() 
     f(np) 
     print("%s %.2fs" % (np, time()-s)) 

結果採取一個16核心E5-2682 VPC,顯而易見的是,V3能更好地伸縮:result

+0

感謝喬治 - 真的幫助我更接近地理解問題。 我真的好奇* * shared_inputs'立即解包 - 我沒有意識到會發生這種情況。當我們應用'functools.partial'也會發生類似的事情,因爲它會嘗試凍結輸入和簽名,對嗎?所以無論是否打開包裝,數據都必須通過酸洗傳遞給子進程,對吧?那意味着,唯一我們可以避免的是將它們用作全局變量並從子進程訪問它們?這種方法有什麼危險?在子進程中不會改變輸入。 –

+0

as'* shared_inputs'在'partial'生效之前解壓縮列表,它將(代理)列表包含的對象「凍結」,而不是代理本身,v2它是使用帶有部分「Manager.List」的示例;是的數據得到泡菜/ unpickle傳遞;在你的問題和工作負載的情況下,沒有什麼可擔心的,如果你傳遞一些與外部資源相關的東西,比如套接字對象,文件描述符,你需要更加小心。 – georgexsh

+0

感謝George的幫助 - 我覺得我們需要更好地確定瓶頸。尤其是對於大對象(10MB 20MB)和複雜的計算(比平方數格要求更嚴格的事情,因爲這可能會引發內部記憶)..你的意見是非常有幫助的,我希望能夠指出這一點。 –

相關問題