2016-02-13 122 views
1

比方說,我有一個數據集採用以下形式減少組合的數量:星火 - 使用GROUPBY

data = sc.parallelize([('customer_1', 'contract_1', 15000, 100), 
         ('customer_1', 'contract_1', 20000, 200), 
         ('customer_2', 'contract_2', 30000, 100), 
         ('customer_1', 'contract_1', 7500, 500)], 2) 

其中:

  • 第一列代表一個客戶ID。
  • 第二欄代表一個合同ID。
  • 第三列代表一個時間戳。
  • 第四欄是合同價值。

我需要做的是添加一個額外的列,對於每一行,包含具有相同客戶ID,相同合同ID和時間戳等於或大於時間戳的所有行的合同值總和的當前行。

所以,以前的數據集,結果應該是:

customer_1 contract_1 15000 300 # 300 = 100+200 
customer_1 contract_1 20000 200 # 200 
customer_2 contract_2 30000 100 # 100 
customer_1 contract_1 7500 800 # 800 = 100+200+500 

如果時間戳檢查不會在那裏,它可以做設置由客戶ID和合同ID組成鍵,通過鍵和減少然後一個連接,但給出時間戳比較存在,我沒有找到一個簡單的方法來做到這一點。

我獲得此做以這種方式使用笛卡爾操作第一種方法:

combinations = data.cartesian(data) 
       .filter(lambda a: a[0][0] == a[1][0] and 
           a[0][1] == a[1][1] and 
           a[1][2] >= a[0][2]) 
agg = combinations.map(lambda a: (a[0], a[1][3])).reduceByKey(lambda x,y: x+y) 

結果是正常,但恐怕將笛卡爾到的數據,我管理(大於1的量百萬行)效率很低。實際上,在這裏應用笛卡爾操作會產生許多根本沒有意義的組合(根據定義,將不同客戶或合同的行組合在一起並不合理),這些組合隨後會被過濾器刪除。

理想的情況下,我會做一個groupBy使用客戶ID和合同ID作爲關鍵,然後,遍歷產生的groupBy,並將笛卡爾產品應用到每一行。這將大大減少生成的組合數量。但是,我沒有找到任何方法來做到這一點。更甚者,這可能嗎?如果是這樣,怎麼樣?你有關於如何實現我的要求的任何其他建議/想法?

感謝您的幫助!

回答

1

這是一個窗函數問一個問題:

import sys 
from pyspark.sql.window import Window 
from pyspark.sql.functions import sum 

df = data.toDF(["customer_id", "contract_id", "timestamp", "value"]) 
w = (Window() 
    .partitionBy("customer_id", "contract_id") 
    .orderBy("timestamp") 
    # Current row and future values 
    .rangeBetween(0, sys.maxsize)) # or .rowsBetween(0, sys.maxsize) 

result = df.withColumn("future_value", sum("value").over(w)) 
result.show() 

## +-----------+-----------+---------+-----+------------+ 
## |customer_id|contract_id|timestamp|value|future_value| 
## +-----------+-----------+---------+-----+------------+ 
## | customer_1| contract_1|  7500| 500|   800| 
## | customer_1| contract_1| 15000| 100|   300| 
## | customer_1| contract_1| 20000| 200|   200| 
## | customer_2| contract_2| 30000| 100|   100| 
## +-----------+-----------+---------+-----+------------+ 
+0

非常感謝您zero323。我從Spark開始並且不知道Window函數。感謝您的信息。出於好奇,還有一個問題:如果我有timestamp_start和timestamp_end字段,並且我的總結條件是current_row_timestamp_start> = timestamp_start和current_row_timestamp_end <= timestamp_end?我是否也可以使用Window函數來處理這種情況?謝謝! –

+0

不可以。您可以提供覆蓋行或值的靜態範圍,但不能取決於當前行。從理論上講,你可以用滯後/領先來回顧和前瞻,並嘗試從中建立一些東西,但它不可能是漂亮或高效的。 – zero323