2016-07-07 116 views
1

基本上這個問題如下:我有一堆工作人員,每個工作人員都有一個函數(函數是worker(alist)),並且試圖同時處理35名工人。每個工作人員從文件(模數部分)中讀取他們的行,並使用「工作人員」功能處理該行。我進行了筆測,發現無用指數的原始操作和刪除按預期工作100%。池多重處理Python

「pool.apply_async」函數的args部分未將「raw」列表傳入並啓動該進程。 Raw是完全正確的,並且正常工作,worker自身正常工作,pool.apply_async函數是唯一出現問題的地方,我不知道如何解決它。請幫忙嗎?

相關的代碼是在這裏:

NUM_WORKERS=35 
f=open("test.csv") 
pool=multiprocessing.Pool() 
open("final.csv",'w') 
for workernumber in range(1, NUM_WORKERS): 
    for i,line in enumerate(f): 
     if i==0: 
      print "Skipping first line" #dont do anything 
     elif i%workernumber==0: 
      raw = line.split(',')[0][1:-1].split() 
      uselessindices=[-2,-3,-4,-5,-6] 
      counter=0 
      for ui in uselessindices: 
       del raw[ui+counter] 
       counter+=1 
      print raw 
      pool.apply_async(worker, args=(raw,)) 
pool.close() 
pool.join() 
+0

爲什麼你循環使用'workernumber'呢?你真的希望第一個工作人員處理文件中的每一行,第二個工作人員處理每一行?我沒有看到任何理由期望處理一條線不止一次會有任何好處,但也許我錯過了一些東西。或者你真的希望每一行都被處理一次?如果是這樣,那麼外層循環完全沒有意義(就像'elif'上的條件一樣)。 – Blckknght

+0

35是很多處理器。 – 101

+0

@ 101我爲num_workers使用了一個任意數字。有16個處理器,但操作不是CPU密集型的,只需要由大量工人完成並編譯。我是一個多處理初學者,並且試圖通過我的文檔。有沒有辦法做得更好? – furby559

回答

0
import multiprocessing 

def worker(arg): 
    print 'doing work "%s"' % arg 
    return 

NUM_WORKERS=35 
with open('test.csv', 'w') as test: 
    for i in xrange(100): 
     if i % 10 == 0: 
      test.write('\n') 
     test.write('"%s 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23",' % i) 

f=open("test.csv") 
pool=multiprocessing.Pool(processes=NUM_WORKERS) 
open("final.csv",'w') 

for i, line in enumerate(f): 
    if i == 0: 
     continue 
    raw = line.split(',')[0][1:-1].split() 
    uselessindices=[-2,-3,-4,-5,-6] 
    counter=0 
    for ui in uselessindices: 
     del raw[ui+counter] 
     counter+=1 
    pool.apply_async(worker, args=(raw,)) 
pool.close() 
pool.join() 
print 'last raw len: %s' % len(raw) 
print 'last raw value: %s' % raw 

輸出:

doing work "['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['10', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['20', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['30', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['40', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['50', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['60', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['70', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['80', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
doing work "['90', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23']" 
last raw len: 19 
last raw value: ['90', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '23'] 
+0

由於行raw = lines [i] .split(',')[0],列表可能沒有正確傳入, [1:-1] .split() 如果您可以提供樣本行,也許我可以爲您檢查。 – Bamcclur

+0

我檢查了原始數據和原始數據,並且正確傳入。我試圖運行你的代碼無濟於事;該工人仍然沒有運行。工作人員開始時的簡單打印沒有被執行,但腳本運行時沒有出現錯誤,這也是之前發生的情況。 – furby559

+0

我添加了一些測試數據,以演示您的代碼將如何處理每行10個值的csv,每個值包含24個獨立部分的字符串,因此在刪除無用部分後,原始長度爲19。 – Bamcclur

0

我建議你把raw計算到發電機的功能,然後用Pool.imap_unordered()Pool.map()運行worker()生成器中的所有項目。

事情是這樣的未經測試的代碼:

def get_raw(): 
    with open("test.csv", 'rU') as f: 
     for i, line in enumerate(f): 
      if i == 0: 
       # skip header 
       continue 
      raw = line.split(',')[0][1:-1].split() 
      uselessindices=[-2,-3,-4,-5,-6] 
      counter=0 
      for ui in uselessindices: 
       del raw[ui+counter] 
       counter+=1 
      yield raw 

pool=multiprocessing.Pool(processes=NUM_WORKERS) 

pool.map(worker, get_raw()) 
pool.close() 
pool.join() 
+0

原始值有19個值,並且19值列表是工人繼續其路徑所需的。我在更改代碼時遇到的錯誤如下: pool.map(worker,get_raw()) 文件「/System/Library/Frameworks/Python.framework/Versions/2。7/lib/python2.7/multiprocessing/pool.py「,第251行,在地圖 返回self.map_async(func,iterable,chunksize).get() 文件」/System/Library/Frameworks/Python.framework/版本/ 2.7/lib/python2.7/multiprocessing/pool.py「,第567行,得到 raise self._value ValueError:需要超過19個值才能解壓 – furby559

+0

現在我修復了所有其他問題發生之前:TypeError:無法連接'str'和'int'對象 如果我將它更改爲imap_unordered而不是map,它可以正常工作,但映射仍然無法正常工作imap_unordered和async不起作用,因爲它們是寫入一個文件,它只能輸出117行,而不是預期的1000行使用這兩個。我讀了別的地方,你不應該爲文件寫入使用異步,而是s應該使用地圖,因此我試圖解決這個問題。有什麼建議? – furby559

+0

這是工人的代碼;假設一切工作完全在這裏: '高清工人(ALIST):' \t'變量1,變量2 ... variable20 =元組(ALIST)' \t'開放的( 「final.csv」, 'A')作爲MYFILE:' \t \t'finalVector = makeSingleVector(變量1,變量2,... variable3 variable20)' \t \t'用於finalVector項: \t \t \t'myfile.write( 「%S,」 %項)'' \t \t'myfile.write(「\ n」)' \t'return' – furby559

0

所以,我發現這是不扔這是工人作爲投入的子功能的不匹配數,結果裏面出現的錯誤(又名工人正在調用另一個函數dosomething(a1,a2,... a20),只給了它19個輸入)。看起來異步不會拋出關於工人內部發生的問題的錯誤輸出,這很煩人,但我現在明白了。感謝所有的幫助!