2013-05-16 45 views
4

我錯過了Yelp的mrjob job庫的一些明顯的東西。設置MRJob類幾乎非常簡單。通過file或stdin運行也是如此。但是,我怎樣才能將本地或s3文件中的輸入從文件中更改爲s3中的密鑰?如何將s3對象名稱用作MRJob映射器的輸入,而不是s3對象本身?

就是這樣。假設我想指望在我的S3存儲桶以字符串「富」開頭的所有對象:

import re 

class MRCountS3Objects(MRJob): 

    define mapper(self, _, botoS3Key): 
     if re.match('^foo', botoS3Key.name): 
      yield 'foo', 1 

    define reduce(self, name, occurrences): 
     yield name, sum(occurrences) 

這是一個高度人爲的例子,但你可能明白我的意思。我如何告訴MRJob在s3對象流上操作,忽略對象的內容?我看到S3Filesystem.get_s3_keys()method,它使我完全得到我需要的流,但我不確定該從哪裏去。

回答

4

想出至少一種方法來實現這一點。您的MRJob具有stdin屬性,可以將其分配給任何迭代器,然後可以通過編程方式運行該作業。例如,該代碼應該在my-bucket的密鑰名稱上工作:

from mrjob.job import MRJob 
from mrjob.emr import EMRJobRunner 

class MRS3KeyProcessor(MRJob): 
    # Do some MRJob stuff. 
    ... 

def s3_name_generator(bucket): 
    """Generator that returns boto.s3.Key names. 
    """ 
    # Could also use raw boto here. 
    emr = EMRJobRunner() 
    key_stream = emr.fs.get_s3_keys(bucket) 
    for key in key_stream: 
     yield key.name 

def main(): 
    # The '-' argument signifies that we use stdin. 
    mr_job = MRCountS3Objects(['--runner', 'inline', '-']) 
    stdin = s3_name_generator('my-bucket') 
    mr_job.stdin = stdin 
    results = [] 
    with mr_job.make_runner() as runner: 
     runner.run() 
     for line in runner.stream_output(): 
      key, value = mr_job.parse_output_line(line) 
      results.append((key, value)) 
    print(results) 

if __name__ == '__main__': 
    main()