0

我想循環遍歷Spark程序中的數據幀列並計算最小值和最大值。 我是Spark和Scala的新手,一旦我在數據框中獲取它,就無法迭代列。在火花數據框中遍歷列並計算最小最大值

我已經嘗試運行下面的代碼,但它需要傳遞給它的列號,問題是我如何從數據框中獲取它並動態傳遞它並將結果存儲在集合中。

val parquetRDD = spark.read.parquet("filename.parquet") 

parquetRDD.collect.foreach ({ i => parquetRDD_subset.agg(max(parquetRDD(parquetRDD.columns(2))), min(parquetRDD(parquetRDD.columns(2)))).show()}) 

感謝這方面的幫助。

回答

1

您不應該迭代行或記錄。您應該使用匯總功能

import org.apache.spark.sql.functions._ 
val df = spark.read.parquet("filename.parquet") 
val aggCol = col(df.columns(2)) 
df.agg(min(aggCol), max(aggCol)).show() 

首先,當您執行spark.read.parquet時,您正在讀取數據幀。 接下來我們使用col函數定義我們想要使用的列。 col函數將列名轉換爲列。您可以改爲使用df(「name」),其中name是該列的名稱。

agg函數使用聚合列,因此min和max是聚合函數,它們獲取列並返回具有聚合值的列。

更新

根據該意見,目的是最大和最小的所有列。因此,你可以這樣做:

val minColumns = df.columns.map(name => min(col(name))) 
val maxColumns = df.columns.map(name => max(col(name))) 
val allMinMax = minColumns ++ maxColumns 
df.agg(allMinMax.head, allMinMax.tail: _*).show() 

你也可以簡單地做:

df.describe().show() 

,讓你的所有列,包括最小值,最大值,平均值,計數和STDDEV

+0

感謝阿薩夫統計您的響應。但是在val中,aggCol = col(df.columns(2))實際上並沒有手動傳遞列號(在這種情況下爲2)。有沒有一種方法可以動態地傳遞它,以便我可以在循環中逐個遍歷列並生成最小最大值。謝謝。 – sabby

+0

謝謝阿薩夫!它確實有幫助,但是這可以寫成循環,以便我不需要手動傳遞列名。當我說我的問題迭代時,我的意思是逐個循環遍歷列。 在下面的例子中,我們有三列,我會動態地選擇每一列,計算它的最小最大值,而不需要 必須手動傳遞列名。假定col1,col2,col3的任意隨機值爲 col1 col2 col3 – sabby