我已經在Python中編寫了一個簡單的MapReduce示例。如果輸入是一個文件,例如text
文件,爲了運行代碼,我們只需使用以下模式:cat <data> | map | sort | reduce
,例如在我的情況下,它是:cat data | ./mapper.py | sort | ./reducer.py
並且所有的東西都是正確的。Python - 如何將目錄傳遞爲MapReduce輸入
但我更改了我的映射器和縮減器以讀取directory
中包含.gz
文件的數據。所以我應該通過path of the directory
作爲輸入。我測試以下終端命令cat dat/ | ./mapper.py | sort | ./reducer.py
而含有數據的目錄是dat/
,但我面對錯誤:
cat: dat/: Is a directory
Traceback (most recent call last):
File "./mapper.py", line 9, in <module>
for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'
如何可以通過一個目錄作爲輸入在Python MapReduce的?
以下是我的代碼:
mapper.py
#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip
QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
f = gzip.open(filename, 'r')
for line in f:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if temp != MISSING and q in QUALITY:
print " %s\t%s" % (year, temp)
reducer.py
#!/usr/bin/env python
import sys
max_val = -sys.maxint
key = ''
for line in sys.stdin:
(key, val) = line.strip().split('\t')
max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
'zcat data/*。gz | ./mapper.py |排序| ./reducer.py' – philantrovert
@philantrovert謝謝,注意我的映射器,我想輸入是包含'.gz'文件的目錄地址,我使用'for loop'來讀取它們,正如我以前所做的那樣,但不是在MapReduce模型中。但我認爲你的建議傳遞了目錄中所有'.gz'文件的確切地址。我對嗎? – soheil
'zcat'(gzip + cat)會提取.gz文件並將其內容傳遞給您的映射器。也許,這將適用於.gz文件而無需更改您的映射器。 – Chickenmarkus