2017-10-16 101 views
0

我有一個很重的用戶數據流。我想通過它的id來確定這是否是新用戶。爲了減少對數據庫的調用,我寧願在先前用戶的內存中維護一個狀態。在一個流內保持狀態

val users = mutable.set[String]() 
//init the state from db 
user = db.getAllUsersIds() 
val source: Source[User, NotUsed] 
val dbSink: Sink[User, NotUsed] //goes to db 
//if the user is added to the set it will return true 
val usersFilter = Flow[User].filter(user => users.add(user.id)) 

現在我可以創建一個圖形

source ~> usersFilter ~> dbSink 

我的問題是,可變狀態是共享的,不安全。是否有一個選項可以保持流程中的狀態?

回答

0

有兩種方法可以做到這一點。

如果您正在獲取記錄流,並且想要對該流進行重複數據刪除(因爲某些ID已經處理完畢)。你可以做

http://janschulte.com/2016/03/08/deduplicate-akka-stream/

這樣做的另一種方式是通過在您檢查ID已經存在的數據庫查詢。

val alreadyExists : Flow[User, NotUsed] = { 
    // build a cache of known ids 
    val knownIdList = ... // query database and get list of IDs 
    Flow[User].filterNot(user => knownIdList.contains(user.id)) 
} 
+0

您的建議存在主要缺陷。 這將需要很多的請求數據庫,這意味着很多的io。我寧願將它保存在內存中(我將更新我的文章) – igx

+0

至少在我的情況下,我只對數據庫進行1次調用,並將單個查詢中的所有現有ID加載到列表中。然後只需在運行時查看流中的列表(或映射)即可。 –

+0

你認爲這樣的事情會起作用嗎? 例如 'DEF alreadyExists = { VAL alreadyExistingIds = mutable.set [字符串](INIT從分貝) 流量[用戶] .filterNot(用戶=> alreadyExistingIds.add(user.id)) }' – igx