2

我想使用我的CPU的單核處理文件。但我想這是不足以使用一個單一的核心。相反,如果我有權訪問我的系統的多個核心,那麼我可以使該過程更好更快地運行。使用多線程處理文件使用python3

但不幸的是,我知道只使用單核處理文件。下面是我做的:

data = open('datafile','r',encoding='ascii',errors='ignore') 
for line in data.readlines(): 
    splitted = line.lower().strip().split() 
    check = process(splitted[0],splitted[1]) 
    if check == '': 
     pass 
data.close() 

我想知道我該如何使用CPU進行處理TEH process()而單獨服用line並根據需要獲取輸出的完整能力?即使在處理過程中我也可以避免線程的死鎖狀態,因爲這可能會對進程輸出造成危險。

請與我分享你的看法。

+0

什麼是死鎖?死鎖狀態需要至少一個鎖,你知道嗎?現在,由於Python有這個叫做GIL的東西,利用多核心的唯一方法是使用進程而不是線程。現在,並行磁盤io可能會也可能不會增加性能(取決於您擁有的磁盤),所以我建議使用'multiprocesing.Pool'並將文件的「塊」從主進程發送到並行處理。 – freakish

+0

@freakish將文件劃分爲塊可能會丟失我不想要的數據,因爲維護完整數據有點重要。 –

+0

爲什麼它會丟失數據?您只需閱讀一行行,並將每行發送給子進程。這裏沒有數據丟失。 – freakish

回答

1

首先:您需要多個進程來利用多個內核。不是線程。這是GIL的限制。

現在這裏有一個如何你可以用multiprocessing.Pool實現它的一個例子:

from multiprocessing import Pool, cpu_count 

def process(arg1, arg2): 
    ... 

workers_count = 2*cpu_count()+1 # or whatever you need 
pool = Pool(processes=workers_count) 

with open('datafile','r',encoding='ascii',errors='ignore') as fo: 
    buffer = [] 
    for line in fo: 
     splitted = line.lower().strip().split() 
     buffer.append((splitted[0], splitted[1])) 
     if len(buffer) == workers_count: 
      results = pool.map(process, buffer) 
      buffer = [] 
      # do something with results 
    if buffer: 
     results = pool.map(process, buffer) 
     # do something with results again 

所以它做什麼它逐行讀取文件中的行,一旦收集足夠的數據,將其發送到一個多進程池,並等待用於並行處理。請注意,除非您有SSD,否則並行運行磁盤io只會降低性能(並行逐行讀取並不重要)。

但是你必須意識到的是,由於使用了多個進程,你不能在它們之間共享內存,即process函數不應該讀/寫全局變量。

+0

是的,我有SSD。如果我使用Global變量來存儲臨時數據,會發生什麼情況,因爲在通過'process()'處理任何數據時,它很容易使用。 –

+0

@JafferWilson如果您使用的是SSD,那麼您可能需要使用並行讀取,即通過在池中調用'.read()'來玩。然而,這將很難實現,因爲每個工作人員都必須知道從哪裏開始閱讀文件以及應該閱讀多少行。這似乎並不容易實現。 – freakish

+0

@JafferWilson對於存儲臨時數據,你總是可以在'process'函數中有一個局部變量,對吧?如果例如您有一個全局計數器,在每個「進程」調用之後遞增,則會出現問題。在對'process'的調用之間共享狀態不適用於多個進程。 – freakish