2017-07-04 39 views
2

我已經在Python中編寫了一個簡單的MapReduce示例。如果輸入是一個文件,例如text文件,爲了運行代碼,我們只需使用以下模式:cat <data> | map | sort | reduce,例如在我的情況下,它是:cat data | ./mapper.py | sort | ./reducer.py並且所有的東西都是正確的。Python - 如何將目錄傳遞爲MapReduce輸入

但我更改了我的映射器和縮減器以讀取directory中包含.gz文件的數據。所以我應該通過path of the directory作爲輸入。我測試以下終端命令cat dat/ | ./mapper.py | sort | ./reducer.py而含有數據的目錄是dat/,但我面對錯誤:

cat: dat/: Is a directory 
Traceback (most recent call last): 
    File "./mapper.py", line 9, in <module> 
    for filename in glob.glob(sys.stdin + '*.gz'): 
TypeError: unsupported operand type(s) for +: 'file' and 'str' 

如何可以通過一個目錄作爲輸入在Python MapReduce的?

以下是我的代碼:

mapper.py

#!/usr/bin/env python 
import sys 
#import timeit 
import glob 
import gzip 

QUALITY = '01459' 
MISSING = '+9999' 
for filename in glob.glob(sys.stdin + '*.gz'): 
    f = gzip.open(filename, 'r') 
    for line in f: 
     val = line.strip() 
     (year, temp, q) = (val[15:19], val[87:92], val[92:93]) 
     if temp != MISSING and q in QUALITY: 
      print " %s\t%s" % (year, temp) 

reducer.py

#!/usr/bin/env python 
import sys 

max_val = -sys.maxint 
key = '' 
for line in sys.stdin: 
    (key, val) = line.strip().split('\t') 
    max_val = max(max_val, int(val)) 
print "The last IF %s\t%s" % (key, max_val) 
+1

'zcat data/*。gz | ./mapper.py |排序| ./reducer.py' – philantrovert

+0

@philantrovert謝謝,注意我的映射器,我想輸入是包含'.gz'文件的目錄地址,我使用'for loop'來讀取它們,正如我以前所做的那樣,但不是在MapReduce模型中。但我認爲你的建議傳遞了目錄中所有'.gz'文件的確切地址。我對嗎? – soheil

+1

'zcat'(gzip + cat)會提取.gz文件並將其內容傳遞給您的映射器。也許,這將適用於.gz文件而無需更改您的映射器。 – Chickenmarkus

回答

1

for filename in glob.glob(sys.stdin + '*.gz'):預計從stdin的字符串。因此,簡單地傳遞,而不是文件內容的字符串(echo)(cat):

$ echo dat/ | ./mapper.py | sort | ./reducer.py 

但是,爲什麼你通過管道傳遞參數?通常參數直接傳遞並通過python通過sys.argv(或通過解釋器,如「argparse」)讀取。

0

爲了得到當前工作目錄使用的路徑:

import os 
path = os.getcwd() 

你可以得到所有的t他從這個文件中的文件:

filenames = os.listdir(path) 
# filter files that doesn't have .gz filetype 
filenames = [file_name for file_name in filenames if file_name.endswith('.gz')] 

您可以將文件簡單地遍歷有:

for filename in filenames: 
    f = gzip.open(path+filename, 'r') 
相關問題