2016-01-13 59 views
0

我想一些星火過濾和轉化的,我有樣本數據,星火行動地圖,flatmap,過濾器,ReduceByKey,使用python

657483, 888888, 9 
657483, 888889, 10 
657484, 888888, 20 
657484, 888889, 30 

對於每個x[0]我要檢查的條件相匹配x[1] == '888888' && x[1] == '888889'和得到相對x [2]所以輸出看起來像,

657483,9,10 
657484,20,30 

我想用火花貼圖,過濾器轉換做到這一點。所以我嘗試

result = (file1 
    .map(lambda x: (x.split(",")[0],x)) 
    .groupByKey() 
    .map(lambda x: (x[0], list(x[1]))) 
    .sortByKey('true') 
    .coalesce(1).map(lambda line: (line[0], if(line[1] == "888888"), and (line[1] == "888889"))).saveAsTextFile('hdfs://localhost:9000/filter')) 

它給我的結果一樣,

657483,false,false 

657484,false,false 

我怎樣才能提取x[0]x[2]包含x[1] == 888888 && x[1] == '888889'。如果條件過濾結果,我們如何申請。

+3

因此,閱讀一些如果遇到問題,請嘗試嘗試並提出問題。你不能簡單地來這裏要求有人爲你做,甚至沒有顯示任何嘗試的證據。 –

回答

3
def filterfunct(x): 
    if (len(x[1]) != 2): 
     return false 
    else: 
     if (x[1][0][0] == 888888 and x[1][1][0] == 888889) or (x[1][1][0] == 888888 and x[1][0][0] == 888889) : 
      return true 
     else: 
      return false 
def mapfunct(x): 
    if (x[1][0][0] == 888888): 
     return (x[0],x[1][0][1],x[1][1][1]) 
    else: 
     return (x[0],x[1][1][1],x[1][0][1]) 





result = (file1 
.map(lambda x: (x.split(",")[0],(int(x.split(",")[1]),int(x.split(",")[2])))) 
.groupByKey() 
.map(lambda x: (x[0], filter(lambda y: y[0]==888888 or y[0]==888889, list(x[1])))) 
.filter(filterfunct) 
.map(mapfunct) 
.sortByKey('true') 
.saveAsTextFile('hdfs://localhost:9000/filter')) 

groupByKey()會給這樣{(657483,[(888888, 9),(888889, 10)]},其中(x,y)是一個元組和[x,y]是一個列表的結果。但是,您不知道列表的構建順序(大部分時間遵循它們的讀取順序,但是如果兩個連續的行結束於不同的分區中,您可能會使其反轉)

+0

這些值現在按鍵分組,並且我應用如果條件在映射函數中,但它給我657483,假,這樣的假。如果條件在Python中的映射函數是什麼是完美的方式 –

+0

如果它適合你,請參閱我的修改答案 –

+0

感謝Radu lonescu,代碼已執行,但輸出文件將空白.. –