2016-05-31 158 views
1

遇到一個相當令人沮喪的錯誤,當我的一個api端點被訪問時彈出。爲了給出上下文,我正在使用的應用程序Flask應用程序使用SQLAlchemy將數據存儲在一個PostgreSQL數據庫集以保存1000個連接。TimeoutError:QueuePool限制大小5溢出10達到,連接超時,超時30

用戶可以查詢所述數據的方式之一是通過端點/timeseries端點。數據以json的形式返回,該數據由查詢數據庫返回的ResultProxies組裝而成。

的希望是,通過使用多線程,我能把由/時間序列運行速度更快的視圖控制器調用的方法,因爲我們原來設定的時間太長,其將返回大量數據的查詢。

由於沒有正確清理會話,我讀過許多其他帖子,但我感覺好像我已經覆蓋了相同的問題。我編寫的代碼有什麼明顯的錯誤?

該應用程序使用AWS彈性beanstalk進行部署。

@classmethod 
def timeseries_all(cls, table_names, agg_unit, start, end, geom=None): 
    """ 
    For each candidate dataset, query the matching timeseries and push datasets with nonempty 
    timeseries into a list to convert to JSON and display. 

    :param table_names: list of tables to generate timetables for 
    :param agg_unit: a unit of time to divide up the data by (day, week, month, year) 
    :param start: starting date to limit query 
    :param end: ending date to limit query 
    :param geom: geometric constraints of the query 

    :returns: timeseries list to display 
    """ 

    threads = [] 
    timeseries_dicts = [] 

    # set up engine for use with threading 
    psql_db = create_engine(DATABASE_CONN, pool_size=10, max_overflow=-1, pool_timeout=100) 
    scoped_sessionmaker = scoped_session(sessionmaker(bind=psql_db, autoflush=True, autocommit=True)) 

    def fetch_timeseries(t_name): 
     _session = scoped_sessionmaker() 
     # retrieve MetaTable object to call timeseries from 
     table = MetaTable.get_by_dataset_name(t_name) 
     # retrieve ResultProxy from executing timeseries selection 
     rp = _session.execute(table.timeseries(agg_unit, start, end, geom)) 

     # empty results will just have a header 
     if rp.rowcount > 0: 

      timeseries = { 
       'dataset_name': t_name, 
       'items': [], 
       'count': 0 
      } 

      for row in rp.fetchall(): 
       timeseries['items'].append({'count': row.count, 'datetime': row.time_bucket.date()}) 
       timeseries['count'] += row.count 

      # load to outer storage 
      timeseries_dicts.append(timeseries) 

     # clean up session 
     rp.close() 
     scoped_sessionmaker.remove() 

    # create a new thread for every table to query 
    for name in table_names: 
     thread = threading.Thread(target=fetch_timeseries, args=(name,)) 
     threads.append(thread) 

    # start all threads 
    for thread in threads: 
     thread.start() 

    # wait for all threads to finish 
    for thread in threads: 
     thread.join() 

    # release all connections associated with this engine 
    psql_db.dispose() 

    return timeseries_dicts 
+0

嗯,是有可能,你簡單地試圖同時查詢超過15個(或10個,從你的代碼)表? – univerio

回答

3

我認爲你是以一種迂迴的方式討論這個問題的。這裏有一些關於充分利用postgres連接的建議(我在生產中使用了這種配置)。

  1. 我將使用Flask-SQLAlchemy擴展來處理到您的Postgres實例的連接。如果您查看SQLAlchemy文檔,您將看到author highly recommends使用它來處理數據庫連接生命週期,而不是滾動您自己的數據庫。
  2. 處理大量請求的更高性能的方法是將Flask應用程序放在一個wsgi服務器後面,如gunicornuwsgi。這些服務器將能夠產生你的應用程序的多個實例。然後,當有人點擊你的端點時,你的連接將在這些實例之間進行負載平衡。

    因此,舉例來說,如果你有uwsgi的配置運行5個進程,那麼你將能夠同時門把手50個DB連接(5個應用程序的每個×10個池)