2017-07-19 43 views
2

我用foreach + doParallel在R的功能應用到各行的矩陣的multithreadedly當矩陣有很多行,之前以及在迭代multithreadedly會後foreach需要很長的時間。如何減少多線程迭代之前和之後的foreach時間?

例如,如果我運行:

library(foreach) 
library(doParallel) 

doWork <- function(data) { 

    # setup parallel backend to use many processors 
    cores=detectCores() 
    number_of_cores_to_use = cores[1]-1 # not to overload the computer 
    cat(paste('number_of_cores_to_use:',number_of_cores_to_use)) 
    cl <- makeCluster(number_of_cores_to_use) 
    clusterExport(cl=cl, varlist=c('ns','weights')) 
    registerDoParallel(cl) 
    cat('...Starting foreach initialization') 

    output <- foreach(i=1:length(data[,1]), .combine=rbind) %dopar% { 
    cat(i) 
    y = data[i,5] 
    a = 100 
    for (i in 1:3) { # Useless busy work 
     b=matrix(runif(a*a), nrow = a, ncol=a) 
    } 
    return(runif(10)) 

    } 
    # stop cluster 
    cat('...Stop cluster') 
    stopCluster(cl) 

    return(output) 
} 

r = 100000 
c = 10 
data = matrix(runif(r*c), nrow = r, ncol=c) 
output = doWork(data) 
output[1:10,] 

CPU使用率是如下(100%是指所有核都得到充分利用):

enter image description here

與註釋:

enter image description here

我該如何選擇使代碼變得簡單,這樣foreach在多線程迭代之前和之後都不需要很長時間?主要的時間點是花費的時間。花費的時間隨着foreach迭代次數的增加而顯着增加,有時會使代碼慢,就好像使用簡單的for循環一樣。


另一個例子(假設lmpoly不能採取矩陣作爲參數):

library(foreach) 
library(doParallel) 

doWork <- function(data,weights) { 

    # setup parallel backend to use many processors 
    cores=detectCores() 
    number_of_cores_to_use = cores[1]-1 # not to overload the computer 
    cat(paste('number_of_cores_to_use:',number_of_cores_to_use)) 
    cl <- makeCluster(number_of_cores_to_use) 
    clusterExport(cl=cl, varlist=c('weights')) 
    registerDoParallel(cl) 
    cat('...Starting foreach initialization') 

    output <- foreach(i=1:nrow(data), .combine=rbind) %dopar% { 
    x = sort(data[i,]) 
    fit = lm(x[1:(length(x)-1)] ~ poly(x[-1], degree = 2,raw=TRUE), na.action=na.omit, weights=weights) 
    return(fit$coef) 
    } 
    # stop cluster 
    cat('...Stop cluster') 
    stopCluster(cl) 

    return(output) 
} 

r = 10000 
c = 10 
weights=runif(c-1) 
data = matrix(runif(r*c), nrow = r, ncol=c) 
output = doWork(data,weights) 
output[1:10,] 
+3

問題出在'rbind' I認爲。 'rbind'列表中的很多值需要很長時間。另外,填充行很糟糕,因爲矩陣是按列存儲的。另外,製作一個long foreach循環效率不高(使用塊代替)。最後,在矩陣上進行並行化時,使用共享內存總是更好。如果您製作的示例與您想要的更接近,我可以爲您制定解決方案。 –

+1

問題在於認爲''foreach%dopar%'*總是比矢量化方法快。由於您通過'i = 1:length(data [,1])'和'data'具有100,000行迭代,因此'foreach%dopar%'需要在100,000個工作者實例之間進行通信。在實施並行化方法之前,您應該在矢量化方法(sapply,lapply,apply)和並行化方法之間進行基準測試。如果你想堅持一種並行化的方法,把你的代碼改爲在列(其中有10個)而不是行上工作。更好的辦法是減少每個工作人員的工作量。 – CPak

+0

@F.Privé謝謝,我會非常感興趣。我添加了一個與我想要的更接近的例子。 –

回答

1

試試這個:

devtools::install_github("privefl/bigstatsr") 
library(bigstatsr) 
options(bigstatsr.ncores.max = parallel::detectCores()) 

doWork2 <- function(data, weights, ncores = parallel::detectCores() - 1) { 

    big_parallelize(data, p.FUN = function(X.desc, ind, weights) { 

    X <- bigstatsr::attach.BM(X.desc) 

    output.part <- matrix(0, 3, length(ind)) 
    for (i in seq_along(ind)) { 
     x <- sort(X[, ind[i]]) 
     fit <- lm(x[1:(length(x)-1)] ~ poly(x[-1], degree = 2, raw = TRUE), 
       na.action = na.omit, weights = weights) 
     output.part[, i] <- fit$coef 
    } 

    t(output.part) 
    }, p.combine = "rbind", ncores = ncores, weights = weights) 
} 

system.time({ 
    data.bm <- as.big.matrix(t(data)) 
    output2 <- doWork2(data.bm, weights) 
}) 

all.equal(output, output2, check.attributes = FALSE) 

這是快兩倍,在我的電腦(其中有隻有4個核心)。備註:

  • 使用超過一半的核心往往是無用的。
  • 您的數據不是很大,所以在這裏使用big.matrix可能沒有什麼用。
  • big_parallelizencores列中的矩陣分開,並在每個列上應用您的函數,然後合併結果。
  • 在函數中,最好在循環前做出輸出,然後填充它比使用foreachrbind的所有結果。
  • 我只訪問列,而不是行。

所以這些都是很好的做法,但它與您的數據並不相關。使用更多內核和更大的數據集時,增益應該更高。

基本上,如果你想超快速,重新實現Rcpp中的lm部分將是一個很好的解決方案。

0

正如F.Privé的註釋中:

問題是與rbind我想。從列表中刪除大量值需要很長時間。另外,填充行很糟糕,因爲矩陣是按列存儲的。另外,製作一個long foreach循環效率不高(使用塊代替)。

要使用使用的塊,而不是(如果使用5個核心,每個核心接收矩陣的20%):

library(foreach) 
library(doParallel) 


array_split <- function(data, number_of_chunks) { 
    # [Partition matrix into N equally-sized chunks with R](https://stackoverflow.com/a/45198299/395857) 
    # Author: lmo 
    rowIdx <- seq_len(nrow(data)) 
    lapply(split(rowIdx, cut(rowIdx, pretty(rowIdx, number_of_chunks))), function(x) data[x, ]) 
} 


doWork <- function(data) { 

    # setup parallel backend to use many processors 
    cores=detectCores() 
    number_of_cores_to_use = cores[1]-1 # not to overload the computer 
    cat(paste('number_of_cores_to_use:',number_of_cores_to_use)) 
    cl <- makeCluster(number_of_cores_to_use) 
    clusterExport(cl=cl, varlist=c('ns','weights')) 
    registerDoParallel(cl) 

    cat('...Starting array split') 
    number_of_chunks = number_of_cores_to_use 
    data_chunks = array_split(data=data, number_of_chunks=number_of_chunks) 
    degree_poly = 2 

    cat('...Starting foreach initialization') 
    output <- foreach(i=1:length(data_chunks), .combine=rbind) %dopar% { 

    data_temporary = data_chunks[[i]] 
    output_temporary = matrix(0, nrow=nrow(data_temporary), ncol = degree_poly + 1) 
    for(i in 1:length(data_temporary[,1])) { 
     x = sort(data_temporary[i,]) 
     fit = lm(x[1:(length(x)-1)] ~ poly(x[-1], degree = degree_poly,raw=TRUE), na.action=na.omit, weights=weights) 
     output_temporary[i,] = fit$coef 
    } 
    return(output_temporary) 
    } 

    # stop cluster 
    cat('...Stop cluster') 
    stopCluster(cl) 

    return(output) 
} 

r = 100000 
c = 10 
weights=runif(c-1) 
data = matrix(runif(r*c), nrow = r, ncol=c) 
output = doWork(data) 
output[1:10,] 

供參考: