2017-01-22 70 views

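Counting duplicate values within a time interval in PySpark

I am working on a PySpark job for big-data processing. The data has the following format.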

ID-1234567 iplong agent partner client country timestamp category reference 

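For each partner, I need to find the average number of duplicate records within 1-minute time intervals, where two records count as duplicates if they match on columns 2 (iplong), 3 (agent), 5 (client), 6 (country) and 9 (reference).

As I understand it, I need to:

  1. Divide the records into one-minute time intervals.
  2. Map everything by partner.
  3. Group everything by partner.
  4. Reduce each interval to its total number of records and its number of distinct records, and take the difference to get the number of duplicate records. (This also needs a function that compares two records only on the values of columns 2 (iplong), 3 (agent), 5 (client), 6 (country) and 9 (reference).)
  5. For every partner, add up the duplicate counts of all its intervals and divide by the number of intervals in which the partner appears.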
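
To make the five steps concrete, here is a minimal plain-Python sketch (no Spark) that can serve as a reference on a small sample. It assumes each record is a list of nine strings in the order id, iplong, agent, partner, client, country, timestamp, category, reference, and that intervals are fixed calendar minutes; the function name `avg_duplicates_per_partner` and the column-index constants are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Assumed 0-based column positions for the layout
# id, iplong, agent, partner, client, country, timestamp, category, reference.
PARTNER, TIMESTAMP = 3, 6
KEY_COLS = (1, 2, 4, 5, 8)  # iplong, agent, client, country, reference


def avg_duplicates_per_partner(records):
    """Return {partner: average duplicate count per one-minute interval}."""
    buckets = defaultdict(list)
    for rec in records:
        # Step 1: truncate the timestamp to the start of its minute.
        minute = datetime.strptime(rec[TIMESTAMP][:16], "%Y-%m-%d %H:%M")
        # Steps 2-3: key by (partner, minute) so each group is one interval.
        buckets[(rec[PARTNER], minute)].append(tuple(rec[i] for i in KEY_COLS))

    per_partner = defaultdict(list)
    for (partner, _minute), keys in buckets.items():
        # Step 4: duplicates = total records - distinct key combinations.
        per_partner[partner].append(len(keys) - len(set(keys)))

    # Step 5: average over the intervals in which the partner appears.
    return {p: sum(d) / len(d) for p, d in per_partner.items()}
```

Rows read with `csv.reader` from a comma-separated file (header skipped) can be passed in directly. Comparing tuples of the five key columns takes the place of a custom record-comparison function.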

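I understand the process, but not how to implement it in PySpark specifically.

Can someone help me implement any of the above steps in PySpark?

Sample data: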

9794474 1000460030 Samsung_S5233 dv4gs dswae in 2012-03-08 00:00:00 mg riflql2a0yv8xoa9sq0recx4x 
9794471 3386480130 Nokia_C3-00 duq7h dr75h py 2012-03-08 00:00:00 co 
9794468 1907980030 Nokia_5233 dv6i3 ds3xq vn 2012-03-08 00:00:00 es gp53lqr9njqd6z2ap5d364sip 
9794467 1791990020 MAUI duxto dvb8g in 2012-03-08 00:00:00 ad 
9794466 1791000060 Nokia_3110c dusg4 dvb8g in 2012-03-08 00:00:00 ad 
9794477 1353590020 Blackberry_9300 du6dt dtr0u es 2012-03-08 00:00:00 es h5njsswvxorsau9u8fxh0e9se 
9794478 1402290050 NokiaC6-01.3 dusnc dsgcn ru 2012-03-08 00:00:00 mc 
9794481 1848749950 Nokia_C3-00 dvry3 dr6sg th 2012-03-08 00:00:01 mc oj0rekb51pvirnjuqjt10zn4b 

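Update:

So far I have tried loading the whole dataset into MySQL and reading from it, but the read operations take too much time.

For the MapReduce approach, I tried various small things, but I do not understand how to take them further in code, so I could not move forward with any one approach.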

clicks_rdd = sc.parallelize(list(clicks_reader)[1:]) 
minwise_clicks = clicks_rdd.groupby(clicks_rdd.index.map(lambda t: t.minute)) # Didn't work 
clicks_mapped_publishers = clicks_rdd.map(lambda x : (x.pop(3), x)) # Works fine but need the records divided into minute intervals first. 

I also tried a few other things here and there, but nothing solid.

Below are the first 25 records of my original dataset file.

id,iplong,agent,partnerid,cid,cntr,timeat,category,referer 
9794476,1071324855,SonyEricsson_K70,dv3va,dsfag,us,2012-03-08 00:00:00.0,ad, 
9794474,1000461055,Samsung_S5233,dv4gs,dswae,in,2012-03-08 00:00:00.0,mg,riflql2a0yv8xoa9sq0recx4x 
9794471,3386484265,Nokia_C3-00,duq7h,dr75h,py,2012-03-08 00:00:00.0,co, 
9794468,1907981997,Nokia_5233,dv6i3,ds3xq,vn,2012-03-08 00:00:00.0,es,gp53lqr9njqd6z2ap5d364sip 
9794467,1791989091,MAUI,duxto,dvb8g,in,2012-03-08 00:00:00.0,ad, 
9794466,1791002478,Nokia_3110c,dusg4,dvb8g,in,2012-03-08 00:00:00.0,ad, 
9794477,1353590316,Blackberry_9300,du6dt,dtr0u,es,2012-03-08 00:00:00.0,es,h5njsswvxorsau9u8fxh0e9se 
9794478,1402285217,NokiaC6-01.3,dusnc,dsgcn,ru,2012-03-08 00:00:00.0,mc, 
9794481,1848747204,Nokia_C3-00,dvry3,dr6sg,th,2012-03-08 00:00:01.0,mc,oj0rekb51pvirnjuqjt10zn4b 
9794482,1893182670,NokiaC2-03,du77a,dr6x2,id,2012-03-08 00:00:01.0,co,r63f8uhijvr2irvka3glwyb38 
9794483,1912930086,MAUI,dvwdj,dvb8g,id,2012-03-08 00:00:01.0,ad, 
9794485,2098816838,GT-S5360B,dvjtq,dr72e,th,2012-03-08 00:00:01.0,co, 
9794486,3309473440,MAUI,dv6i3,ds3k0,za,2012-03-08 00:00:01.0,es, 
9794492,702295934,Nokia_9300,dv6i3,dtqrw,ng,2012-03-08 00:00:01.0,es,onbw7na2mi8a62g4p6y3av2qt 
9794493,694135362,Nokia_N95,dupgf,dvb8g,sd,2012-03-08 00:00:01.0,ad,hoq05psulkszxm4izlql4g962 
9794495,1791428359,Samsung_S8300,dvpo7,dvb8g,in,2012-03-08 00:00:02.0,co,im387req0zp1ucygamhgadgtm 
9794496,1783607271,GT-S5570,du56s,dsgq2,in,2012-03-08 00:00:02.0,mc,immfap8948rebeym8ri0vf5cr 
9794498,1860189232,Samsung_GT-B3313,du56s,ds22r,in,2012-03-08 00:00:02.0,mc,r81nrzjemr5jrfvjjeoxmdm4y 
9794499,1868310973,Nokia_2730c,dv3va,drvnr,au,2012-03-08 00:00:02.0,ad, 
9794500,1893182511,Nokia_5233,dv6i7,dr6tn,id,2012-03-08 00:00:02.0,co,tq09jycwii12iul7hzalucue3 
9794501,1884230403,Samsung_GT-S3653,dvjil,ds92x,in,2012-03-08 00:00:02.0,mc,h0z1j3bwiverubvwg851e9eon 
9794503,1945382244,GT-S5360,dvijt,dsgq2,in,2012-03-08 00:00:02.0,mc,fbbenjzmoe0oc7x4e2080nj8x 
9794508,2928534854,Samsung_R310,dunsq,dsg3q,us,2012-03-08 00:00:02.0,ad,kl9j183hop90uwq2p82iidjsb 
9794510,3063717709,Samsung_GT-S3653,dvjjf,dr751,in,2012-03-08 00:00:02.0,ad,rpdt9h4kpooxiedeuuxvk6gi5 
9794511,3557769762,Samsung_C3050,du53k,dr71b,hr,2012-03-08 00:00:02.0,se, 

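Update 2

Sample output. It is in tab-separated values format; you can copy and paste it into Excel to view it properly. Here avg_spiky_ReAgCnIpCi is the average count, per second, of duplicate combinations of reference, Agent, Country, IP and Client. That is the feature I am interested in; once I have it, I can adapt the code to obtain the other features.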

partnerid status avg_spiky_ReAgCnIpCi std_spiky_ReAgCnIpCi night_avg_spiky_ReAgCnIpCi night_std_spiky_ReAgCnIpCi morning_avg_spiky_ReAgCnIpCi morning_std_spiky_ReAgCnIpCi afternoon_avg_spiky_ReAgCnIpCi afternoon_std_spiky_ReAgCnIpCi evening_avg_spiky_ReAgCnIpCi evening_std_spiky_ReAgCnIpCi avg_spiky_ReAgCnIp std_spiky_ReAgCnIp avg_spiky_ReAgCn std_spiky_ReAgCn avg_spiky_iplong std_spiky_iplong avg_spiky_agent std_spiky_agent night_avg_spiky_agent night_std_spiky_agent morning_avg_spiky_agent morning_std_spiky_agent afternoon_avg_spiky_agent afternoon_std_spiky_agent evening_avg_spiky_agent evening_std_spiky_agent avg_spiky_cid std_spiky_cid avg_spiky_cntr std_spiky_cntr avg_spiky_referer std_spiky_referer night_avg_spiky_referer night_std_spiky_referer morning_avg_spiky_referer morning_std_spiky_referer afternoon_avg_spiky_referer afternoon_std_spiky_referer evening_avg_spiky_referer evening_std_spiky_referer category_es category_mc category_ad category_co category_se category_mg category_pp category_in category_gd category_ow total_clicks distinct_iplong distinct_agent distinct_cid distinct_cntr distinct_referer night_click_percent morning_click_percent afternoon_click_percent evening_click_percent night_referer_percent morning_referer_percent afternoon_referer_percent evening_referer_percent night_agent_percent morning_agent_percent afternoon_agent_percent evening_agent_percent avg_total_clicks std_total_clicks avg_distinct_iplong std_distinct_iplong avg_distinct_agent std_distinct_agent avg_distinct_cid std_distinct_cid avg_distinct_cntr std_distinct_cntr avg_distinct_referer std_distinct_referer avg_null_agent std_null_agent avg_null_referer std_null_referer night_avg_null_referer night_std_null_referer morning_avg_null_referer morning_std_null_referer afternoon_avg_null_referer afternoon_std_null_referer evening_avg_null_referer evening_std_null_referer first_15_minute_percent second_15_minute_percent third_15_minute_percent last_15_minute_percent 
brand_MAUI_percent brand_Nokia_percent brand_Generic_percent brand_Apple_percent brand_Blackberry_percent brand_Samsung_percent brand_SonyEricsson_percent brand_LG_percent brand_other_percent avg_per_hour_density std_per_hour_density cntr_az_percent cntr_id_percent cntr_in_percent cntr_us_percent cntr_ng_percent cntr_tr_percent cntr_ru_percent cntr_th_percent cntr_sg_percent cntr_uk_percent cntr_other_percent 
du3nk 0 1.23 8.47 0 0 0 0 0 0 1.23 8.47 1.24 8.48 1.27 8.61 4.14 11.73 8.73 16.06 0 0 0 0 0 0 8.73 16.06 38.18 240.99 60 248 1.8 10.35 0 0 0 0 0 0 1.8 10.35 0 1 0 0 0 0 0 0 0 0 3360 644 250 61 31 1696 0 0 0 1 0 0 0 1 0 0 0 1 3360 0 644 0 250 0 61 0 31 0 1696 0 0 0 598 0 0 0 0 0 0 0 598 0 0.16 0.17 0.33 0.35 0.01 0 0.05 0 0 0 0 0 0 2 0 0 0 0.13 0 0 0 0 0 0.01 0 0.04 
du3nq 1 8.38 5.83 0 0 0 0 0 0 8.38 5.83 25.13 9.27 25.13 9.27 188.5 49.5 188.5 49.5 0 0 0 0 0 0 188.5 49.5 53.86 39.03 188.5 49.5 25.13 9.27 0 0 0 0 0 0 25.13 9.27 1 0 0 0 0 0 0 0 0 0 377 1 1 5 1 8 0 0 0 1 0 0 0 1 0 0 0 1 377 0 1 0 1 0 5 0 1 0 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0.09 0.14 0.33 0.44 0 0 0 1 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 1 
du3op 0 30.43 46.87 0 0 0 0 44.67 59.63 19.75 30.19 35.5 48.84 35.5 48.84 71 52.27 71 52.27 0 0 0 0 134 0 39.5 33.5 13.31 8.24 71 52.27 35.5 48.84 0 0 0 0 67 62 19.75 30.19 0 0 1 0 0 0 0 0 0 0 213 1 1 6 1 1 0 0 0.63 0.37 0 0 1 1 0 0 1 1 213 0 1 0 1 0 6 0 1 0 1 0 0 0 205 0 0 0 0 0 129 0 76 0 0 0.09 0.25 0.66 0 1 0 0 0 0 0 0 0 3 0 0 0 0 0 0 0 0 0 0 0 1 
du3or 0 1 0 0 0 0 0 1 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 1 0 1 0 0 1 0 0 0 0 0 0 0 0 2 2 1 1 1 1 0 0 0.5 0.5 0 0 1 1 0 0 1 1 2 0 2 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0.5 0 0 0.5 0 0 0 0 0 0 1 0 0 2 0 0 1 0 0 0 0 0 0 0 0 0 
du3ov 0 1.01 0.11 0 0 0 0 0 0 1.01 0.11 1.01 0.11 1.01 0.11 44 30 29.33 31.63 0 0 0 0 0 0 29.33 31.63 6.29 5.59 44 30 1.02 0.21 0 0 0 0 0 0 1.02 0.21 0 0 0 0 1 0 0 0 0 0 88 1 2 10 1 86 0 0 0 1 0 0 0 1 0 0 0 1 88 0 1 0 2 0 10 0 1 0 86 0 0 0 0 0 0 0 0 0 0 0 0 0 0.84 0 0 0.16 0 0.94 0 0.06 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 1 
du3ox 0 1 0 0 0 0 0 0 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 1 1 1 1 1 1 0 0 0 1 0 0 0 1 0 0 0 1 1 0 1 0 1 0 1 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 
du3oy 0 1.02 0.12 0 0 0 0 0 0 1.02 0.12 1.02 0.15 1.02 0.15 64.5 31.5 32.25 35.55 0 0 0 0 0 0 32.25 35.55 7.59 6.03 64.5 31.5 1.03 0.28 0 0 0 0 0 0 1.03 0.28 0 0 0 0 1 0 0 0 0 0 129 1 3 12 1 124 0 0 0 1 0 0 0 1 0 0 0 1 129 0 1 0 3 0 12 0 1 0 124 0 0 0 0 0 0 0 0 0 0 0 0 0 0.26 0.58 0.16 0 0 0.95 0 0.04 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 1 
du3oz 1 1 0 0 0 0 0 1 0 0 0 1 0 33 3.35 1.01 0.08 165 0 0 0 0 0 165 0 0 0 27.5 8.18 165 0 33 3.35 0 0 0 0 33 3.35 0 0 1 0 0 0 0 0 0 0 0 0 165 164 1 6 1 5 0 0 1 0 0 0 1 0 0 0 1 0 165 0 164 0 1 0 6 0 1 0 5 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 1 0 0 
du3p1 0 1 0 0 0 0 0 1 0 0 0 1 0 18.2 16.11 1.01 0.07 91 80 0 0 0 0 91 80 0 0 15.17 14.82 91 80 18.2 16.11 0 0 0 0 18.2 16.11 0 0 1 0 0 0 0 0 0 0 0 0 182 181 1 6 1 5 0 0 1 0 0 0 1 0 0 0 1 0 182 0 181 0 1 0 6 0 1 0 5 0 0 0 0 0 0 0 0 0 0 0 0 0 0.06 0 0 0.94 0 1 0 0 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 1 0 0 
du3r7 0 3.63 1.32 0 0 0 0 0 0 3.63 1.32 29 0 29 0 29 0 29 0 0 0 0 0 0 0 29 0 3.63 1.32 29 0 29 0 0 0 0 0 0 0 29 0 0 0 0 0 1 0 0 0 0 0 29 1 1 8 1 1 0 0 0 1 0 0 0 1 0 0 0 1 29 0 1 0 1 0 8 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 

Could you share your data in a reproducible format? What have you tried? – mtoto

Updated the question with what I have tried and the first 25 records of the original dataset. –

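We need more, and clearer, information. When you say "within a one-minute time interval", do you mean fixed one-minute intervals, or within one minute of each record? (That is: all duplicates within 0min-1min, 1min-2min, and so on, versus record 0 at 59s and record 1 at 1min being within a minute of each other.) May I suggest providing a simple example of the input and the output you expect. – gnicholas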

Answer

Initialization:

from pyspark.sql import SparkSession, Window 
from pyspark.sql import functions as f 

# Create the session explicitly; in the pyspark shell `spark` already exists. 
spark = SparkSession.builder.getOrCreate() 

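The file loaded here is the same as "the first 25 records of my original dataset file" from the question. Judging by the output further down, its header uses the column names partner, client, country, timestamp and reference.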

df = spark.read.load(path="file:///home/zht/PycharmProjects/test/disk_file", format='csv', sep=',', header=True) 

This step is only there to produce a noticeable result on such a small sample; it can be skipped.

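# Shorten the values so that duplicates actually occur in 25 rows (substring positions are 1-based). 
# Rows whose 6th id digit is odd are moved to 00:01:00 so that there are two one-minute windows. 
df = df.withColumn('iplong', f.substring('iplong', pos=1, len=1)) \ 
    .withColumn('agent', f.substring('agent', pos=1, len=1)) \ 
    .withColumn('client', f.substring('client', pos=1, len=2)) \ 
    .withColumn('partner', f.substring('partner', pos=1, len=2)) \ 
    .withColumn('timestamp', f.when(f.substring('id', pos=6, len=1) % 2 == 1, '2012-03-08 00:01:00.0').otherwise(df['timestamp'])) 
df.show() 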

+-------+------+-----+-------+------+-------+--------------------+--------+--------------------+
|     id|iplong|agent|partner|client|country|           timestamp|category|           reference|
+-------+------+-----+-------+------+-------+--------------------+--------+--------------------+
|9794476|     1|    S|     dv|    ds|     us|2012-03-08 00:01:...|      ad|                null|
|9794474|     1|    S|     dv|    ds|     in|2012-03-08 00:01:...|      mg|riflql2a0yv8xoa9s...|
|9794471|     3|    N|     du|    dr|     py|2012-03-08 00:01:...|      co|                null|
|9794468|     1|    N|     dv|    ds|     vn|2012-03-08 00:00:...|      es|gp53lqr9njqd6z2ap...|
|9794467|     1|    M|     du|    dv|     in|2012-03-08 00:00:...|      ad|                null|
|9794466|     1|    N|     du|    dv|     in|2012-03-08 00:00:...|      ad|                null|
|9794477|     1|    B|     du|    dt|     es|2012-03-08 00:01:...|      es|h5njsswvxorsau9u8...|
|9794478|     1|    N|     du|    ds|     ru|2012-03-08 00:01:...|      mc|                null|
|9794481|     1|    N|     dv|    dr|     th|2012-03-08 00:00:...|      mc|oj0rekb51pvirnjuq...|
|9794482|     1|    N|     du|    dr|     id|2012-03-08 00:00:...|      co|r63f8uhijvr2irvka...|
|9794483|     1|    M|     dv|    dv|     id|2012-03-08 00:00:...|      ad|                null|
|9794485|     2|    G|     dv|    dr|     th|2012-03-08 00:00:...|      co|                null|
|9794486|     3|    M|     dv|    ds|     za|2012-03-08 00:00:...|      es|                null|
|9794492|     7|    N|     dv|    dt|     ng|2012-03-08 00:01:...|      es|onbw7na2mi8a62g4p...|
|9794493|     6|    N|     du|    dv|     sd|2012-03-08 00:01:...|      ad|hoq05psulkszxm4iz...|
|9794495|     1|    S|     dv|    dv|     in|2012-03-08 00:01:...|      co|im387req0zp1ucyga...|
|9794496|     1|    G|     du|    ds|     in|2012-03-08 00:01:...|      mc|immfap8948rebeym8...|
|9794498|     1|    S|     du|    ds|     in|2012-03-08 00:01:...|      mc|r81nrzjemr5jrfvjj...|
|9794499|     1|    N|     dv|    dr|     au|2012-03-08 00:01:...|      ad|                null|
|9794500|     1|    N|     dv|    dr|     id|2012-03-08 00:00:...|      co|tq09jycwii12iul7h...|
+-------+------+-----+-------+------+-------+--------------------+--------+--------------------+

And the key operation:

res = df.groupBy([f.window('timestamp', windowDuration='1 minutes'),'partner', 'iplong', 'agent']).count() 
res = res.withColumn('total',f.sum('count').over(Window.partitionBy(["window", "partner"]))) 
res.show(n=30, truncate=False) 

+---------------------------------------------+-------+------+-----+-----+-----+
|window                                       |partner|iplong|agent|count|total|
+---------------------------------------------+-------+------+-----+-----+-----+
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |1     |N    |1    |7    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |3     |N    |1    |7    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |3     |S    |1    |7    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |6     |N    |1    |7    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |1     |B    |1    |7    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |1     |G    |1    |7    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|du     |1     |S    |1    |7    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv     |3     |M    |1    |8    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv     |1     |N    |3    |8    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv     |2     |G    |1    |8    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv     |1     |G    |1    |8    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv     |1     |M    |1    |8    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|dv     |1     |S    |1    |8    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv     |3     |S    |1    |6    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv     |7     |N    |1    |6    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv     |1     |S    |3    |6    |
|[2012-03-08 00:01:00.0,2012-03-08 00:02:00.0]|dv     |1     |N    |1    |6    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|du     |2     |S    |1    |4    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|du     |1     |M    |1    |4    |
|[2012-03-08 00:00:00.0,2012-03-08 00:01:00.0]|du     |1     |N    |2    |4    |
+---------------------------------------------+-------+------+-----+-----+-----+

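count is the number of records per 1 minute & partner & iplong & agent.

total is the number of records per 1 minute & partner.

Is this the amount you mean?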

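Does the output of the key operation (the last printed table) correspond to partnerid? If so, this really is a great help; it is what I need to move in the right direction. It is not the exact output I need, but it does point me in the right direction. –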

@KeyurGolani I updated the code and added some explanation. –

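Yes. I want various column combinations other than "per 1 minute & partner & iplong & agent", but this is more than I expected. It will help me get the desired result. Thanks a lot! I really appreciate the effort. –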