在過去的幾天中,我一直在試圖找出使用Akka Streams和HTTP將HTTP資源下載到文件的最佳方法。如何使用Akka Streams和HTTP將HTTP資源下載到文件中?
起初,我開始與Future-Based Variant和看起來是這樣的:
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
這是一種不錯,但一旦我有了更多的瞭解純阿卡流我想嘗試並使用Flow-Based Variant創建流從Source[HttpRequest]
開始。起初,這完全讓我難住,直到我偶然發現了流程轉換。這最後一點更詳細:
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
case (responseTry, context) => (responseTry.get, context)
}
def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
case (response, _) => response.entity.dataBytes
}
def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request,()))
val requestResponseFlow = Http().superPool[Unit]()
source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSource).
runWith(FileIO.toFile(file))
}
然後,我希望得到一個有點棘手,並使用Content-Disposition
頭。
回過頭來看看這個基於未來變:
def destinationFile(downloadDir: File, response: HttpResponse): File = {
val fileName = response.header[ContentDisposition].get.value
val file = new File(downloadDir, fileName)
file.createNewFile()
file
}
def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val file = destinationFile(downloadDir, response)
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
但現在我不知道如何與未來的基於變做到這一點。這是據我得到:
def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
case (response, _) =>
val source = responseToByteSource(in)
val file = destinationFile(downloadDir, response)
source.map((_, file))
}
def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request,()))
val requestResponseFlow = Http().superPool[Unit]()
val sourceWithDest: Source[(ByteString, File), Unit] = source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
sourceWithDest.runWith(???)
}
所以現在我有一個Source
,將發出一個或多個(ByteString, File)
元素爲每個File
(我說的每一個File
,因爲沒有理由原來Source
必須是單HttpRequest
)。
反正有沒有把這些和路由到動態Sink
?
我想這樣flatMapConcat
,如:
def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
,這樣我可以用完成downloadViaFlow2
:
def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
val sink = FileIO.toFile(destination, true)
Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
case (_, file) => destToSink(file)
}
嗯我希望有比這更好的方式。我不太確定這實際上是否可以正常工作。只要FileIO流已經實現,writeFile就會返回。如果響應被分塊,那麼它需要按順序寫入文件。使用'mapAsync'類似的問題。 'append'參數也需要設置。此外,似乎寫入文件的任何錯誤都不會導致外部流接收錯誤信號。 – Steiny
@Steiny打破我對你的多個評論的答案:(a)正確,立即用Future寫文件返回,但是mapAsync處理這個(b)沒有解決方案可以糾正分塊源,也不是原始問題/要求的這一部分(c)只有在寫入相同文件時才需要追加(d)強制外部流在任何文件寫入失敗時失敗不是原始問題的一部分。你問:「有沒有辦法把它們引導到一個動態的水槽?」,我的回答回答**這個問題。我在您的示例代碼的上下文中編寫了我的回覆... –