2016-06-28 87 views
0

我的akka​​流在一個元素之後停止。這是我的流:Akka流在一個元素後停止

val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
    FirehoseActor.props(
    auth = ... 
) 
) 

val ref = Flow[FirehoseActor.RawTweet] 
    .map(r => ResponseParser.parseTweet(r.payload)) 
    .map { t => println("Received: " + t); t } 
    .to(Sink.onComplete({ 
    case Success(_) => logger.info("Stream completed") 
    case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}") 
    })) 
    .runWith(firehoseSource) 

FirehoseActor連接到Twitter firehose並將消息緩衝到隊列。當演員接收Request消息時,它take S中的下一個元素,並返回它:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    onNext(RawTweet(queue.take())) 
} 

的問題是,只有一個單一的鳴叫正在打印到控制檯。該程序不會退出或拋出任何錯誤,我已經在周圍散佈了日誌記錄,並且沒有打印任何錯誤。

我認爲接收器會繼續施加壓力來拉動元件,但似乎並不是這樣,因爲Sink.onComplete中的消息都沒有打印出來。我也嘗試使用Sink.ignore,但也只打印單個元素。演員中的日誌消息也只會打印一次。

我需要使用什麼接收器才能使其無限期地通過流?

回答

0

啊我應該在我的演員中尊重totalDemand。這修復該問題:

def receive = { 
    case Request(_) => 
    logger.info("Received request for next firehose element") 
    while (totalDemand > 0) { 
     onNext(RawTweet(queue.take())) 
    } 

我期待收到Request對於流中的每一個元素,但顯然每個流將發送Request