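I have a log file of status_changes, each entry of which has a driver_id, a timestamp, and a duration. Using the driver_id and timestamp, I want to fetch the appropriate GPS log from S3. These GPS logs are stored in an S3 bucket in the form bucket_name/yyyy/mm/dd/driver_id.log. How can I fetch and process a new S3 file on each iteration of the mrjob mapper?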
import ast

from mrjob.job import MRJob


class Mileage(MRJob):

    def get_s3_gpslog_path(self, driver_id, occurred_at, status):
        # Month and day are zero-padded to match the yyyy/mm/dd layout.
        # This assumes occurred_at exposes integer .year/.month/.day.
        s3_path = "s3://gps_logs/{yyyy}/{mm:02d}/{dd:02d}/{driver_id}.log"
        s3_path = s3_path.format(yyyy=occurred_at.year,
                                 mm=occurred_at.month,
                                 dd=occurred_at.day,
                                 driver_id=driver_id)
        return s3_path

    def mapper(self, _, line):
        # Each input line is a Python dict literal.
        line = ast.literal_eval(line)
        driver_id = line['driverId']
        occurred_at = line['timestamp']
        status = line['status']
        s3_path = self.get_s3_gpslog_path(driver_id, occurred_at, status)
        # ^^ How do I fetch this file and read it?
        distance = calculate_distance_from_gps_log(s3_path, occurred_at, status)
        yield status, distance


if __name__ == '__main__':
    Mileage.run()
From the command line, I run it with the status_changes log file as input: $ python mileage.py status_changes.log
My question is: how do I actually fetch that GPS log, given the S3 URI string I have constructed?
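To make the question concrete, here is a minimal sketch of the kind of helper being asked about, not a confirmed solution. It assumes boto3 is installed on every worker node and that AWS credentials are available there; `split_s3_uri` and `read_s3_lines` are hypothetical names introduced only for illustration.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/some/key' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: %r" % s3_uri)
    return parsed.netloc, parsed.path.lstrip("/")


def read_s3_lines(s3_uri, client=None):
    """Return the decoded text lines of the object at s3_uri.

    `client` is assumed to behave like a boto3 S3 client. If it is
    omitted, one is created, which requires boto3 and credentials.
    """
    if client is None:
        import boto3  # assumption: available on the worker node
        client = boto3.client("s3")
    bucket, key = split_s3_uri(s3_uri)
    # get_object returns a dict whose "Body" is a readable byte stream.
    body = client.get_object(Bucket=bucket, Key=key)["Body"]
    return body.read().decode("utf-8").splitlines()
```

Inside the mapper this might be called as read_s3_lines(s3_path). Creating the client once in mapper_init and passing it in would avoid building a new client for every input line.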