2017-01-22 56 views
2

我正在使用Alpakka-FTP,但也許我正在尋找一個通用的akka​​-stream模式。該FTP連接器可以列出文件或檢索它們:理想Akka Streams,源項目作爲另一個來源?

def ls(host: String): Source[FtpFile, NotUsed] 
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]] 

,我想創建這樣一個流:

LIST 
    .FETCH_ITEM 
    .FOREACH(do something) 

但我無法創建具有兩個功能,例如流我上面寫過。我覺得我應該能到那裏使用Flow,像

Ftp.ls 
    .via(some flow that uses the Ftp.fromPath above) 
    .runWith(Sink.foreach(do something)) 

這是可能的,上面只有lsfromPath功能給?

編輯:

我能解決它使用一個演員和mapAsync,但我還是覺得應該更簡單。

class Downloader extends Actor { 
    override def receive = { 
    case ftpFile: FtpFile => 
     Ftp.fromPath(Paths.get(ftpFile.path), settings) 
     .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right) 
     .run() pipeTo sender 
    } 
} 

val downloader = as.actorOf(Props(new Downloader)) 

Ftp.ls("test_path", settings) 
    .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult]) 
    .runWith(Sink.foreach(res => println("got it!" + res))) 

回答

0

您應該可以使用flatMapConcat達到此目的。您的具體示例可以重寫爲

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile => 
    Ftp.fromPath(Paths.get(ftpFile.path), settings) 
}.runWith(FileIO.toPath(Paths.get("testHDF.txt"))) 

文檔here

相關問題