2011-06-06 47 views
6

我有一個data.frame的單元格,值和座標。它駐留在全球環境中。並行運行時寫入全局環境

> head(cont.values) 
    cell value x y 
1 11117 NA -34 322 
2 11118 NA -30 322 
3 11119 NA -26 322 
4 11120 NA -22 322 
5 11121 NA -18 322 
6 11122 NA -14 322 

因爲我的自定義函數需要近一個第二計算單個細胞(和我有細胞數以萬計的計算),我不想重複那些已經有一個值單元格計算。我的以下解決方案試圖避免這一點。每個單元格可以獨立計算,尖叫並行執行。

我的功能實際上是做的是檢查是否有一個指定的單元格號碼的值,如果它是NA,它會計算它並插入它的位置NA。

我可以使用申請家庭的功能和apply內,我可以讀取和寫入cont.values沒有問題(這是在全球環境中)運行我的神奇功能(結果是value了相應cell)。

現在,我想並行運行這個(使用snowfall),我無法讀取或寫入/從這個​​變量從個別核心。

問題:當並行執行一個函數時,什麼解決方案可以從worker/core中的全局環境中讀取/寫入/寫入全局環境中的動態變量。有沒有更好的方法來做到這一點?

回答

4

中央存儲的工人的價值協商的模式在rredis包CRAN上實現。這個想法是,Redis服務器維護一系列鍵值對(您的全局數據框,重新實現)。工人向服務器查詢是否計算了該值(redisGet),如果不計算並存儲(redisSet),以便其他工人可以重新使用它。工人可以是R腳本,所以擴展勞動力很容易。這是一個非常好的替代平行範例。以下是一個使用「記憶」每個結果的概念的示例。我們有一個功能,就是慢(休眠第二)

fun <- function(x) { Sys.sleep(1); x } 

我們寫一個「memoizer」返回的fun一個變種,首先檢查是否爲x值已經計算出,如果是這樣使用該

memoize <- 
    function(FUN) 
{ 
    force(FUN) # circumvent lazy evaluation 
    require(rredis) 
    redisConnect() 
    function(x) 
    { 
     key <- as.character(x) 
     val <- redisGet(key) 
     if (is.null(val)) { 
      val <- FUN(x) 
      redisSet(key, val) 
     } 
     val 
    } 
} 

然後我們memoize的我們的函數

funmem <- memoize(fun) 

> system.time(res <- funmem(10)); res 
    user system elapsed 
    0.003 0.000 1.082 
[1] 10 
> system.time(res <- funmem(10)); res 
    user system elapsed 
    0.001 0.001 0.040 
[1] 10 

這確實需要在R之外運行的Redis服務器,但安裝非常簡單;請參閱rredis軟件包隨附的文檔。

A內-R並行版本可能

library(snow) 
cl <- makeCluster(c("localhost","localhost"), type = "SOCK") 
clusterEvalQ(cl, { require(rredis); redisConnect() }) 
tasks <- sample(1:5, 100, TRUE) 
system.time(res <- parSapply(cl, tasks, funmem)) 
+0

我可以實現這一些日子,但我目前官方沒有訪問POSIX型系統(卡在Windows上),這意味着我可以」還沒有運行服務器。 – 2011-06-07 08:53:32

4

這將取決於問題的功能是什麼,當然,但恐怕snowfall不會有很大的幫助。事情是,您必須將整個數據幀導出到不同的核心(請參閱?sfExport),並仍然找到合併它的方法。這種做法違背了改變全球環境價值的全部目的,因爲您可能希望儘可能降低內存使用量。

您可以深入到snow的低級功能中以獲得此功能。請參閱以下示例:

#Some data 
Data <- data.frame(
    cell = 1:10, 
    value = sample(c(100,NA),10,TRUE), 
    x = 1:10, 
    y = 1:10 
) 
# A sample function 
sample.func <- function(){ 
    id <- which(is.na(Data$value)) # get the NA values 

    # this splits up the values from the dataframe in a list 
    # which will be passed to clusterApply later on. 
    parts <- lapply(clusterSplit(cl,id),function(i)Data[i,c("x","y")]) 

    # Here happens the magic 
    Data$value[id] <<- 
    unlist(clusterApply(cl,parts,function(x){ 
     x$x+x$y 
     } 
    )) 
} 
#now we run it 
require(snow) 
cl <- makeCluster(c("localhost","localhost"), type = "SOCK") 
sample.func() 
stopCluster(cl) 
> Data 
    cell value x y 
1  1 100 1 1 
2  2 100 2 2 
3  3  6 3 3 
4  4  8 4 4 
5  5 10 5 5 
6  6 12 6 6 
7  7 100 7 7 
8  8 100 8 8 
9  9 18 9 9 
10 10 20 10 10 

您仍然必須複製(部分)您的數據,以便將其傳送到核心。但是,無論如何,當您在數據幀上調用snowfall高級函數時,無論如何,snowfall總是會使用snow的低級函數。

另外,人們不應該忘記,如果您更改數據框中的一個值,則整個數據框也會複製到內存中。所以當他們從羣集中回來時,通過逐個添加值,您不會贏得那麼多。你可能想嘗試一些不同的方法,並做一些內存分析。

1

我同意Joris您需要將您的數據複製到其他內核。從積極的角度來看,您不必擔心NA在覈心中是否在數據中。 如果你原來data.frame被稱爲cont.values

nnaidx<-is.na(cont.values$value) #where is missing data originally 
dfrnna<-cont.values[nnaidx,] #subset for copying to other cores 
calcValForDfrRow<-function(dfrRow){return(dfrRow$x+dfrRow$y)}#or whatever pleases you 
sfExport(dfrnna, calcValForDfrRow) #export what is needed to other cores 
cont.values$value[nnaidx]<-sfSapply(seq(dim(dfrnna)[1]), function(i){calcValForDfrRow(dfrnna[i,])}) #sfSapply handles 'reordering', so works exactly as if you had called sapply 

應該很好地工作(不包括錯別字)