2014-10-19

mrjob: error when using make_runner on a Hadoop cluster

I am trying to run the simple word-count example programmatically, but I can't get the code to work on a Hadoop cluster.

The job, in test_job.py (the runner below lives in mr_job_test.py):

from mrjob.job import MRJob
import re


WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)
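To make clear what the job computes, here is a plain-Python simulation of the mapper -> combiner -> reducer flow, with no Hadoop and no mrjob involved. The helper names (`sum_by_key`, `word_freq_count`) are hypothetical, and sorting plus `itertools.groupby` stands in for Hadoop's shuffle; this is only an illustration of the data flow, not how mrjob actually executes the job.

```python
import re
from itertools import groupby
from operator import itemgetter

WORD_RE = re.compile(r"[\w']+")


def mapper(line):
    # Same logic as MRWordFreqCount.mapper: emit (word, 1) for every word.
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def sum_by_key(pairs):
    # Hypothetical stand-in for shuffle + combiner/reducer:
    # sort by key, group equal keys, and sum the counts of each group.
    ordered = sorted(pairs, key=itemgetter(0))
    for word, group in groupby(ordered, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


def word_freq_count(splits):
    """Simulate the job; each split plays the role of one mapper's input."""
    combined = []
    for split in splits:
        mapped = [pair for line in split for pair in mapper(line)]
        combined.extend(sum_by_key(mapped))  # combiner: runs once per mapper
    return dict(sum_by_key(combined))        # reducer: sees all combiner output


print(word_freq_count([["The cat sat", "the dog"], ["Cat's toy, the end"]]))
```

The combiner can reuse the reducer's logic here because summing partial sums gives the same total, which is also why the job defines the same body for both.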

The runner:

from test_jobs import MRWordFreqCount

def test_runner(in_args, input_dir):
    tmp_output = []
    args = in_args + input_dir
    mr_job = MRWordFreqCount(args.split())
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            tmp_output = tmp_output + [line]
    return tmp_output

if __name__ == '__main__':
    input_dir = 'hdfs:///test_input/'
    args = '-r hadoop '
    print test_runner(args, input_dir)
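Note that `runner.stream_output()` yields raw output lines, so `tmp_output` in the runner holds encoded strings rather than (word, count) pairs. In the mrjob API of this era the job object decodes them with `mr_job.parse_output_line(line)`. As a stand-in that runs without mrjob, the sketch below decodes lines assuming the job's default output protocol, which writes the JSON-encoded key and the JSON-encoded value separated by a tab; `decode_output_line` is a hypothetical helper, not an mrjob function.

```python
import json


def decode_output_line(line):
    """Hypothetical helper: split one job output line into (key, value).

    Assumes mrjob's default JSON output protocol, i.e. each line is
    json(key) + TAB + json(value). Real code should prefer mrjob's own
    decoding (mr_job.parse_output_line in older versions).
    """
    if isinstance(line, bytes):  # newer mrjob versions yield bytes
        line = line.decode("utf-8")
    key_json, value_json = line.rstrip("\r\n").split("\t", 1)
    return json.loads(key_json), json.loads(value_json)


# Lines shaped like what stream_output() yields for this job.
raw_lines = ['"cat"\t1\n', b'"the"\t3\n']
counts = dict(decode_output_line(line) for line in raw_lines)
print(counts)
```

Splitting on the first tab only is safe because JSON encoding escapes any tab inside a key as `\t`.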

I can run the runner script locally (with the inline option, `-r inline`), but on Hadoop I get:

> Traceback (most recent call last):
>   File "mr_job_tester.py", line 17, in <module>
>     print test_runner(args, input_dir)
>   File "mr_job_tester.py", line 8, in test_runner
>     runner.run()
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 458, in run
>     self._run()
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/hadoop.py", line 239, in _run
>     self._run_job_in_hadoop()
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/hadoop.py", line 295, in _run_job_in_hadoop
>     for step_num in xrange(self._num_steps()):
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 742, in _num_steps
>     return len(self._get_steps())
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 721, in _get_steps
>     raise ValueError("Bad --steps response: \n%s" % stdout)
> ValueError: Bad --steps response:

Answer


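According to this, the way mrjob submits the job file and executes it remotely inside the mappers and reducers makes it necessary to have the following lines in the job definition file: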

if __name__ == "__main__": 
    MRWordFreqCount.run() 
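This fits the traceback: the runner launches the job file as a separate Python process with `--steps` and parses what it prints, and a job file without this block prints nothing, which matches the empty text after "Bad --steps response:". The sketch below reproduces that failure without Hadoop by writing two throwaway scripts, one without and one with a `__main__` guard, running each as `python <script> --steps`, and parsing stdout as JSON. It is a simplified stand-in, not mrjob's code: the script contents and the `probe_steps` helper are hypothetical, and the JSON the guarded script prints only imitates a step description.

```python
import json
import os
import subprocess
import sys
import tempfile

# Hypothetical job scripts: the class body is irrelevant, only the guard matters.
WITHOUT_GUARD = "class MRWordFreqCount(object):\n    pass\n"
WITH_GUARD = WITHOUT_GUARD + (
    "\nif __name__ == '__main__':\n"
    "    import json, sys\n"
    "    if '--steps' in sys.argv:\n"
    "        # imitation of a step description, not mrjob's real format\n"
    "        print(json.dumps([{'type': 'streaming'}]))\n"
)


def probe_steps(source):
    """Run `python <script> --steps` and parse stdout as JSON (simplified)."""
    fd, path = tempfile.mkstemp(suffix=".py")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(source)
        out = subprocess.check_output([sys.executable, path, "--steps"])
        text = out.decode("utf-8")
        try:
            return json.loads(text)
        except ValueError:  # empty output is not valid JSON
            raise ValueError("Bad --steps response: \n%s" % text)
    finally:
        os.remove(path)


try:
    probe_steps(WITHOUT_GUARD)
except ValueError as e:
    print("without guard:", e)
print("with guard:", probe_steps(WITH_GUARD))
```

Without the guard, importing or running the file only defines the class, so the probe gets empty output and JSON parsing fails; with the guard, running the file as a script produces output the caller can parse.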