我正在嘗試在亞馬遜的EMR上運行mrjob。我使用內聯運行器在本地測試了該作業,但在亞馬遜上運行時失敗。我將故障範圍縮小到了我對外部數據文件zip_codes.txt
的依賴。如果我使用硬編碼的郵政編碼數據運行沒有依賴關係,它工作得很好。數據文件應如何包含在EMR上的mrjob中?
我試圖使用上傳文件參數包含必要的數據文件。當我看到S3時,該文件確實在那裏,但顯然有些事情出錯了,所以我無法在本地訪問它。
這裏是我的mrjob.conf
文件:
runners:
emr:
aws_access_key_id: FOOBARBAZQUX
aws_secret_access_key: IAMASECRETKEY
aws_region: us-east-1
ec2_key_pair: mapreduce
ec2_key_pair_file: $ENV/keys/mapreduce.pem
ssh_tunnel_to_job_tracker: true
ssh_tunnel_is_open: true
cleanup_on_failure: ALL
cmdenv:
TZ: America/Los_Angeles
這是我MR_zip.py
文件。
from mrjob.job import MRJob
import mrjob
import csv
def distance(p1, p2):
# d = ...
return d
class MR_zip(MRJob):
OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}
def mapper(self, _, line):
zip_code_1, poi = line.split(",")
zip_code_1 = int(zip_code_1)
lat1, lon1 = self.zip_codes[zip_code_1]
for zip_code_2, (lat2, lon2) in self.zip_codes.items():
d = distance((lat1, lon1), (lat2, lon2))
yield zip_code_2, (zip_code_1, poi, d)
def reducer(self, zip_code_1, ds):
result = {}
for zip_code_2, poi, d in ds:
if poi not in result:
result[poi] = (zip_code_2, d)
elif result[poi][1] > d:
result[poi] = (zip_code_2, d)
yield zip_code_1, result
if __name__ == '__main__':
MR_zip.run()
最後,我用下面的命令來運行它:
python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt
凡zip_codes.txt樣子:
...
62323,39.817702,-90.66923
62324,39.988988,-90.94976
62325,40.034398,-91.16278
62326,40.421857,-90.80333
...
而且poi.txt樣子:
...
210,skate park
501,theatre
29001,theatre
8001,knitting club
20101,food bank
...