2017-07-17 166 views
1

我過去成功地使用了Akka Streams,但是我目前很難理解爲什麼客戶端Websocket Streams在Akka-HTTP中被定義並且作爲它顯示在documentation中。Akka HTTP客戶端websocket流的定義

由於WebSocket連接允許全雙工通信,我期望這種連接由Akka HTTP中的兩個單獨的流表示,一個用於傳入流量,另一個用於傳出流量。並且實際上,文檔指出以下幾點:

甲的WebSocket由消息的兩個流的[...]

它進一步指出,傳入消息由Sink和傳出消息由a表示Source。這是我的第一個困惑 - 當使用兩個獨立的流時,你會期望總共處理兩個源和兩個接收器,而不是每種類型中的一個。目前,我的猜測是,傳入流的來源以及傳出流的接收器對於開發人員來說並不是很有用,因此只是「隱藏起來」。

但是,當將所有東西連接在一起時,它確實令人困惑(請參閱上面鏈接的文檔)。

使用singleWebSocketRequest時,有問題的部分:

val flow: Flow[Message, Message, Future[Done]] = 
     Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left) 

或者用webSocketClientFlow當相同的部分:

val (upgradeResponse, closed) = 
     outgoing 
     .viaMat(webSocketFlow)(Keep.right) 
     .toMat(incoming)(Keep.both) 
     .run() 

這違揹我的電流流的工作流程的理解。

  • 我爲什麼要到Source傳出消息和Sink傳入的消息結合?上面的代碼看起來像我發送消息給自己而不是服務器。
  • 此外,什麼是Flow[Message, Message, ...]的語義?將傳出消息轉換爲傳入消息似乎沒有意義。
  • 不應該有兩個流而不是一個?

任何幫助提高我的理解是讚賞,謝謝。

編輯:

我必須使用SourceSink並通過WebSocket的發送數據沒有問題,我只是想知道爲什麼階段的佈線這樣做。

回答

2

WebSocket包含兩個獨立的流,它們只是(可能)不在同一個JVM上。

你有兩個對等體進行通信,一個是服務器,另一個是客戶機,但從建立的WebSocket連接的角度來看,差異不再重要。一個數據流是對等體1向對等體2發送消息,另一個流是對等體2向對等體1發送消息,然後在這兩個對等體之間存在網絡邊界。如果您一次查看一個對等點,則您有對等點1接收來自對等點2的消息,而在第二個流中,對等點1正在向對等點2發送消息。

每個對等體都有一個接收部分的接收器和一個發送部分的源。實際上,您確實有兩個源和兩個共享池,但不是同一個ActorSystem(假設爲了解釋兩個對等實現在Akka HTTP中)。來自對等體1的源連接到對等體2的宿並且對等體2的源連接到對等體1的接收器。

因此,您編寫了一個接收器來描述如何處理第一個流中的傳入消息以及描述如何通過第二個流發送消息的源。通常,您希望根據您收到的消息生成消息,因此您可以將這兩者連接在一起,並通過不同的流程來路由消息,這些流程描述如何響應和傳入消息並生成任意數量的傳出消息。 Flow[Message, Message, _]並不意味着您將傳出消息轉換爲傳入消息,而是將傳入消息轉換爲傳出消息。

webSocketFlow是一個典型的異步邊界,代表另一個對等體的流。它通過將傳出消息發送到另一個對等體併發射其他對等體發送的內容來「轉換」傳出消息。

val flow: Flow[Message, Message, Future[Done]] = 
     Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left) 

這種流動是兩個流的同行的一半:

  1. [message from other peer]連接到printSink
  2. helloSource連接到[message to the other peer]

有傳入郵件和傳出之間沒有關係消息,您只需打印收到的所有內容併發送一個「hello world!」。實際上,由於源在一條消息之後完成,所以WebSocket連接也關閉,但是如果用例如Source.repeat替換源,則會不斷髮送(洪泛,真的)「hello,world!」。無論傳入消息的速率如何。

val (upgradeResponse, closed) = 
     outgoing 
     .viaMat(webSocketFlow)(Keep.right) 
     .toMat(incoming)(Keep.both) 
     .run() 

在這裏,您採取一切從outgoing來了,這是你要發送的郵件,將其路由通過webSocketFlow,通過與其他同行交流「轉變」的消息,並生成每一個接收到的消息到incoming。通常你有一個有線協議,你可以將你的case class/pojo/dto消息編碼並解碼成Wire格式。

val encode: Flow[T, Message, _] = ??? 
val decode: Flow[Message, T, _] = ??? 

val upgradeResponse = outgoing 
    .via(encode) 
    .viaMat(webSocketFlow)(Keep.right) 
    .via(decode) 
    .to(incoming) 
    .run() 

或者你能想象某種聊天服務器(啊,的WebSockets和聊天室),其中廣播和和一些客戶的融合信息。這應該採取的任何消息從任何客戶端,並將其發送給每個客戶端(演示只,未經測試,可能不是你想要什麼實際的聊天服務器):

val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ??? 
val chatClientSenders: Seq[Source[Message, NotUsed]] = ??? 

// each of those receivers/senders could be paired in their own websocket compatible flow 
val chatSockets: Seq[Flow[Message, Message, NotUsed]] = 
    (chatClientReceivers, chatClientSenders).zipped.map(
    (outgoingSendToClient, incomingFromClient) => 
     Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient)) 

val toClients: Graph[SinkShape[Message], NotUsed] = 
    GraphDSL.create() {implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[Message](chatClientReceivers.size)) 

    (broadcast.outArray, chatClientReceivers).zipped 
     .foreach((bcOut, client) => bcOut ~> b.add(client).in) 

    SinkShape(broadcast.in) 
    } 

val fromClients: Graph[SourceShape[Message], NotUsed] = 
    GraphDSL.create() {implicit b => 
    import GraphDSL.Implicits._ 

    val merge = b.add(Merge[Message](chatClientSenders.size)) 

    (merge.inSeq, chatClientSenders).zipped 
     .foreach((mIn, client) => b.add(client).out ~> mIn) 

    SourceShape(merge.out) 
    } 

val upgradeResponse: Future[WebSocketUpgradeResponse] = 
    Source.fromGraph(fromClients) 
    .viaMat(webSocketFlow)(Keep.right) 
    .to(toClients) 
    .run() 

希望這有助於一點。