2017-09-05 78 views
0

我在我的Django項目中使用了一個Celery任務,並帶有鎖,如this article中所述。它工作的很好,但我的任務創建一個對象,我不希望在數據庫中提交對象之前釋放鎖。我如何才能更改此上下文管理器以等待任務中的對象提交?發佈交易提交時的芹菜鎖

@contextmanager 
def lock(lock_id, oid, expire=600): 
    timeout_at = monotonic() + expire - 3 
    status = cache.add(lock_id, oid, expire) 
    try: 
     yield status 
    finally: 
     if monotonic() < timeout_at: 
      cache.delete(lock_id) 

@celery.task(bind=True, ignore_result=True) 
def my_task(self, object_id): 
    with lock('my_task.{}'.format(object_id), self.app.oid) as acquired, transaction.atomic(): 
     if not acquired: 
      self.retry(countdown=1) 
     def on_commit(): 
      # release the lock only in this moment 
      pass 
     transaction.on_commit(on_commit) 
     MyModel.objects.create(object_id=object_id) 

回答

1

此上下文管理器創建一個鎖並在事務中封裝正文。只有在提交事務或發生異常(除celery.exceptions.Retry)之外,它纔會釋放鎖。

如芹菜文檔指出:

爲了使其正常工作,你需要使用一個緩存後端其中。新增操作是原子。已知memcached可以很好地用於此目的。

from celery.exceptions import Retry 
from contextlib import contextmanager 
from time import monotonic 
from django.core.cache import cache 
from django.db import transaction 


@contextmanager 
def lock_transaction(lock_id, oid, expire=600): 
    status = cache.add(lock_id, oid, expire) 
    timeout_at = monotonic() + expire - 3 
    is_retry = False 

    def on_commit(): 
     if not is_retry and monotonic() < timeout_at: 
      cache.delete(lock_id) 

    with transaction.atomic(): 
     transaction.on_commit(on_commit) 
     try: 
      yield status 
     except Retry as e: 
      is_retry = True 
     except: 
      if monotonic() < timeout_at: 
       cache.delete(lock_id) 
      raise 

使用的一個示例:

@celery.task(bind=True, ignore_result=True, max_retries=90, time_limit=60) 
def create_or_add_counter_task(self, object_id): 
    with lock_transaction('object_id.{}'.format(object_id), self.app.oid) as acquired: 
     if not acquired: 
      self.retry(countdown=1) 
     try: 
      obj = MyModel.objects.get(object_id=object_id) 
      obj.counter += 1 
      obj.save() 
     except MyModel.DoesNotExist: 
      MyModel.objects.create(object_id=object_id)