2012-03-08 108 views
21

我無法正確理解如何正確打開和關閉數據庫會話,正如我通過sqlalchemy文檔理解的那樣,如果我使用scoped_session構造我的Session對象,然後使用返回的Session對象創建會話,它是線程安全的,所以基本上每個線程都會得到它自己的會話,並且不會有問題。現在下面的例子工作,我把它放在一個無限循環中,看它是否正確地關閉了會話,並且如果我正確地監視它(在mysql中執行「SHOW PROCESSLIST;」),連接只會繼續增長,它不會關閉它們,儘管我使用了session.close(),甚至在每次運行結束時刪除了scoped_session對象。我究竟做錯了什麼?我在更大的應用程序中的目標是使用所需的最少數量的數據庫連接,因爲我目前的工作實現在每個需要的方法中創建一個新的會話,並在返回之前關閉它,這似乎效率低下。SQLAlchemy在多線程應用程序中正確處理會話

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

回答

36

你應該只調用create_enginescoped_session每 進程(每個數據庫)一次。每個人都將獲得自己的連接池或會話 (分別),因此您要確保只創建一個池。只需將其設置爲全局模塊級別即可。如果你需要比這更preciesly管理您的會話,你可能不應該使用scoped_session

另一個變化要提出的是直接使用DBSession,就好像是一個 會話。調用scoped_session上的會話方法將透明地 創建線程本地會話(如果需要),並將方法調用轉發到 會話。

另一件事要注意的是連接池,這是 5默認的 pool_size 。對於許多應用程序,這很好,但如果你正在創建 大量的線程,則可能需要調整這個參數

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

謝謝你的信息,這是非常有益的確實。國王問候! – andrean 2012-03-09 09:01:15