2013-04-23 48 views
2

我想在它們到達時迭代ipython並行映射的一些異步結果。我能找到的唯一方法是迭代結果對象。但是,如果其中一項任務引發異常,則迭代終止。有沒有辦法做到這一點?請參閱下面的代碼,迭代在第二個作業引發異常時終止。在等待下一個ipython並行映射結果時處理異常

from IPython import parallel 

def throw_even(i): 
    if i % 2 == 0: 
     raise RuntimeError('ERROR: %d' % i) 
    return i 

rc = parallel.Client() 
lview = rc.load_balanced_view() # default load-balanced view 

# map onto the engines. 
args = range(1, 5) 
print args 
async_results = lview.map_async(throw_even, range(1, 5), ordered=True) 

# get results 
args_iter = iter(args) 
results_iter = iter(async_results) 
while True: 
    try: 
     arg = args_iter.next() 
     result = results_iter.next() 
     print 'Job %s completed: %d' % (arg, result)    
    except StopIteration: 
     print 'Finished iteration' 
     break 
    except Exception as e: 
     print '%s: Job %d: %s' % (type(e), arg, e) 

給出了下面的輸出作業3之前停止,4報告

[1, 2, 3, 4] 
Job 1 completed: 1 
<class 'IPython.parallel.error.RemoteError'>: Job 2: RuntimeError(ERROR: 2) 
Finished iteration 

是否有某種方式來做到這一點?

+0

我意識到,地圖習語不是一個合適的方式來做到這一點。我最好使用lview.apply並單獨處理每個結果。 – John 2013-04-24 09:13:10

回答

0

這個question可能是相關的。不過,我不明白爲什麼要從遠程引擎中拋出異常。雖然,如果你確實想這樣做,我認爲你可以用我回答提到的問題的方式來做到這一點。我現在看到你已經在你的評論中意識到了這一點,但無論如何這應該做到這一點。

def throw_even(i): 
    if i%2: 
     return i 
    raise(RuntimeError('Error %d'%i) 

params = range(1,5) 

n_cores = len(c.ids) 
for n,p in enumerate(params): 
    core = c.ids[n%n_cores] 
    calls.append(c[core].apply_async(throw_even, p)) 

#then you get the results 

while calls != []: 
    for c in calls: 
     try: 
      result = c.get(1e-3) 
      print(result[0]) 
      calls.remove(c) 
      #in the case your call failed, you can apply_async again. 
      # and append the call to calls. 
     except parallel.TimeoutError: 
      pass 
     except Exception as e: 
      knock_yourself_out(e) 
+1

並不總是你要在遠程引擎上引發異常,這是因爲你的代碼/數據發現了新的和有趣的方式來打破遠程引擎;)當你無法取回500多個結果時,這非常煩人因爲他們中有7人有棘手的數據。 – tacaswell 2013-11-15 04:21:57

+0

當然,爲每個參數創建一個不同的視圖應該保持異常封裝。 – 2013-11-18 10:06:53

0

解決這個偷偷摸摸的是達到到內部的AsyncMapResult的搶_result這是一個結果列表。這不直接幫助你,但只是事後:

tt = async_results._results 
fail_indx = [j for j, r in enumerate(tt) if isinstance(r, IPython.parallel.error.RemoteError)] 
good_indx = [j for j, r in enumerate(tt) if not isinstance(r, IPython.parallel.error.RemoteError)] 

just_the_results = [r for r in tt if not isinstance(r, IPython.parallel.error.RemoteError)]