2016-11-24 75 views
5

我有一個我想要並行執行的進程,但由於某些strange error,我失敗了。現在我正在考慮組合並計算主CPU上的失敗任務。但是我不知道如何爲.combine編寫這樣的函數。爲.combine創建一個函數在foreach

應該怎麼寫?

我知道如何把它們寫,例如this答案提供了一個例子,但它不提供如何與失敗的任務處理,既不重複主服務器上的任務。

我會做這樣的事情:

foreach(i=1:100, .combine = function(x, y){tryCatch(?)} %dopar% { 
    long_process_which_fails_randomly(i) 
} 

但是,我怎麼使用任務的輸入在.combine功能(如果是可以做到的)?或者我應該在%dopar%內提供返回一個標誌或一個列表來計算它?

回答

2

要執行組合函數中的任務,需要在foreach循環的主體返回的結果對象中包含額外的信息。在這種情況下,這將是一個錯誤標誌並且值爲i。有很多方法可以做到這一點,但這裏有一個例子:

comb <- function(results, x) { 
    i <- x$i 
    result <- x$result 
    if (x$error) { 
    cat(sprintf('master computing failed task %d\n', i)) 
    # Could call function repeatedly until it succeeds, 
    # but that could hang the master 
    result <- try(fails_randomly(i)) 
    } 
    results[i] <- list(result) # guard against a NULL result 
    results 
} 

r <- foreach(i=1:100, .combine='comb', 
      .init=vector('list', 100)) %dopar% { 
    tryCatch({ 
    list(error=FALSE, i=i, result=fails_randomly(i)) 
    }, 
    error=function(e) { 
    list(error=TRUE, i=i, result=e) 
    }) 
} 

我會忍不住通過執行並行循環反覆地處理這個問題,直到所有的任務都被計算:

x <- rnorm(100) 
results <- lapply(x, function(i) simpleError('')) 

# Might want to put a limit on the number of retries 
repeat { 
    ix <- which(sapply(results, function(x) inherits(x, 'error'))) 
    if (length(ix) == 0) 
    break 

    cat(sprintf('computing tasks %s\n', paste(ix, collapse=','))) 
    r <- foreach(i=x[ix], .errorhandling='pass') %dopar% { 
    fails_randomly(i) 
    } 

    results[ix] <- r 
} 

請注意,此解決方案使用.errorhandling選項,如果發生錯誤,該選項非常有用。有關此選項的更多信息,請參閱foreach手冊頁。

+0

感謝您的回答!你能解釋一下爲什麼你想重複並行循環,直到整個任務被計算出來爲止?你認爲這樣會更容易/更快,因爲每次錯誤都會在不同的迭代中發生? – Llopis

+0

@Llopis這一切都取決於你的特定問題。如果主人不能發生錯誤,你的方法可能會更好。如果主人很可能發生這種情況,那麼你最好還是使用工人,因爲這會更快。編寫容錯程序可能會非常棘手,所以考慮一些不同的方法會很有幫助。 –

+0

以及我不明白錯誤的起源,很難猜測,但感謝您提供替代方案 – Llopis