2013-05-02 84 views
2

我有一個金字塔應用程序叫mainsite「ResourceClosedError:交易已關閉」錯誤與芹菜拍和sqlalchemy +金字塔應用程序

該網站主要通過從視圖中啓動的線程以相當異步的方式工作來執行後端操作。

它使用sqlalchemy連接到mysql並使用ZopeTransactionExtension進行會話管理。

到目前爲止,應用程序運行良好。

我需要在其上運行定期作業,它需要使用從視圖中啓動的一些相同的異步功能。

我使用了apscheduler,但遇到了問題。所以我想將芹菜節拍作爲一個單獨的過程,將mainapp作爲一個庫來處理,並導入要使用的函數。

我芹菜的配置是這樣的:

from datetime import timedelta 
from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \ 
    AUTH_DELETE_TIME 

BROKER_URL = 'sqla+mysql://em:[email protected]/edgem' 
CELERY_RESULT_BACKEND = "database" 
CELERY_RESULT_DBURI = 'mysql://em:[email protected]/edgem' 

CELERYBEAT_SCHEDULE = { 
    'rerun': { 
     'task': 'tasks.rerun_scheduler', 
     'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL) 
    }, 
    'automate': { 
     'task': 'tasks.automation_scheduler', 
     'schedule': timedelta(seconds=20) 
    }, 
    'remove-tokens': { 
     'task': 'tasks.token_remover_scheduler', 
     'schedule': timedelta(seconds=2 * 24 * 3600) 
    }, 
} 

CELERY_TIMEZONE = 'UTC' 

的tasks.py是

from celery import Celery 
celery = Celery('tasks') 
celery.config_from_object('celeryconfig') 


@celery.task 
def rerun_scheduler(): 
    from mainsite.task import check_update_rerun_tasks 
    check_update_rerun_tasks() 


@celery.task 
def automation_scheduler(): 
    from mainsite.task import automate 
    automate() 


@celery.task 
def token_remover_scheduler(): 
    from mainsite.auth_service import delete_old_tokens 
    delete_old_tokens() 

記住,所有上述功能立即返回,但如果需要的話

推出主題線程通過做transaction.commit() after session.add(object)將對象保存到db中。

問題是,整個事情只有約30分鐘就像寶石一樣工作。之後ResourceClosedError: The transaction is closed錯誤開始發生在有transaction.commit()的任何地方。我不確定是什麼問題,我需要幫助解決問題。

我在導入任務的原因是爲了擺脫這個錯誤。認爲每次需要運行任務時都需要導入,這是一個好主意,我可能每次都會得到一個新的交易,但看起來情況並非如此。

回答

9

根據我的經驗,嘗試重新使用配置爲與金字塔(與ZopeTransactionExtension等)與芹菜工人一起使用的會話會導致一個可怕的難以調試的混亂。

ZopeTransactionExtension將SQLAlchemy會話綁定到Pyramid的請求 - 響應循環 - 事務啓動並提交或自動回滾,通常不應該在代碼中使用transaction.commit() - 如果一切正常,ZTE將提交一切,如果你的代碼引發異常,你的交易將被回滾。

使用Celery,您需要手動管理SQLAlchemy會話,中興通訊阻止您這麼做,因此您需要配置DBSession以不同方式進行配置。

一些簡單的像這樣的工作:

DBSession = None 

def set_dbsession(session): 
    global DBSession 
    if DBSession is not None: 
     raise AttributeError("DBSession has been already set to %s!" % DBSession) 

    DBSession = session 

然後從金字塔啓動代碼你做

def main(global_config, **settings): 
    ... 
    set_dbsession(scoped_session(sessionmaker(extension=ZopeTransactionExtension()))) 

芹菜這是一個有點棘手 - 我結束了創建芹菜自定義啓動腳本,我在其中配置會話。

workersetup.py

entry_points=""" 
    # -*- Entry points: -*- 
    [console_scripts] 
    custom_celery = worker.celeryd:start_celery 
    custom_celerybeat = worker.celeryd:start_celerybeat 
    """, 
) 

worker/celeryd.py

def initialize_async_session(db_string, db_echo): 

    import sqlalchemy as sa 
    from db import Base, set_dbsession 

    session = sa.orm.scoped_session(sa.orm.sessionmaker(autoflush=True, autocommit=True)) 
    engine = sa.create_engine(db_string, echo=db_echo) 
    session.configure(bind=engine) 

    set_dbsession(session) 
    Base.metadata.bind = engine 


def start_celery(): 
    initialize_async_session(DB_STRING, DB_ECHO) 
    import celery.bin.celeryd 
    celery.bin.celeryd.main() 

你用一般的方法「從視圖中被啓動的線程來進行後臺操作」如果你打算將應用程序部署到生產服務器上,對我來說有點危險 - 一個Web服務器通常會回收,殺死或創建新的「工作人員」,因此通常不會保證每個特定的進程能夠存活在當前的請求 - 響應週期。我從來沒有嘗試過這樣做,所以也許你會沒事:)

+0

嘿。我無法爲這個人感謝你。在我想到這裏問問之前,我坐了幾天。我會試試這個,解決方案好像可能會起作用。 我會試試這個,讓你知道它是怎麼回事。 – 2013-05-03 05:27:56