SparkR foreach loop

In Spark's Java/Scala/Python implementations, one can simply call the foreach method of the RDD or DataFrame types to parallelize iteration over a dataset. In SparkR I cannot find such an instruction. What is the proper way to iterate over the rows of a SparkR DataFrame?
I could only find the gapply and dapply functions, but I do not want to compute new column values; I just want to run some operation in parallel, taking one element from a list at a time.
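For comparison, here is roughly what a gapply call would have to look like (a minimal sketch, assuming an active SparkR session, that ID_M is a string column, and reusing inputDF, the SparkDataFrame read below; the row count is only placeholder logic). The worker function receives each group as a local R data.frame and must return a data.frame matching a declared schema, so it seems geared towards producing new columns rather than running arbitrary per-element work:

library(SparkR)
# count the rows of every ID_M group; inputDF is the SparkDataFrame read below
groupCounts <- gapply(
  inputDF,
  "ID_M",
  function(key, localDF) {
    # localDF is a plain R data.frame holding one ID_M group
    data.frame(ID_M = key[[1]], n = nrow(localDF), stringsAsFactors = FALSE)
  },
  structType(structField("ID_M", "string"), structField("n", "integer"))
)
head(collect(groupCounts))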
My previous attempt used spark.lapply:
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF, "inputData")

# collect the distinct ID_M values to the driver as a plain R vector
distinctM <- sql("SELECT DISTINCT(ID_M) FROM inputData")
collected <- collect(distinctM)[[1]]

# for every ID_M value, filter the original SparkDataFrame down to that group
problemSolver <- function(idM) {
  filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}

spark.lapply(c(collected), problemSolver)
However, I get this error:
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
Error in callJMethod(x@sdf, "col", c) :
Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod
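For context, a sequential driver-side loop over the collected IDs (a sketch; count() is just a placeholder for the real per-ID work) is the kind of thing I can run, presumably because filter() is evaluated on the driver where the inputDF handle is still valid, but it submits the per-ID jobs one after another, which is exactly what I want to avoid:

results <- lapply(collected, function(idM) {
  # filter() runs on the driver here, so the inputDF reference is still valid
  filteredDF <- filter(inputDF, inputDF$ID_M == idM)
  count(filteredDF)  # placeholder for the actual per-ID computation
})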
What would be the solution provided by R/SparkR to deal with this kind of problem?