2011-01-22 95 views
0

所以我們來想象一個我們想要使用cElementTree.iterparse進行壓縮的大型XML文檔(文件大小> 100 MB)。分割和征服etree.iterparse使用多處理

但英特爾承諾給我們的所有內核都是值得的,我們如何讓它們使用?這裏就是我想要的:

from itertools import islice 
from xml.etree import ElementTree as etree 

tree_iter = etree.iterparse(open("large_file.xml", encoding="utf-8")) 

first = islice(tree_iter, 0, 10000) 
second = islice(tree_iter, 10000) 

parse_first() 
parse_second() 

好像有幾個問題這一點,並非最不重要的是,通過iterparse()返回的迭代器似乎抵擋切片。

有沒有什麼辦法,以大型XML文檔的解析工作量劃分爲兩個或四個獨立的任務(沒有將整個文檔加載到內存中?其目的是然後在不同的處理器上執行的任務。

回答

0

我。認爲你需要爲這個任務隊列良好的線程池,我發現(和使用)這個非常好(這是在python3,但應該不會太難轉換到2.x):

# http://code.activestate.com/recipes/577187-python-thread-pool/ 

from queue import Queue 
from threading import Thread 

class Worker(Thread): 
    def __init__(self, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 

    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      try: func(*args, **kargs) 
      except Exception as exception: print(exception) 
      self.tasks.task_done() 

class ThreadPool: 
    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     for _ in range(num_threads): Worker(self.tasks) 

    def add_task(self, func, *args, **kargs): 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     self.tasks.join() 

現在你可以在iterparse上運行循環,讓線程池爲你分配工作。使用它很簡單:

def executetask(arg): 
    print(arg) 

workers = threadpool.ThreadPool(4) # 4 is the number of threads 
for i in range(100): workers.add_task(executetask, i) 

workers.wait_completion() # not needed, only if you need to be certain all work is done before continuing 
+0

所以我猜我然後調用workers.add_task與解析每個單獨的元素的函數?對於etree.parseiter()中的elem:workers.add_task(parseElem,elem)?問題是由於解析相對簡單,這導致沒有性能增益。我需要的是將etree.parseiter()分解爲可管理的塊:理想情況下,在迭代中的100.000元素中,將25.000分配給池中的每個線程。那可能嗎? – 2011-01-23 00:28:16