2013-09-24 18 views
0

我正在嘗試在亞馬遜的EMR上運行mrjob。我使用內聯運行器在本地測試了該作業,但在亞馬遜上運行時失敗。我將故障範圍縮小到了我對外部數據文件zip_codes.txt的依賴。如果我使用硬編碼的郵政編碼數據運行沒有依賴關係,它工作得很好。數據文件應如何包含在EMR上的mrjob中?

我試圖使用上傳文件參數包含必要的數據文件。當我看到S3時,該文件確實在那裏,但顯然有些事情出錯了,所以我無法在本地訪問它。

enter image description here

這裏是我的mrjob.conf文件:

runners: 
    emr: 
    aws_access_key_id: FOOBARBAZQUX 
    aws_secret_access_key: IAMASECRETKEY 
    aws_region: us-east-1 
    ec2_key_pair: mapreduce 
    ec2_key_pair_file: $ENV/keys/mapreduce.pem 
    ssh_tunnel_to_job_tracker: true 
    ssh_tunnel_is_open: true 
    cleanup_on_failure: ALL 
    cmdenv: 
     TZ: America/Los_Angeles 

這是我MR_zip.py文件。

from mrjob.job import MRJob 
import mrjob 
import csv 

def distance(p1, p2): 
    # d = ...  
    return d 

class MR_zip(MRJob): 
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol 
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))} 

    def mapper(self, _, line): 
     zip_code_1, poi = line.split(",") 
     zip_code_1 = int(zip_code_1) 
     lat1, lon1 = self.zip_codes[zip_code_1] 
     for zip_code_2, (lat2, lon2) in self.zip_codes.items(): 
      d = distance((lat1, lon1), (lat2, lon2)) 
      yield zip_code_2, (zip_code_1, poi, d) 

    def reducer(self, zip_code_1, ds): 
     result = {} 
     for zip_code_2, poi, d in ds: 
      if poi not in result: 
       result[poi] = (zip_code_2, d) 
      elif result[poi][1] > d: 
       result[poi] = (zip_code_2, d) 
     yield zip_code_1, result 

if __name__ == '__main__': 
    MR_zip.run() 

最後,我用下面的命令來運行它:

python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt 

凡zip_codes.txt樣子:

... 
62323,39.817702,-90.66923 
62324,39.988988,-90.94976 
62325,40.034398,-91.16278 
62326,40.421857,-90.80333 
... 

而且poi.txt樣子:

... 
210,skate park 
501,theatre 
29001,theatre 
8001,knitting club 
20101,food bank 
... 

回答

1

Overv IEW

有兩個錯誤在我的代碼:

  1. 的步驟初始化代碼應該是在步的初始化
  2. 默認情況下EMR使用Python 2.6這就排除了除其他事項外
  3. 字典解析

步驟初始化

永遠y步有一個相應的初始化方法。例如,mapper具有mapper_init,可用於初始化映射器中使用的數據。函數reducercombiner具有相似的初始化方法。如果使用steps函數來定義自己的步驟,那麼您還可以定義使用哪個初始化函數。詳細瞭解初始化程序here

謹防Python版本

截至今天,EMR默認使用Python版本2.6.6。因此,對更高版本的任何依賴關係都可能在本地運行,但在EMR上存在問題。

的修復

要修復上面的代碼中,需要除去限定zip_codes線在MR_zip.py

zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))} 

,而是定義它的內部mapper_init不使用字典推導。

def mapper_init(self): 
    self.zip_codes = {} 
    for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")): 
     self.zip_codes[int(zip_code)] = (float(latitude), float(longitude)) 

其他文件和命令行保持不變。

3

此外,您可能會發現有用的MRJob.add_file_option例程。例如,指定

self.add_file_option('--config-file', dest='config_file', 
    default=None, help='file with labels', action="append") 

您可以通過self.options.config_file路徑列表參考上傳的文件。

相關問題