WebSocket包含兩個獨立的流,它們只是(可能)不在同一個JVM上。
你有兩個對等體進行通信,一個是服務器,另一個是客戶機,但從建立的WebSocket連接的角度來看,差異不再重要。一個數據流是對等體1向對等體2發送消息,另一個流是對等體2向對等體1發送消息,然後在這兩個對等體之間存在網絡邊界。如果您一次查看一個對等點,則您有對等點1接收來自對等點2的消息,而在第二個流中,對等點1正在向對等點2發送消息。
每個對等體都有一個接收部分的接收器和一個發送部分的源。實際上,您確實有兩個源和兩個共享池,但不是同一個ActorSystem(假設爲了解釋兩個對等實現在Akka HTTP中)。來自對等體1的源連接到對等體2的宿並且對等體2的源連接到對等體1的接收器。
因此,您編寫了一個接收器來描述如何處理第一個流中的傳入消息以及描述如何通過第二個流發送消息的源。通常,您希望根據您收到的消息生成消息,因此您可以將這兩者連接在一起,並通過不同的流程來路由消息,這些流程描述如何響應和傳入消息並生成任意數量的傳出消息。 Flow[Message, Message, _]
並不意味着您將傳出消息轉換爲傳入消息,而是將傳入消息轉換爲傳出消息。
webSocketFlow
是一個典型的異步邊界,代表另一個對等體的流。它通過將傳出消息發送到另一個對等體併發射其他對等體發送的內容來「轉換」傳出消息。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
這種流動是兩個流的同行的一半:
[message from other peer]
連接到printSink
helloSource
連接到[message to the other peer]
有傳入郵件和傳出之間沒有關係消息,您只需打印收到的所有內容併發送一個「hello world!」。實際上,由於源在一條消息之後完成,所以WebSocket連接也關閉,但是如果用例如Source.repeat
替換源,則會不斷髮送(洪泛,真的)「hello,world!」。無論傳入消息的速率如何。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
在這裏,您採取一切從outgoing
來了,這是你要發送的郵件,將其路由通過webSocketFlow
,通過與其他同行交流「轉變」的消息,並生成每一個接收到的消息到incoming
。通常你有一個有線協議,你可以將你的case class/pojo/dto消息編碼並解碼成Wire格式。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
或者你能想象某種聊天服務器(啊,的WebSockets和聊天室),其中廣播和和一些客戶的融合信息。這應該採取的任何消息從任何客戶端,並將其發送給每個客戶端(演示只,未經測試,可能不是你想要什麼實際的聊天服務器):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
希望這有助於一點。