2015-12-14 106 views
4

用油滑,你可以做以下從表中產生的結果流:斯卡拉油滑:永無止境的流

val q = for (e <- events) yield e.name 
val p: DatabasePublisher[String] = db.stream(q.result) 

p.foreach { s => println(s"Event: $s") } 

,將打印所有事件在events表最後一行之後終止。

假設您可以通過某種方式通知events表中的新行何時輸入,是否可以編寫一個可以連續輸出插入事件的流?數據庫表的一種tail -f

我覺得油滑會並不支持這一點,但我想應該是可以使用阿卡流來幫助。所以如果你能從Slick Source獲得一些東西,直到它變空,然後等待一個事件來表示表中更多的數據,然後流式傳輸新的數據。可能通過使用ActorPublisher來綁定這個邏輯?

只是想知道,如果有人有這方面或任何諮詢的經驗嗎?

+0

這感覺有點像Spark的Dstream:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream –

回答

3

你是正確的約ActorPublisher :)下面是一個使用PostgreSQL,async DB driverLISTEN/NOTIFY mechanism簡單的例子:

演員:

class PostgresListener extends ActorPublisher[String] { 

    override def receive = { 
    case _ ⇒ 
     val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password") 
     val connection = new PostgreSQLConnection(configuration) 
     Await.result(connection.connect, 5.seconds) 

     connection.sendQuery(s"LISTEN $channel") 
     connection.registerNotifyListener { message ⇒ onNext(message.payload) } 
    } 
} 

服務:

def stream: Source[ServerSentEvent, Unit] = { 
    val dataPublisherRef = Props[PostgresListener] 
    val dataPublisher = ActorPublisher[String](dataPublisherRef) 

    dataPublisherRef ! "go" 

    Source(dataPublisher) 
    .map(ServerSentEvent(_)) 
    .via(WithHeartbeats(10.second)) 
} 

build.sbtlibraryDependencies

"com.github.mauricio" %% "postgresql-async"   % "0.2.18" 

Postgres的觸發應該叫select pg_notify('foo', 'payload')

據我所知,油滑不支持LISTEN

+0

我現在得到了一些工作,有點不同於你的答案,但使用ActorPublisher,所以感謝確認。另外我想你的答案是使用akka-sse項目的代碼(ServerSentEvent/WithHeartbeats)?對於這個問題,這不是必需的,但碰巧是我正在尋找的東西。所以謝謝你的指針! – Dave

+0

是啊,你說得對,這是阿卡,我從DB流的東西通過上證所UI。忘記了這個時代的拷貝代碼:)不客氣! –