2017-02-24 73 views
0

我是Python新手,試圖從hadoop流中讀取數據。hadoop streaming中的pd.read_csv問題

這是我的python代碼,var_list = get_config()工作正常。

if __name__ == "__main__": 
     var_list = get_config() 
     dat = pd.read_table(lines,delimiter=',',header=0) 
     #print (dat) 
     print (dat.dtypes) 
     #print (dat['var8']) 

這是我傳遞的文件,第一行作爲標題。

$ cat data 
client_id,var1,var2,var3,var4,var5,var6,var7,var8 
121,1,2,3,4,5,6,7,8 
112,1,2,3,4,5,6,7,8 
102,1,2,3,4,5,6,7,8 
121,1,2,3,4,5,6,7,8 
125,8,7,6,5,4,3,2,1 

當我嘗試打印dat.dtypes,這裏是輸出。

$ cat part-00000 
1 int64 
102  int64 
2 int64 
3 int64 
4 int64 
5 int64 
6 int64 
7 int64 
8 int64 
client_id int64 
dtype: object 
dtype: object 
var1 int64 
var2 int64 
var3 int64 
var4 int64 
var5 int64 
var6 int64 
var7 int64 
var8 int64 

我的問題是它沒有正確讀取數據,可能是什麼問題?

我也試過pd.read_csv,當我用pd.read_table讀取數據時,它給了我同樣的問題。

我正在執行像這樣的hadoop流。

hadoop jar /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0-mapr-1602.jar -Dmapreduce.job.queuename=opsistg_q1 -Dmapreduce.map.java.opts=-Xmx40960m -Dmapreduce.map.memory.mb=25000 -Dyarn.app.mapreduce.am.resource.mb=25000 -Dmapreduce.task.timeout=180000000 -mapper "<local path>/config_mapper.py" -input "<hadoop location>/data" 
+0

明白了這個問題,這裏sys.stdin逐行讀取數據,這應該是問題所在。所以,現在有什麼想法,誰讀取sys.stdin並逐行放置到某個緩衝區或文件? – subro

+0

請將您的代碼,數據和輸出粘貼爲文本而不是圖片。 – Khris

+0

@Kris請現在檢查 – subro

回答

1

這是一個簡單的代碼。

映射器:

#!/usr/bin/env python 
import sys 

for line in sys.stdin: 
    line = line.strip() 
    toks = line.split('\001' ,2)  
    ck=toks[0]+toks[1]    
    others=toks[2]   
    print '%s\t%s'%(ck, others) 

從stdin注意到數據,將其分解並放出它作爲密鑰(CK) - 值(其他)對。

減速機:

#!/usr/bin/env python 
import sys 
for line in sys.stdin: 
    line = line.strip() 
    ck_others = line.split('\t')   # parsing mapper o/p 
    ck = ck_others[0]      
    others = ck_others[1]     

    other_parsed = others.split('\001') 

此解析映射器輸出以及分割值(others)爲好。