
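Race condition in a ZooKeeper- and Python-based message queue

I've been evaluating ZooKeeper as a simple message queue and wrote two very simple scripts: an MQ feeder and an MQ consumer. The feeder, below, pushes 20 jobs onto the queue and then monitors the queue state (as the jobs are consumed):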

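from kazoo.client import KazooClient
import time

zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/queue")  # create() raises NoNodeError if the parent node is missing

# Push 20 jobs; each job's node name and payload are its index.
for i in xrange(20):
    zk.create("/queue/%s" % i, b"%s" % i)

# Watch the queue drain as the consumers process jobs.
while True:
    print zk.get_children('/queue')
    time.sleep(1)  # poll once a second instead of hammering the server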

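The consumer, below, is started several times (up to 3 concurrent processes in my tests). It fetches the job list, iterates over it looking for an unlocked job, processes that job (sleeping a random number of seconds to simulate some work) and, once done, deletes the job and then deletes the lock: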

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from time import sleep
import random

zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/locks")
zk.ensure_path("/queue")

while 1:
    jobs = sorted(zk.get_children('/queue'))
    if jobs:
        for i in jobs:
            print "Checking job: %s" % i
            try:
                zk.create("/locks/%s" % i)
            except NodeExistsError:
                print "Job is locked, skipping!"
                pass
            else:
                print "Job is unlocked, processing."
                sleep(random.randrange(5))
                zk.delete("/queue/%s" % i)
                print "Deleted processed job, deleting the lock."
                zk.delete("/locks/%s" % i)
                pass
    else:
        print "There's no locks in the queue."
        pass

The problem I'm seeing, and can't track down, is that consumer processes exit with:

Traceback (most recent call last):
  File "zk_consumer.py", line 24, in <module>
    zk.delete("/queue/%s" % i)
  File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
    return self.delete_async(path, version).get()
  File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
    raise self._exception
kazoo.exceptions.NoNodeError: ((), {})

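while the last remaining process keeps checking, forever, the single job left in the queue, which is always locked. Clearly there is a logic error here that I believe causes a race condition, but I've spent some time on it and can't spot it. What am I doing wrong, or is ZooKeeper simply not a viable solution for a simple job queue?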

Answer


Your code is racy. Consider this sequence:

T1                                 T2
read queue/1
                                   read queue/1
                                   write lock/1
                                   delete queue/1
                                   delete lock/1
write lock/1
delete queue/1 (FAIL, no node!)
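The interleaving above can be replayed deterministically. The sketch below is a minimal illustration, assuming a hypothetical in-memory `FakeZK` class that mimics only the two relevant properties of kazoo's calls (`create()` fails if the node already exists, `delete()` fails if it does not). `NodeExistsError` and `NoNodeError` are local stand-ins for the kazoo exceptions of the same names, and no ZooKeeper server is involved.

```python
class NodeExistsError(Exception):
    """Local stand-in for kazoo.exceptions.NodeExistsError."""


class NoNodeError(Exception):
    """Local stand-in for kazoo.exceptions.NoNodeError."""


class FakeZK(object):
    """Hypothetical in-memory node store; no ZooKeeper server involved."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        # Like ZooKeeper, creation fails if the node already exists.
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)

    def delete(self, path):
        # Like ZooKeeper, deletion fails if the node does not exist.
        if path not in self.nodes:
            raise NoNodeError(path)
        self.nodes.remove(path)


def replay_race():
    """Replay the T1/T2 interleaving step by step in a single thread."""
    zk = FakeZK()
    zk.create("/queue/1")

    t1_jobs = ["1"]          # T1: read queue/1
    # T2 also reads queue/1, then runs its whole cycle before T1 continues.
    zk.create("/locks/1")    # T2: write lock/1
    zk.delete("/queue/1")    # T2: delete queue/1
    zk.delete("/locks/1")    # T2: delete lock/1

    job = t1_jobs[0]
    zk.create("/locks/%s" % job)  # T1: write lock/1 succeeds, the lock is free again
    try:
        zk.delete("/queue/%s" % job)  # T1: delete queue/1
    except NoNodeError:
        return "T1 failed: job %s was already consumed" % job, zk.nodes
    return "T1 succeeded", zk.nodes


if __name__ == "__main__":
    outcome, leftover = replay_race()
    print(outcome)           # T1 failed: job 1 was already consumed
    print(sorted(leftover))  # ['/locks/1']
```

Acquiring the lock succeeds for T1 because T2 has already released it, so the lock alone says nothing about whether the job still exists. The leftover `/locks/1` node is the lock the failing consumer never got to release.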

After you take the lock, you need to read again to make sure nobody else has already deleted queue/1.
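Below is a minimal sketch of that fix, run against a thread-safe in-memory stand-in so it executes without a ZooKeeper server. The `FakeZK` class and the `consumer`/`run` helpers are hypothetical, not part of kazoo. After acquiring the lock, the consumer checks that the job node still exists; with kazoo the equivalent check would be `zk.exists("/queue/%s" % i)`, which returns `None` once the node is gone. The `try`/`finally` that always releases the lock is an extra assumption, not something the answer states.

```python
import random
import threading
import time


class NodeExistsError(Exception):
    """Local stand-in for kazoo.exceptions.NodeExistsError."""


class FakeZK(object):
    """Hypothetical thread-safe, in-memory stand-in for a few kazoo calls.

    Each method runs under one mutex, so every call is atomic, which is the
    property ZooKeeper's create()/delete() provide.
    """

    def __init__(self):
        self._nodes = set()
        self._mutex = threading.Lock()

    def create(self, path):
        with self._mutex:
            if path in self._nodes:
                raise NodeExistsError(path)
            self._nodes.add(path)

    def delete(self, path):
        with self._mutex:
            self._nodes.remove(path)  # KeyError plays the role of NoNodeError

    def exists(self, path):
        with self._mutex:
            return path in self._nodes

    def get_children(self, parent):
        prefix = parent.rstrip("/") + "/"
        with self._mutex:
            return [p[len(prefix):] for p in self._nodes if p.startswith(prefix)]


def consumer(zk, processed):
    """Consume jobs until the queue is empty, re-checking each job after locking."""
    while True:
        jobs = sorted(zk.get_children("/queue"))
        if not jobs:
            return
        for job in jobs:
            try:
                zk.create("/locks/%s" % job)
            except NodeExistsError:
                continue  # another consumer holds this job
            try:
                # The fix: re-read after locking. The job may have been consumed
                # (and its lock released) between get_children() and create().
                if not zk.exists("/queue/%s" % job):
                    continue
                time.sleep(random.random() / 100)  # simulate some work
                processed.append(job)
                zk.delete("/queue/%s" % job)
            finally:
                # Release the lock whether the job was processed or skipped.
                zk.delete("/locks/%s" % job)


def run(num_jobs=20, num_consumers=3):
    """Feed num_jobs jobs, then drain them with num_consumers threads."""
    zk = FakeZK()
    for i in range(num_jobs):
        zk.create("/queue/%s" % i)
    processed = []
    workers = [threading.Thread(target=consumer, args=(zk, processed))
               for _ in range(num_consumers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return zk, processed


if __name__ == "__main__":
    zk, processed = run()
    print(sorted(processed, key=int))
```

Three threads stand in for the three consumer processes; each job should be processed exactly once and no lock nodes should remain. The re-check is sufficient because only the holder of `/locks/<job>` ever deletes `/queue/<job>`: once the lock is held and the job node is still present, nobody else can remove it.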


You're right, but I've also found a more correct way to achieve what I need: Kazoo's LockingQueue recipe, as described at https://kazoo.readthedocs.org/en/latest/api/recipe/queue.html#kazoo.recipe.queue.LockingQueue – SpankMe 2013-05-04 20:38:27