2017-07-30 97 views
6

我有以下數據框,我的意圖是找到所有的ID,具有不同的用法,但相同的類型。multidplyr和group_by()和過濾器()

ID <- rep(1:4, each=3) 
USAGE <- c("private","private","private","private", 
"taxi","private","taxi","taxi","taxi","taxi","private","taxi") 
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW") 
df <- data.frame(ID,USAGE,TYPE) 

如果我運行

df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1) 

我得到預期的結果。但我的原始數據幀有> 200萬行。所以我想用我所有的核心來運行這個操作。

我想這個代碼multidplyr:

f1 <- partition(df, ID) 
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1) 
f3 <- collect(f2) 

但隨後出現以下消息:

Warning message: group_indices_.grouped_df ignores extra arguments 

f1 <- partition(df, ID) 

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
    4 nodes produced errors; first error: Evaluation error: object 'f1' not found. 

f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1) 

會是什麼來實現整個操作進入multidplyr正確的方法後?非常感謝。

回答

2

您應該在撥打電話partition()時包含所有分組變量。這樣每個核心都具有爲給定組執行計算所需的所有數據。

library(tidyverse) 
library(multidplyr) 

fast <- df %>% 
    partition(ID, TYPE) %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) %>% 
    collect() 

驗證

你仍然可以得到約group_indices警告,但結果是一樣的原始dplyr方法。

slow <- df %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) 

fast == slow 
     ID USAGE TYPE 
#[1,] TRUE TRUE TRUE 
#[2,] TRUE TRUE TRUE 
#[3,] TRUE TRUE TRUE 

標杆

現在最大的問題:是不是更快嗎?定義cluster可以確保我們使用所有內核。

library(microbenchmark) 
library(parallel) 

cluster <- create_cluster(cores = detectCores()) 

fast_func <- function(df) { 
    df %>% 
    partition(ID, TYPE, cluster = cluster) %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) %>% 
    collect() 
} 

slow_func <- function(df) { 
    slow <- df %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) 
} 

microbenchmark(fast_func(df), slow_func(df)) 
# Unit: milliseconds 
# expr  min  lq  mean median  uq  max neval cld 
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045 100 b 
# slow_func(df) 4.717761 6.974897 9.333049 7.796686 8.468594 49.51916 100 a 

使用並行處理實際上是在這種情況下慢fast_func的中值運行需要56毫秒而不是9個。這是因爲管理跨羣集數據流的開銷。但是你說你的數據有數百萬行,所以我們來試試。

# Embiggen the data 
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df() 

microbenchmark(fast_func(df), slow_func(df)) 
# Unit: seconds 
# expr  min  lq  mean median  uq  max neval cld 
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095 10 b 
# slow_func(df) 1.741674 2.550008 3.529607 3.246665 3.983452 7.214484 10 a 

對於巨型數據集,fast_func仍然比較慢!有時並行運行可以節省大量時間,但簡單的分組過濾器並不一定是其中之一。