2017-01-23 181 views
2

在Spark的Java/Scala/Python實現中,可以簡單地調用或DataFrame類型的foreach方法來並行化數據集上的迭代。SparkR foreach循環

在SparkR中我找不到這樣的指令。遍歷DataFrame的行的正確方法是什麼?

我只能找到gapplydapply函數,但我不想計算新的列值,我只是想通過從列表中取一個元素並行執行某些操作。

我以前的嘗試是與lapply

inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "") 
createOrReplaceTempView(inputDF,'inputData') 

distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData') 

collected <- collect(distinctM)[[1]] 

problemSolver <- function(idM) { 
    filteredDF <- filter(inputDF, inputDF$ID_M == idM) 
} 

spark.lapply(c(collected), problemSolver) 

,但我得到這個錯誤:

Error in handleErrors(returnStatus, conn) : 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with 
Error in callJMethod([email protected], "col", c) : 
    Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed. 
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod 

什麼會被R提供的解決方案來解決這樣的問題?

回答

3

我也有類似的問題。收集DataFrame將其作爲數據框放入R中。從那裏,你可以像往常一樣在每一行中看到每一行。在我看來,這是處理數據的一個可怕主題,因爲你失去了Spark提供的並行處理。而不是收集數據,然後過濾,使用內置的SparkR功能,select,filter等。如果你想做行方式的操作,內置的SparkR函數通常會爲你做這件事,否則,我發現selectExpr或是很有用當原始Spark函數被設計爲工作在單個值上時認爲:from_unix_timestamp)

因此,要得到你想要的,我會嘗試這樣的事情(我在SparkR 2.0+)什麼:

弗里斯特讀入的數據,你已經做了:

inputDF<- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "") 

然後讓這個RDD:inputSparkDF<- SparkR::createDataFrame(inputDF)

下,僅分離的獨特/唯一值(我使用管道magrittr(工作在SparkR)):

distinctSparkDF<- SparkR::select(inputSparkDF) %>% SparkR::distinct() 

在這裏,您可以應用過濾,而仍然生活在星火的世界:

filteredSparkDF<- SparkR::filter(distinctSparkDF, distinctSparkDF$variable == "value")

星火已經過濾後,對你的數據,這是有道理的收集子集分爲基礎R爲最後一步的工作流程:

myRegularRDataframe<- SparkR::collect(filteredSparkDF)

我希望這有助於。祝你好運。 - 華麗