2017-08-09 174 views

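The table below shows a user watching TV channels. In Spark SQL, I want to create a column named "CHANGE_SEQUENCE" that assigns a sequence number to each subgroup, using the following logic: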

CASE WHEN ROW_NUMBER = 1 
THEN 1 
ELSE 
    CASE WHEN CHANNEL_CHANGED = true 
     THEN ((lag(CHANGE_SEQUENCE,1, CHANGE_SEQUENCE) over(partition by CUSTOMER_ID order by ROW_NUMBER)) + 1) 
     ELSE (lag(CHANGE_SEQUENCE,1, CHANGE_SEQUENCE) over(partition by CUSTOMER_ID order by ROW_NUMBER)) 
     END 
    END 
AS CHANGE_SEQUENCE 

Input 
CUSTOMER_ID | TV_CHANNEL_ID | PREV_CHANNEL_ID | ROW_NUMBER | CHANNEL_CHANGED 
1 | 100 | NULL | 1 | FALSE 
1 | 100 | 100 | 2 | FALSE 
1 | 100 | 100 | 3 | FALSE 
1 | 200 | 100 | 4 | TRUE 
1 | 200 | 200 | 5 | FALSE 
1 | 200 | 200 | 6 | FALSE 
1 | 300 | 200 | 7 | TRUE 
1 | 300 | 300 | 8 | FALSE 
1 | 300 | 300 | 9 | FALSE 

Output 
CUSTOMER_ID | TV_CHANNEL_ID | PREV_CHANNEL_ID | ROW_NUMBER | CHANNEL_CHANGED | CHANGE_SEQUENCE 
1 | 100 | NULL | 1 | FALSE | 1 
1 | 100 | 100 | 2 | FALSE | 1 
1 | 100 | 100 | 3 | FALSE | 1 
1 | 200 | 100 | 4 | TRUE | 2 
1 | 200 | 200 | 5 | FALSE | 2 
1 | 200 | 200 | 6 | FALSE | 2 
1 | 300 | 200 | 7 | TRUE | 3 
1 | 300 | 300 | 8 | FALSE | 3 
1 | 300 | 300 | 9 | FALSE | 3 

In the first 3 records the customer is watching channel 100, so those rows form one group; likewise for channels 200 and 300.

Please advise.

Answers


In PySpark:

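from pyspark import SparkConf
from pyspark.sql import SparkSession, Window, functions as f
from pyspark.sql.types import IntegerType

cfg = SparkConf().setAppName('s')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()

# Assumes the CSV has a header row with the column names from the question.
df = spark.read.csv('...', header=True) \
    .withColumn('ROW_NUMBER', f.col('ROW_NUMBER').cast('int'))  # order numerically, not as strings

w = Window.partitionBy('CUSTOMER_ID').orderBy('ROW_NUMBER')

# Rows that start a group (first row or a channel change) get consecutive numbers.
df_p1 = df.filter((df['ROW_NUMBER'] == 1) | (df['CHANNEL_CHANGED'] == 'TRUE')) \
    .withColumn('CHANGE_SEQUENCE', f.row_number().over(w))

# All other rows get a typed NULL placeholder so both sides of the union share a schema.
df_p2 = df.filter((df['ROW_NUMBER'] != 1) & (df['CHANNEL_CHANGED'] == 'FALSE')) \
    .withColumn('CHANGE_SEQUENCE', f.lit(None).cast('int'))

# collect_set over the ordered window gathers the group numbers seen so far (NULLs are skipped).
# The set is unordered, so take max() instead of pop() to get the current group.
df = df_p1.union(df_p2) \
    .withColumn('CHANGE_SEQUENCE', f.collect_set('CHANGE_SEQUENCE').over(w)) \
    .withColumn('CHANGE_SEQUENCE', f.udf(lambda x: max(x), IntegerType())('CHANGE_SEQUENCE'))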
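
A possible alternative, not part of the original answer: the desired CHANGE_SEQUENCE looks like 1 plus a running count of the CHANNEL_CHANGED flags within each customer, which a single windowed SUM can express without referring back to the column being defined. The sketch below runs that query against an in-memory SQLite database only so that it is self-contained (this needs SQLite 3.25 or newer for window functions). The table name tv_views and the 0/1 encoding of CHANNEL_CHANGED are assumptions made for illustration; the window expression itself should carry over to Spark SQL if CHANNEL_CHANGED is a boolean column there.

```python
import sqlite3

# Rows copied from the question's Input table:
# (CUSTOMER_ID, TV_CHANNEL_ID, PREV_CHANNEL_ID, ROW_NUMBER, CHANNEL_CHANGED)
# CHANNEL_CHANGED is stored as 0/1 here (an assumption for SQLite).
ROWS = [
    (1, 100, None, 1, 0),
    (1, 100, 100, 2, 0),
    (1, 100, 100, 3, 0),
    (1, 200, 100, 4, 1),
    (1, 200, 200, 5, 0),
    (1, 200, 200, 6, 0),
    (1, 300, 200, 7, 1),
    (1, 300, 300, 8, 0),
    (1, 300, 300, 9, 0),
]

# CHANGE_SEQUENCE = 1 + number of channel changes seen so far for the customer.
QUERY = """
SELECT CUSTOMER_ID, TV_CHANNEL_ID, ROW_NUMBER,
       1 + SUM(CASE WHEN CHANNEL_CHANGED THEN 1 ELSE 0 END) OVER (
               PARTITION BY CUSTOMER_ID
               ORDER BY ROW_NUMBER
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS CHANGE_SEQUENCE
FROM tv_views
ORDER BY CUSTOMER_ID, ROW_NUMBER
"""


def change_sequences(rows):
    """Load rows into a hypothetical tv_views table and return CHANGE_SEQUENCE per row."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE tv_views ("
            "CUSTOMER_ID INTEGER, TV_CHANNEL_ID INTEGER, PREV_CHANNEL_ID INTEGER, "
            "ROW_NUMBER INTEGER, CHANNEL_CHANGED INTEGER)"
        )
        conn.executemany("INSERT INTO tv_views VALUES (?, ?, ?, ?, ?)", rows)
        return [row[3] for row in conn.execute(QUERY)]
    finally:
        conn.close()


sequences = change_sequences(ROWS)
print(sequences)  # [1, 1, 1, 2, 2, 2, 3, 3, 3]
```

If CHANNEL_CHANGED is read from CSV as the strings 'TRUE'/'FALSE', the CASE condition would need an explicit comparison such as CHANNEL_CHANGED = 'TRUE'.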