2011-05-18 97 views
5

我用Python寫一個網頁刷屏,使用httplib2的和LXML(是的 - 我知道我可以使用scrapy讓我們搬過去認爲...。)刮板有大約15000頁解析成約400,000件物品。我已經獲得瞭解析項目以便即時(幾乎)運行的代碼,但從服務器下載頁面的部分仍然非常緩慢。我想通過併發來解決這個問題。但是,我無法依賴每一次需要分析的頁面。我已經嘗試過使用單個ThreadPool(比如multiprocessing.pool,但是使用線程完成 - 這應該沒問題,因爲這是一個I/O綁定進程),但我想不出優雅(或工作)的獲取方式所有線程在最後一個索引項的日期大於我們正在處理的項目時停止。現在,我正在研究一個使用ThreadPool的兩個實例的方法 - 一個用於下載每個頁面,另一個用於解析頁面。簡化的代碼示例:Python的 - 多個同時線程池

#! /usr/bin/env python2 

import httplib2 
from Queue import PriorityQueue 
from multiprocessing.pool import ThreadPool 
from lxml.html import fromstring 

pages = [x for x in range(1000)] 
page_queue = PriorityQueue(1000) 

url = "http://www.google.com" 

def get_page(page): 
    #Grabs google.com 
    h = httplib2.Http(".cache") 
    resp, content = h.request(url, "GET") 
    tree = fromstring(str(content), base_url=url) 
    page_queue.put((page, tree)) 
    print page_queue.qsize() 

def parse_page(): 
    page_num, page = page_queue.get() 
    print "Parsing page #" + str(page_num) 
    #do more stuff with the page here 
    page_queue.task_done() 

if __name__ == "__main__": 
    collect_pool = ThreadPool() 
    collect_pool.map_async(get_page, pages) 
    collect_pool.close() 

    parse_pool = ThreadPool() 
    parse_pool.apply_async(parse_page) 
    parse_pool.close() 


    parse_pool.join() 
    collect_pool.join() 
    page_queue.join() 

運行此代碼然而,沒有做什麼,我希望 - 這是火對開的線程池:一個填充隊列,另一個拉動從中解析。它開始收集池,並貫穿它,然後開始parse_pool並貫穿它(我假設,我沒有讓代碼運行足夠長的時間去parse_pool - 重點是collect_pool是所有似乎正在運行)。我相當肯定,我亂七八糟的東西了呼叫的順序加入(),但我不能爲我的生活出他們應該怎樣才能成爲英寸 我的問題基本上是這:我在這裏咆哮着正確的樹嗎?如果是這樣,我到底做錯了什麼?如果我沒有 - 你會有什麼建議是

+1

map_async - 直到它處理完所有工作爲止。 – 2011-05-18 07:12:36

+0

正式地解釋了爲什麼它不起作用,但並不一定回答我的整個問題,歸結爲「這是否是一種瘋狂的方式?」。如果答案是'不',我很接近,我只需要完善我的方法來完成它。如果是的話,我想要一些關於如何「正確地」完成這項工作的指導。 – bbenne10 2011-05-20 00:20:22

回答

6

首先,您的設計似乎是正確的,在一個較高的水平。通過httlib2模塊的同步特性證明了使用線程池來收集頁面是合理的。 (對於異步庫,一個線程就足夠了;請注意,即使使用httplib2,由於GIL,最多隻有一個收集器線程也在運行)。解析池由lxml模塊寫入C/C++(並且假設Global Interpreter Lock在頁面解析期間被釋放 - 這將在lxml文檔或代碼中檢查!)。如果後者不是真的,那麼通過擁有一個專用的解析池就不會有性能增益,因爲只有一個線程能夠獲取GIL。在這種情況下,最好使用進程池。

我不熟悉的線程池的實現,但我認爲這是類似於多模塊中的池類。在此基礎上,問題似乎是,您只爲parse_pool創建一個工作項目,並且在parse_page處理第一個頁面後,它永遠不會嘗試從此處將更多的頁面退出。其他工作項目也不會提交到該池中,因此處理停止,並且在parse_pool.close()調用之後(空)池的線程終止。

解決方法是消除page_queue。 get_page()函數應該通過調用apply_async()爲每個它收集的頁面將工作項放在parse_pool上,而不是將它們提供給page_queue。

主線程應該等到collect_queue爲空(即collect_pool.join()調用返回),然後它應該關閉parse_pool(因爲我們可以確定沒有更多的工作將被提交給解析器)。然後它應該通過調用parse_pool.join()等待parse_pool變空,然後退出。

此外,您還需要增加connect_pool中的線程數,以便同時處理更多的http請求。池中的默認線程數是CPU的數量;目前你不能發出比這麼多的請求。您可以嘗試高達數千或數千的值;觀察池的CPU消耗;它不應該接近1個CPU。

+0

如果可以的話,我會贊成。真棒回答 - 謝謝。 – bbenne10 2011-06-25 15:13:24

+0

@ bbenne10:這是一個非常好的答案。你需要接受它。 – 2011-09-16 14:46:56