2015-08-03 46 views
0

我有一個基本的生產者/消費者腳本在gevent中運行。它開始是把東西放到一個gevent.queue.Queue幾個生產函數,而取出來再排隊的一個消費函數:顯式開關()與gevent

from __future__ import print_function 

import time 

import gevent 
import gevent.queue 
import gevent.monkey 

q = gevent.queue.Queue() 

# define and spawn a consumer 
def consumer(): 
    while True: 
     item = q.get(block=True) 
     print('consumer got {}'.format(item)) 

consumer_greenlet = gevent.spawn(consumer) 

# define and spawn a few producers 
def producer(ID): 
    while True: 
     print("producer {} about to put".format(ID)) 
     q.put('something from {}'.format(ID)) 
     time.sleep(0.1) 
#  consumer_greenlet.switch()  

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)] 

# wait indefinitely 
gevent.monkey.patch_all() 
print("about to join") 
consumer_greenlet.join() 

,如果我讓GEVENT隱式處理調度它工作正常(例如,通過呼叫時間。睡眠或其他一些gevent.monkey.patch()編輯功能),但是當我切換到消費者明確(更換time.sleep與註釋掉switch調用),GEVENT提出了一個AssertionError:

Traceback (most recent call last): 
    File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run 
    result = self._run(*self.args, **self.kwargs) 
    File "switch_test.py", line 14, in consumer 
    item = q.get(block=True) 
    File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get 
    assert result is waiter, 'Invalid switch into Queue.get: %r' % (result,) 
AssertionError: Invalid switch into Queue.get:() 
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError 

我想聘請明確的開關,因爲在生產我有很多刺激用戶,gevent的調度不會爲消費者分配足夠的運行時,並且隊列變得越來越長(這很糟糕)。另外,任何有關如何配置或修改gevent的調度程序的見解都非常感謝。

這是關於Python 2.7.2,gevent 1.0.1和greenlet 0.4.5的。

+0

也許你可以看看隊列的大小,使生產停頓,如果它是具有一定規模的?這聽起來像是潛在的問題是生產者(如果不加控制的話)比消費者可以處理更多的生產方式?你如何使用gevent來做這件事,而不是線程或多重處理? –

+0

我實際上已經實現了你所建議的等待,但是它導致了生產者方面的很多等待導致系統吞吐量降低(並且這個系統確實需要快速完成工作......)。消費者當然可以處理負載,生產者加載和分析URL,所有消費者都會將結果寫入數據庫。我使用gevent是因爲它減少了線程引入的同步頭痛問題,並且由於一次生成幾十個工作進程後多處理會消耗大量內存。 – Simon

+0

我想我在這裏錯過了一些東西。如果生產者能夠生產超過消費者能夠處理的產品,那麼你唯一的選擇就是增加更多的消費者,或者減少生產者生產的數量(這就是等待的)。我不確定減少系統吞吐量的意義 - 不是限制消費者可以如何快速處理隊列的瓶頸?線程引入gevent的同步問題不是什麼? –

回答

0

似乎對我來說,顯式切換並不能很好地與隱式切換配合使用。 您已經有隱式開關發生,因爲猴子補丁I/O或gevent.queue.Queue()

該GEVENT文檔阻礙的原始greenlet方法的使用:

Being a greenlet subclass, Greenlet also has switch() and throw() methods. However, these should not be used at the application level as they can very easily lead to greenlets that are forever unscheduled. Prefer higher-level safe classes, like Event and Queue, instead.

迭代gevent.queue.Queue()或訪問隊列的get方法不執行隱式切換,有趣的是put沒有。所以你必須自己產生一個隱式線程切換。最簡單的就是撥打gevent.sleep(0)(你不必真的等待特定的時間)。

總之,你甚至不需要猴子的東西,提供你的代碼沒有阻止IO操作。

我會重寫你的代碼是這樣的:

import gevent 
import gevent.queue 

q = gevent.queue.Queue() 

# define and spawn a consumer 
def consumer(): 
    for item in q: 
     print('consumer got {}'.format(item)) 

consumer_greenlet = gevent.spawn(consumer) 

# define and spawn a few producers 
def producer(ID): 
    print('producer started', ID) 
    while True: 
     print("producer {} about to put".format(ID)) 
     q.put('something from {}'.format(ID)) 
     gevent.sleep(0) 

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)] 
# wait indefinitely 
print("about to join") 
consumer_greenlet.join() 
+0

哇,我完全忽略了你的答案,謝謝! 'put()'不隱式切換是一個有趣的觀察,我會看看產品代碼,看看是否會改變事物... – Simon