0

我使用的是Spark 2.0.0和數據框。 這裏是我的輸入數據幀作爲將GroupBy對象轉換爲Pyspark中的有序列表

| id | year  | qty | 
|----|-------------|--------| 
| a | 2012  | 10  | 
| b | 2012  | 12  | 
| c | 2013  | 5  | 
| b | 2014  | 7  | 
| c | 2012  | 3  | 

我要的是

| id | year_2012 | year_2013 | year_2014 | 
|----|-----------|-----------|-----------| 
| a | 10  | 0   | 0   | 
| b | 12  | 0   | 7   | 
| c | 3   | 5   | 0   | 

| id | yearly_qty | 
|----|---------------| 
| a | [10, 0, 0] | 
| b | [12, 0, 7] | 
| c | [3, 5, 0]  | 

我發現的最接近的解決方案是collect_list(),但此功能不會爲提供訂單名單。在我的腦海解決方案應該是這樣的:

data.groupBy('id').agg(collect_function) 

有沒有辦法不進行過濾每一個id列使用循環產生呢?

回答

3

第一個可使用pivot可以容易地實現:

from itertools import chain 

years = sorted(chain(*df.select("year").distinct().collect())) 
df.groupBy("id").pivot("year", years).sum("qty") 

其可進一步轉化爲陣列形式:

from pyspark.sql.functions import array, col 

(... 
    .na.fill(0) 
    .select("id", array(*[col(str(x)) for x in years]).alias("yearly_qty"))) 

直接獲取第二個是可能不值得因爲所有忙亂你必須先填補空白。不過你可以嘗試:

from pyspark.sql.functions import collect_list, struct, sort_array, broadcast 
years_df = sc.parallelize([(x,) for x in years], 1).toDF(["year"]) 

(broadcast(years_df) 
    .join(df.select("id").distinct()) 
    .join(df, ["year", "id"], "leftouter") 
    .na.fill(0) 
    .groupBy("id") 
    .agg(sort_array(collect_list(struct("year", "qty"))).qty.alias("qty"))) 

它還要求星火2.0+,以獲得struct收集支持。

這兩種方法都非常昂貴,所以在使用這些方法時應該小心。作爲一個經驗法則,long比wide更好。

+0

謝謝,'pivot'就是我要找的! – CodeMySky

+0

'struct'收藏在pyspark你的意思是? – eliasah

+0

Spark中的@eliash'collect_ *'不支持Spark <2.0中的原子。 – zero323