2017-06-08 77 views
0

有兩個表TableATableB如何一些記錄從表A複製到表B具有光滑的流媒體和阿卡串流

我需要一些記錄複製從TableATableB。我用slick-3.0,並使用以下方法:

import akka.stream._ 
import akka.stream.scaladsl._ 
... 

//{{ READ DATA FROM TABLE A 
val q = TableA.filter(somePredicate).result 
val source = Source.fromPublisher { 
     db.stream(q.result).mapResult { r => 
     val record: RecordA = someTransformation(r) 
     record 
     } 
    }.grouped(50) // grouping because I want to write records in batch mode 
//}} 

//{{ WRITE DATA TO TABLE B 
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] => 
     //TODO how to write batch to TableB asynchronously? 
     val insertAction = TableB ++= batch // insert batch to table 
     val fInsert: Future[_] = db.run(insertAction) 
     Await.result(fInsert, ...)   // #1 this works only with blocking 
}) 
//}} 

但我面臨着一個問題 - 如何寫批處理TableB異步(參見TODO)。現在上面的代碼只適用於內部未來(參見#1評論)。有異步實現該任務的正確方法嗎?

+0

如果你不阻止內在的未來會發生什麼? – thwiegan

+0

@thwiegan,如果我不阻止內在的未來並返回它然後它不會完成 –

+1

這似乎是你的用例:https://stackoverflow.com/questions/36400152/how-are-reactive-streams -used-in-slick-for-inserted-data我沒有看到與你的例子 – thwiegan

回答

2

使用mapAsync它期望返回未來,公開「展開」結果在下一階段。

source.mapAsync(4){batch: Seq[RecordA] => 
     val insertAction = TableB ++= batch // insert batch to table 
     db.run(insertAction) 
}).to(Sink.ignore).run