2015-11-04 102 views
5

我的任務是從給定的url列表下載1M +圖像。推薦的方式是什麼?使用gevent下載圖像

看完Greenlet Vs. Threads之後,我看了一下gevent,但是我無法可靠地運行。我玩了100個網站的測試集,有時在1.5秒內完成,但有時需要超過30秒,這很奇怪,因爲每個請求的超時時間是0.1,所以它不應該超過10秒。

見下面的代碼

我也看了成grequests但他們似乎有issues with exception handling.

我的「要求」是,我可以

  • 檢查,同時下載(超時引起的錯誤,損壞的圖像...),
  • 監視處理圖像數量的進度和
  • 儘可能快。
from gevent import monkey; monkey.patch_all() 
from time import time 
import requests 
from PIL import Image 
import cStringIO 
import gevent.hub 
POOL_SIZE = 300 


def download_image_wrapper(task): 
    return download_image(task[0], task[1]) 

def download_image(image_url, download_path): 
    raw_binary_request = requests.get(image_url, timeout=0.1).content 
    image = Image.open(cStringIO.StringIO(raw_binary_request)) 
    image.save(download_path) 

def download_images_gevent_spawn(list_of_image_urls, base_folder): 
    download_paths = ['/'.join([base_folder, url.split('/')[-1]]) 
         for url in list_of_image_urls] 
    parameters = [[image_url, download_path] for image_url, download_path in 
      zip(list_of_image_urls, download_paths)] 
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters] 
    for task in tasks: 
     try: 
      task.get() 
     except Exception: 
      print 'x', 
      continue 
     print '.', 

test_urls = # list of 100 urls 

t1 = time() 
download_images_gevent_spawn(test_urls, 'download_temp') 
print time() - t1 
+2

您是否必須使用線程?如果您可以使用多個進程,您可以使用'multiprocessing.Pool'完成此操作,您可能會發現它也更簡單。我使用'pool.map(download_image,url_list)'和'pool.join()'做類似的事情。 – foz

+1

@foz,謝謝,但我也嘗試過'multiprocessing.Pool'類似的問題。還有人告訴我,'multiprocessing'不適合這種類型的任務:http://stackoverflow.com/a/27016937/380038 – Framester

+0

有趣!我可以看到多處理效率/可擴展性不高,但我不明白爲什麼它不適用於適度的池大小(32倍)。希望你能得到一個很好的答案,因爲我想我也會學到一些東西! – foz

回答

-1

我會建議支付Grablib http://grablib.org/

關注它是基於pycurl和multicurl的asynchronic解析器。 另外它嘗試自動解決網絡錯誤(如再次嘗試,如果超時等)。

我相信Grab:Spider模塊可以解決99%的問題。 http://docs.grablib.org/en/latest/index.html#spider-toc

+0

謝謝。你能詳細說明一下grablib有什麼不同,或者爲什麼你有一個想法,爲什麼它比我的方法更好? – Framester

+0

Ooops,你有直接的圖像網址?如果是,那麼對不起,你仍然可以使用抓鬥或任何你有的東西。 Grablib是抓取和解析的理想選擇。但是,您也可以將它用於圖像下載,Grablib(特別是Grab:Spider模塊)會重試網絡錯誤> 400和!= 404的任務。重試次數可以手動設置。它具有日誌記錄和進程監視。 –

1

我認爲這將是更好地堅持與urllib2的,舉例https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

試試這個代碼,我想這是你在問什麼。

import gevent 
from gevent import monkey 

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets 
monkey.patch_all() 

import sys 

urls = sorted(chloya_files) 

if sys.version_info[0] == 3: 
    from urllib.request import urlopen 
else: 
    from urllib2 import urlopen 


def download_file(url): 
    data = urlopen(url).read() 
    img_name = url.split('/')[-1] 
    with open('c:/temp/img/'+img_name, 'wb') as f: 
     f.write(data) 
    return True 


from time import time 

t1 = time() 
tasks = [gevent.spawn(download_file, url) for url in urls] 
gevent.joinall(tasks, timeout = 12.0) 
print "Sucessful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks)) 
print time() - t1 
+0

謝謝,我用'urlopen(...,timeout = 0.1)'試過這個代碼,但是它仍然花費了100個1000個URL,這表明它沒有並行執行請求。 – Framester

+0

也許是網絡問題?在我的測試中,來自捷克網站的139個文件花了10.1秒。我也對並行性有所懷疑,但現在我認爲我受限於遠程web服務器,而不是gevent-urlib2 – Ingaz

1

有使用geventRequestssimple-requests

使用RequestsSession爲HTTP持久連接一個簡單的解決方案。由於gevent使得Requests異步,我認爲在HTTP請求中不需要timeout

默認情況下,高速緩存requests.Session TCP連接(pool_connections)10臺主機和限制每緩存TCP連接(pool_maxsize)10個併發的HTTP請求。應該通過明確地創建一個http適配器來調整默認配置以適應需要。

session = requests.Session() 
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) 
session.mount('http://', http_adapter) 

打破作爲生產者 - 消費者的任務。圖像下載是生產者任務,圖像處理是消費者任務。

如果圖像處理庫PIL不是異步的,則它可能會阻塞生產者協同程序。如果是這樣,消費者池可以是gevent.threadpool.ThreadPool。 F.E.

from gevent.threadpool import ThreadPool 
consumer = ThreadPool(POOL_SIZE) 

這是如何完成它的概述。我沒有測試代碼。

from gevent import monkey; monkey.patch_all() 
from time import time 
import requests 
from PIL import Image 
from io import BytesIO 
import os 
from urlparse import urlparse 
from gevent.pool import Pool 

def download(url): 
    try: 
     response = session.get(url) 
    except Exception as e: 
     print(e) 
    else: 
     if response.status_code == requests.codes.ok: 
      file_name = urlparse(url).path.rsplit('/',1)[-1] 
      return (response.content,file_name) 
     response.raise_for_status() 

def process(img): 
    if img is None: 
     return None 
    img, name = img 
    img = Image.open(BytesIO(img)) 
    path = os.path.join(base_folder, name) 
    try: 
     img.save(path) 
    except Exception as e: 
     print(e) 
    else: 
     return True 

def run(urls):   
    consumer.map(process, producer.imap_unordered(download, urls)) 

if __name__ == '__main__': 
     POOL_SIZE = 300 
     producer = Pool(POOL_SIZE) 
     consumer = Pool(POOL_SIZE) 

     session = requests.Session() 
     http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) 
     session.mount('http://', http_adapter) 

     test_urls = # list of 100 urls 
     base_folder = 'download_temp' 
     t1 = time() 
     run(test_urls) 
     print time() - t1 
+0

感謝您的建議。我在我的網站上嘗試了您的代碼,但這個1k網址需要超過200秒。一個問題可能是,他們中的大多數都指向一個域,但其中很多域也指向不同的域。 – Framester

+0

你認爲應該花多少時間?文件大小,客戶端帶寬和服務器負載都在時間安排中發揮作用。 –

+0

我已經更新了我的回答,建議消費者使用ThreadPool。如果圖像處理是cpu-bound,則應該使用'multiprocessing.Pool'。 –