2012-07-26 57 views
3

我可能會問一個非常基本的問題,但我真的不知道如何在python中創建一個簡單的並行應用程序。 我正在16核心的機器上運行我的腳本,我想高效地使用它們。我有16個巨大的文件要讀取,我希望每個CPU讀取一個文件,然後合併結果。 這裏,我給的,我想怎麼做一個簡單的例子:Python,閱讀許多文件併合並結果

parameter1_glob=[] 
    parameter2_glob[] 


    do cpu in arange(0,16): 
     parameter1,parameter2=loadtxt('file'+str(cpu)+'.dat',unpack=True) 

     parameter1_glob.append(parameter1) 
     parameter2_glob.append(parameter2) 

我認爲multiprocessing模塊可以幫助,但我不知道如何將它應用到了我想做的事情。

+0

[您是否聽說過Python的GIL](http://stackoverflow.com/questions/990102/python-global-interpreter-lock-gil-workaround-on-multi-core-systems-using-task) – tkone 2012-07-26 14:58:11

+2

有沒有意義使用多個線程......您的應用程序將是磁盤綁定的,而不是CPU綁定的。 – 2012-07-26 14:59:16

+2

@tkone:'multiprocessing'通過使用單獨的解釋器來避免GIL,儘管它不會使磁盤更快。 – geoffspear 2012-07-26 15:00:10

回答

1

要逐行合併嗎?有些協程對於I/O綁定的應用程序比傳統的多任務更有趣。您可以鏈接生成器和協程以進行各種路由,合併和廣播。用這個nice presentation by David Beazley吹出你的想法。

您可以使用協同程序作爲一個水槽(未經測試,請參閱dabeaz例子):

# A sink that just prints the lines 
@coroutine 
def printer(): 
    while True: 
     line = (yield) 
     print line, 

sources = [ 
    open('file1'), 
    open('file2'), 
    open('file3'), 
    open('file4'), 
    open('file5'), 
    open('file6'), 
    open('file7'), 
] 

output = printer() 
while sources: 
    for source in sources: 
     line = source.next() 
     if not line: # EOF 
      sources.remove(source) 
      source.close() 
      continue 
     output.send(line) 
2

我同意Colin Dunklau在他的評論中所說的,這個過程會在讀寫這些文件時遇到瓶頸,對CPU的要求很少。即使你有17個專用驅動器,即使只有一個內核也不會超出。另外,雖然我認識到這與您的實際問題相切,但您可能會遇到這些「巨大」文件的內存限制 - 將16個文件作爲數組加載到內存中,然後將它們組合到另一個文件中幾乎肯定會佔用更多的內存你有。

你可能會發現更好的結果,看看shell腳本這個問題。特別是,GNU sort使用內存有效的合併排序來快速排序一個或多個文件 - 比Python中或大多數其他語言中最精心編寫的應用程序的速度要快得多。

我會建議避免任何類型的多線程工作,它將大大增加複雜性,並且利益最小。確保一次只保留內存中的少量文件,否則會很快耗盡。無論如何,您絕對想要在兩個獨立的磁盤上運行讀寫。與同時讀取和寫入同一磁盤相關的速度減慢非常痛苦。

0

假設每個文件的結果是短小的,你可以用我的包jug做到這一點:

from jug import TaskGenerator 
loadtxt = TaskGenerator(loadtxt) 

parameter1_glob=[] 
parameter2_glob[] 

@TaskGenerator 
def write_parameter(oname, ps): 
    with open(oname, 'w') as output: 
     for p in ps: 
      print >>output, p 

parameter1_glob = [] 
parameter2_glob = [] 

for cpu in arange(0,16): 
    ps = loadtxt('file'+str(cpu)+'.dat',unpack=True) 
    parameter1_glob.append(ps[0]) 
    parameter2_glob.append(ps[1]) 

write_parameter('output1.txt', parameter1_glob) 
write_parameter('output2.txt', parameter2_glob) 

現在,您可以執行多個jug execute職位。