2017-08-16 74 views
0

批量刪除我發佈了前幾天一個關於Python腳本佔用大量內存Understanding Python memory usageSQLAlchemy的相關表

有點反覆試驗後我設法在內存使用增長本地化問題一個執行數據庫清理的函數。我簡化架構如下(ORM):

class A(Base): 
    id = Column(Integer, primary_key=True) 
    foo = Column(String) 
    bar = Column(DateTime) 
    replication_confirmed = Column(Boolean, default=False) 

class B(Base): 
    id = Column(Integer, primary_key=True) 
    xyzzy = Column(Integer) 
    barf = Column(String) 
    replication_confirmed = Column(Boolean, default=False) 
    aref = Column(Integer, ForeignKey("A.id")) 

爲了使這個簡單,這個腳本作爲一種計算機之間複製服務器。另一個程序向此發送RabbitMQ消息,請求更新並在Json消息中更新A和B中的新行。還有幾個其他進程實際上修改了A和B,並且每次更新時都會將複製更改爲False,並且每次將數據發送到「主」服務器時,該標誌都會更改爲True。

爲了避免在這個純粹的過渡數據庫中出現混亂,我現在做了一次清理,這就是內存增長髮生的地方。我打算做的是從兩個表中刪除具有replication_confirmed = True的所有行,同時保留完整性,因爲複製可能在表之間不同步。

這是我的刪除代碼(簡化了,它確實有錯誤處理等)。它可以工作,但我認爲它現在會從上次清理後大量增長的表中加載大量內存到內存中,這會導致進程請求大量的內存,然後永遠不會釋放內存。由於有很多進程操作相同的表,因此我使用with_for_update()來鎖定受影響的行,並防止寫入進程在執行清理時修改這些表。

_todc = session.query(A).filter(A.replication_confirmed == True).\ 
      with_for_update().all() 
_aids = [i.id for i in _todc] 

_todb = session.query(B).filter(B.aref in_(_aids), 
           B.replication_confirmed == True).all() 
for _tb in _todb: 
    _session.delete(_tb) 
_remaining = session.query(B).all() 
_remaining_ids = [i.id for i in _remaining] 
_adelete = session.query(A).filter(A.replication_confirmed == True, 
            A.id.notin_(_remaining_ids)).all() 
for _ta in _adelete: 
    session.delete(_ta) 
session.commit() 

這一切都有效,但必須有更好的方式來做到這一點。由於複製過程的性質,表格在複製方面可能不完全同步。來自B的行可能已被複制,而來自A的被引用的行仍然處於等待狀態。或相反亦然。

這樣做會更有效嗎?這現在讀取從受影響的表到內存的所有內容,但是我可能可以在數據庫級別完成所有操作,而無需處理內存中的數據。我只是不知道該怎麼做。

任何想法?這是Python 2.7,Sqlalchemy和postgresql9.5。

回答

1

如果我理解正確的話,那麼2刪除與聯接應該做的:依次

In [12]: session.query(A).\ 
    ...:  filter(A.replication_confirmed, 
    ...:   ~session.query(B). 
    ...:    filter(B.aref == A.id). 
    ...:    exists()).\ 
    ...:  delete(synchronize_session=False) 

In [11]: session.query(B).\ 
    ...:  filter(B.replication_confirmed, 
    ...:   session.query(A). 
    ...:    filter(A.replication_confirmed, 
    ...:      A.id == B.aref). 
    ...:    exists()).\ 
    ...:  delete(synchronize_session=False) 

雖然在這裏一排沒有被鎖定,並根據您的隔離級別可能會在2個查詢之間改變。

+0

是否有可能實現鎖? – Hannu

+0

但這樣的查詢似乎工作。我會擔心有一天會鎖定數據庫。無論如何,連續清理可能會收集垃圾,這可能並不重要。謝謝。 – Hannu