2016-05-17 156 views
0

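python multiprocessing TypeError: 'int' object is not iterable

The code below fails only occasionally. If I start a single process, it works fine. But as I keep increasing the number of processes, to around 11, it starts throwing an error.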

import itertools
import sys
from multiprocessing import Manager, Process

# Number of worker processes comes from the command line (default: 1).
try:
    num_workers = int(sys.argv[1])
except (IndexError, ValueError):
    num_workers = 1

someval = 10

def do_work(in_queue, x):
    i = 0  # number of lines handled by this worker
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            if i > 0:
                work.put(i,)
            # work.put(i)
            return
        else:
            print "value from work " + line.rstrip('\n')
            i = i + 1

if __name__ == "__main__":

    manager = Manager()
    work = manager.Queue(num_workers)
    someval = 20
    print " Number of workers is " + str(num_workers)
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, someval))
        p.start()
        pool.append(p)
    with open("/home/jay/scripts/a.txt") as f:
        # Append one None sentinel per worker after the file's lines.
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    x = 0
    for p in pool:
        p.join()

The file /home/jay/scripts/a.txt has 10 lines.

If I run it with 7 workers and then with 11, I get:

./x.py 7 
Number of workers is 7
value from work 1
value from work 2
value from work 3
value from work 4
value from work 5
value from work 6
value from work 7
value from work 8
value from work 9
value from work 10
x is 0
all done

./x.py 11 
Number of workers is 11 
value from work 1 
value from work 2 
value from work 3 
value from work 4 
value from work 5 
value from work 6 
value from work 7 
value from work 8 
value from work 9 
value from work 10 
Process Process-11: 
Traceback (most recent call last): 
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "./x.py", line 18, in do_work
    line_no, line = item
TypeError: 'int' object is not iterable 
x is 0 
all done 
+0

Add print(repr(item)) just before line 18 so you can see what the value actually is.

Answers

2

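The offending line is work.put(i,) in do_work. It puts an int on the queue, and that int is then read and unpacked by another worker.

I also agree with dano that using multiprocessing.Pool is easier and shorter.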

import multiprocessing

if __name__ == "__main__":
    pool = multiprocessing.Pool(num_workers)
    with open("/home/jay/scripts/a.txt") as f:
        # do_work must be rewritten to accept a single line and return its result
        mapped = pool.map(do_work, f)

If you need i back from the workers, just return it from do_work and it will be stored in mapped.

+0

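My real file is very large, more than 100 GB. According to another thread, "map will consume your file in one go before it starts working." That is why I decided to take this approach. http://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes-in-python – Jayadevan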

2

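The problem is that work.put(i,) doesn't do what you think it does. You intended to put the 1-tuple (i,) on the queue, but you are actually just putting i on the queue. If you change that line to work.put((i,)), you'll see the behavior you expect.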
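A minimal sketch of this point, written for Python 3 and using the standard-library queue.Queue as a stand-in for the manager queue (an assumption made only so the snippet runs in a single process). A trailing comma inside a call's argument list does not build a tuple, so put(i,) enqueues the bare integer, and unpacking it on the consumer side fails.

```python
import queue

q = queue.Queue()   # stand-in for manager.Queue(); assumption for this demo
i = 3

q.put(i,)      # trailing comma in an argument list: identical to q.put(i)
q.put((i,))    # the inner parentheses build the 1-tuple (i,)

first = q.get()
second = q.get()
print(repr(first))    # a plain int
print(repr(second))   # a 1-tuple

try:
    line_no, line = first   # what do_work does with every item it reads
except TypeError as exc:
    print("unpacking failed:", exc)
```

Note that a 1-tuple does not unpack into line_no, line either (it raises ValueError), so anything a worker puts on a queue that other workers read from has to match the (line_no, line) shape, or go to a different queue.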

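There is also a race condition. With larger values of num_workers, one of your child processes can add i to the queue before the loop in the main process has finished loading the queue with the None sentinel values. With smaller values of num_workers, the main process gets through the for loop before any worker adds i to the queue.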
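One way to sidestep the race is to stop workers from writing to the queue they read from. The sketch below is a variant of the question's code under several assumptions, not the asker's script: it is written for Python 3, the names worker, count_lines, and results are hypothetical, and it uses plain multiprocessing.Queue objects instead of a Manager. Workers read (line_no, line) pairs from work and report their counts on a separate results queue.

```python
import itertools
import multiprocessing as mp


def worker(work, results):
    """Consume (line_no, line) pairs until a None line arrives."""
    count = 0
    while True:
        line_no, line = work.get()
        if line is None:         # exit signal
            results.put(count)   # report on a queue that no worker reads
            return
        count += 1


def count_lines(lines, num_workers):
    """Feed lines to num_workers processes; return how many were handled."""
    work = mp.Queue(num_workers)   # bounded, as in the question
    results = mp.Queue()           # hypothetical second queue for counts
    procs = [mp.Process(target=worker, args=(work, results))
             for _ in range(num_workers)]
    for p in procs:
        p.start()
    # One None sentinel per worker follows the real lines.
    for item in enumerate(itertools.chain(lines, (None,) * num_workers)):
        work.put(item)
    # Collect one count per worker before joining.
    total = sum(results.get() for _ in procs)
    for p in procs:
        p.join()
    return total


if __name__ == "__main__":
    print(count_lines(["line %d\n" % n for n in range(1, 11)], 11))
```

Because every worker reports exactly once, including workers that saw no lines, the main process knows how many results to wait for and the work queue only ever carries (line_no, line) pairs.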

Also, have you considered using multiprocessing.Pool instead of building a pool by hand with Process and Queue? It would simplify your code considerably.
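For an input as large as the 100 GB file mentioned in the comments, Pool.imap (or imap_unordered) may be a better fit than Pool.map: map turns an iterable of unknown length into a list before dispatching work, while imap pulls items from the iterable as it goes and returns the results as an iterator. A minimal Python 3 sketch follows; process_line, total_length, and the chunksize value are hypothetical placeholders, and how far ahead of the workers the pool reads is an implementation detail, so memory use on a very large file would still need measuring.

```python
import multiprocessing as mp


def process_line(line):
    """Hypothetical per-line work: return the stripped line's length."""
    return len(line.rstrip("\n"))


def total_length(lines, num_workers, chunksize=1000):
    """Sum process_line over an iterable of lines using a worker pool."""
    with mp.Pool(num_workers) as pool:
        # imap yields results in input order as they become available.
        return sum(pool.imap(process_line, lines, chunksize=chunksize))


if __name__ == "__main__":
    # A generator stands in for an open file object here.
    sample = ("line %d\n" % n for n in range(1, 11))
    print(total_length(sample, num_workers=4))
```

Passing an open file object in place of sample would iterate over it line by line in the same way.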
