0
我有一個大的CSV一些社交媒體數據:保持數據幀的原始結構GROUPBY後PySpark
message_id, user_id, message, date
"1", "123", "some message blah blah", "Sun May 12 15:08:58 +0000 2013"
"2", "123", "another message blah", "Sun June 12 15:08:58 +0000 2013"
"3", "123", "i want this message removed", "Sun June 12 15:08:58 +0000 2013"
"4", "321", "more blah", "Mon June 12 15:08:58 +0000 2013"
,並希望在組內刪除基於一些標準的消息(在這個例子中的基團可爲user_id
這是我做了什麼:創建一個標準功能爲我的排除標準,基於該方法,然後應用功能分組數據定義的udf
:
def exclusion_criteria(data_list):
keep = []
for d in data_list:
if some_condition:
keep.append(d)
return keep
myUdf = udf(exclusion_criteria, ArrayType(StringType()))
msgsDF = session.read.csv("data.csv", header=False)
filterMsgsDF = msgsDF.groupBy("user_id").agg(collect_list("message")
.alias("message")).withColumn("message",myUdf("message"))
最後我得到的東西看起來像:
filterMsgsDF.take(1)
[Row(user_id='123', _c2=['some message blah blah', 'another message blah'])]
,但問題是,我丟棄與每個消息(message_id
和date
)相關的信息。我想到底是像
["1", "123", "some message blah blah", "Sun May 12 15:08:58 +0000 2013"]
["2", "123", "another message blah", "Sun June 12 15:08:58 +0000 2013"]
["4", "321", "more blah", "Mon June 12 15:08:58 +0000 2013"]
有沒有辦法加入這個其他信息或保持在GROUPBY/AGG一步?也許groupBy
不是最好的辦法嗎?
只需追加大約在同一列表消息的其他信息。 I.e messageId,消息,日期將是單個列表。 – StackPointer
對不起,我沒有關注。我在哪裏添加其他信息? – Sal
[行(user_id ='123',_c2 = [[123,'some message blah blah',date],[124,'another message blah',date]])] – StackPointer