2016-01-20
6

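How can I download an HTTP resource to a file using Akka Streams and HTTP?

Over the past few days I have been trying to work out the best way to download an HTTP resource to a file using Akka Streams and HTTP.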

Initially I started with the Future-based variant, which looks like this:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
    val request = Get(uri)
    val responseFuture = Http().singleRequest(request)
    responseFuture.flatMap { response =>
        val source = response.entity.dataBytes
        source.runWith(FileIO.toFile(file))
    }
}

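That works reasonably well, but once I had learned more about pure Akka Streams I wanted to try a Flow-based variant that builds the stream starting from a Source[HttpRequest]. At first this completely stumped me, until I stumbled upon the flatMapConcat flow transformation. This version ended up a little more verbose: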

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match { 
    case (responseTry, context) => (responseTry.get, context) 
} 

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match { 
    case (response, _) => response.entity.dataBytes 
} 

def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
    val request = Get(uri)
    val source = Source.single((request, ()))
    val requestResponseFlow = Http().superPool[Unit]()
    source
        .via(requestResponseFlow)
        .map(responseOrFail)
        .flatMapConcat(responseToByteSource)
        .runWith(FileIO.toFile(file))
}

Then I wanted to get a little trickier and use the Content-Disposition header.

Going back to the Future-based variant:

def destinationFile(downloadDir: File, response: HttpResponse): File = { 
    val fileName = response.header[ContentDisposition].get.value 
    val file = new File(downloadDir, fileName) 
    file.createNewFile() 
    file 
} 

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
    val request = Get(uri)
    val responseFuture = Http().singleRequest(request)
    responseFuture.flatMap { response =>
        val file = destinationFile(downloadDir, response)
        val source = response.entity.dataBytes
        source.runWith(FileIO.toFile(file))
    }
}
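
One thing worth checking in destinationFile: in Akka HTTP, a header's value is the full rendered header value, so for Content-Disposition it would look like attachment; filename="report.pdf" rather than a bare file name. The sketch below uses only the Scala standard library and shows one possible way to pull the file name out of such a string. The names ContentDispositionFileName and fileNameFrom are hypothetical, chosen for illustration. The regex handles only the plain filename parameter (not the RFC 6266 filename* form), and any directory part is dropped so that a header cannot point outside the download directory.

```scala
object ContentDispositionFileName {
  // Matches filename="report.pdf" or filename=report.pdf (key is case-insensitive).
  // The extended filename*= form is deliberately not matched.
  private val FileNameParam =
    """(?i)\bfilename\s*=\s*(?:"([^"]*)"|([^;\s]+))""".r

  /** Extracts the file name from a raw Content-Disposition header value, if present. */
  def fileNameFrom(headerValue: String): Option[String] =
    FileNameParam
      .findFirstMatchIn(headerValue)
      .flatMap(m => Option(m.group(1)).orElse(Option(m.group(2))))
      // Keep only the part after the last path separator.
      .map(name => name.substring(name.lastIndexWhere(c => c == '/' || c == '\\') + 1))
      .filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(fileNameFrom("attachment; filename=\"report.pdf\"")) // prints Some(report.pdf)
    println(fileNameFrom("inline"))                              // prints None
  }
}
```

With something like this, destinationFile could pass the header's value to fileNameFrom and fall back to a default name when the result is None, instead of calling .get on the header.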

But now I have no idea how to do this with the Flow-based variant. This is as far as I got:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
    case (response, _) =>
        val source = responseToByteSource(in)
        val file = destinationFile(downloadDir, response)
        source.map((_, file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
    val request = Get(uri)
    val source = Source.single((request, ()))
    val requestResponseFlow = Http().superPool[Unit]()
    val sourceWithDest: Source[(ByteString, File), Unit] = source
        .via(requestResponseFlow)
        .map(responseOrFail)
        .flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
    sourceWithDest.runWith(???)
}

So now I have a Source that will emit one or more (ByteString, File) elements for each File (I say each File because there is no reason the original Source has to be a single HttpRequest).

Is there any way to take these and route them to a dynamic Sink?

I am thinking of something like flatMapConcat, such as:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ??? 

so that I could finish downloadViaFlow2 with:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = { 
    val sink = FileIO.toFile(destination, true) 
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right) 
} 
sourceWithDest.runWithMap { 
    case (_, file) => destToSink(file) 
} 

Answer

5

The solution does not need flatMapConcat. If you do not need any return value from the file writes, you can use Sink.foreach:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = { 
    val file = destinationFile(downloadDir, httpResponse) 
    httpResponse.entity.dataBytes.runWith(FileIO.toFile(file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = { 
    val request = HttpRequest(uri=uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 

    source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .runWith(Sink.foreach(writeFile(downloadDir))) 
} 

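Note that with Sink.foreach the writeFile function creates Futures that nothing waits for, so there is not much backpressure involved. The file writes may be slowed down by the hard drive, but the stream will keep generating Futures. To control this, you can use Flow.mapAsyncUnordered (or Flow.mapAsync):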

val parallelism = 10 

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.ignore) 

If you want to accumulate the Long values into a total count, you need to combine this with a Sink.fold:

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.fold(0L)(_ + _)) 

The fold keeps a running sum and emits the final value when the source of requests is exhausted.
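
As a self-contained illustration of the fold, the sketch below replaces the HTTP request and the file write with a hypothetical fakeWrite that simply returns a byte count, so it runs without a server. It assumes Akka 2.4 or 2.5 with akka-stream on the classpath (ActorMaterializer is deprecated from 2.6 on). FoldTotals, fakeWrite and totalBytes are names made up for this example and are not part of the answer's code.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldTotals {
  // Hypothetical stand-in for writeFile: pretends that `size` bytes were written.
  def fakeWrite(size: Long)(implicit ec: ExecutionContext): Future[Long] =
    Future(size)

  // At most `parallelism` writes are in flight at once; the fold sums their results
  // and the materialized Future completes once the source is exhausted.
  def totalBytes(sizes: List[Long], parallelism: Int)(
      implicit mat: Materializer, ec: ExecutionContext): Future[Long] =
    Source(sizes)
      .mapAsyncUnordered(parallelism)(size => fakeWrite(size))
      .runWith(Sink.fold[Long, Long](0L)(_ + _))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fold-demo")
    implicit val materializer: Materializer = ActorMaterializer()
    import system.dispatcher

    try println(Await.result(totalBytes(List(100L, 250L, 50L), parallelism = 2), 5.seconds)) // prints 400
    finally system.terminate()
  }
}
```

Because addition is commutative, the total is the same whether mapAsyncUnordered or mapAsync is used; only the order in which results reach the fold differs.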

+0

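Hmm, I was hoping there was a better way than this. I am not sure this would actually work properly. writeFile returns as soon as the FileIO stream has been materialized. If the response is chunked, the chunks need to be written to the file in order. Using 'mapAsync' has a similar problem. The 'append' parameter also needs to be set. In addition, it seems that errors writing to the file would not cause the outer stream to receive an error signal. – Steiny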

+1

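@Steiny Breaking my reply to your multiple comments into parts: (a) correct, writing the file with a Future returns immediately, but mapAsync handles this; (b) no solution can correct for a chunked source, nor was that part of the original question or requirements; (c) append is only needed when writing to the same file; (d) forcing the outer stream to fail on any file write failure was not part of the original question. You asked: "Is there any way to take these and route them to a dynamic Sink?", and my answer addresses that question. I wrote my reply in the context of your example code... –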