2014-04-24 31 views
0

我有一個status_changes的日誌文件,每個文件都有一個driver_id,時間戳和持續時間。使用driver_id和timestamp,我想從S3中獲取適當的GPS日誌。這些GPS日誌以bucket_name/yyyy/mm/dd/driver_id.log形式存儲在S3存儲桶中。如何獲取並處理每個迭代的mrjob映射器的新S3文件?

from mrjob.job import MRJob 
class Mileage(MRJob): 

    def get_s3_gpslog_path(self, driver_id, occurred_at, status): 
     s3_path = "s3://gps_logs/{yyyy}/{mm}/{dd}/{driver_id}.log" 
     s3_path = s3_path.format(yyyy=occurred_at.year, 
           mm=occurred_at.month, 
           dd=occurred_at.day, 
           driver_id=driver_id) 
     return s3_path 

    def mapper(self, _, line): 
     line = ast.literal_eval(line) 
     driver_id = line['driverId'] 
     occurred_at = line['timestamp'] 
     status = line['status'] 
     s3_path = self.get_s3_gpslog_path(driver_id, occurred_at, status) 
     # ^^ How do I fetch this file and read it? 
     distance = calculate_distance_from_gps_log(s3_path, occurred_at, status) 

     yield status, distance 


if __name__ == '__main__': 
    Mileage.run() 

和命令行我與status_change日誌文件作爲輸入運行它: $蟒蛇mileage.py status_changes.log

我的問題是:如何真正獲取該GPS日誌,給予我構建的S3 URI字符串?

回答

0

您可以使用作爲mrjob一部分的S3Filesystem。您也可以在腳本中使用boto的S3實用程序。您可能需要硬編碼(或從每個節點中的Hadoop配置解析)(祕密)訪問密鑰。但是,映射程序正在做的事情可能有點太多,可能會過度請求獲取S3資源。您可能可以重寫MapReduce算法,以便通過將GPS日誌與其他日誌一起傳輸,以較少資源密集型的方式執行加入。

相關問題