2017-04-01 115 views
2

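How to open a TCP connection with TLS in Scala using Akka

I want to write a Scala client that speaks a proprietary protocol over a TCP connection secured with TLS.

Basically, I want to rewrite the following Node.js code in Scala: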

var conn_options = {
    host: endpoint,
    port: port
};
tlsSocket = tls.connect(conn_options, function() {
    if (tlsSocket.authorized) {
        logger.info('Successfully established a connection');

        // Now that the connection has been established, let's perform the handshake
        // Identification frame:
        // 1 | I | id_size | id
        var idFrameTypeAndVersion = "1I";
        var clientIdString = "foorbar";
        var idDataBuffer = new Buffer(idFrameTypeAndVersion.length + 1 + clientIdString.length);

        idDataBuffer.write(idFrameTypeAndVersion, 0, idFrameTypeAndVersion.length);
        idDataBuffer.writeUIntBE(clientIdString.length, idFrameTypeAndVersion.length, 1);
        idDataBuffer.write(clientIdString, idFrameTypeAndVersion.length + 1, clientIdString.length);

        // Send the identification frame to Logmet
        tlsSocket.write(idDataBuffer);
    }
    ...
});

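In the Akka documentation I found a good example of Akka over plain TCP, but I don't know how to upgrade that example to use a TLS socket connection. Some older versions of the documentation showed an example with SSL/TLS, but I could not find one in the newer versions.

I also found the documentation for a TLS object in Akka, but I have not found any good examples of how to use it.

Many thanks in advance!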

+1

A usage example of the TLS support can be found in its unit tests: https://github.com/akka/akka/blob/master/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala. Hope that helps!

Answers

4

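I got it working with the following code and want to share it.

Basically, I started by looking at TcpTlsEcho.java, which I got from the Akka community.

I then followed the akka-streams documentation. Another good example that shows and explains the use of Akka Streams can be found in the following blog post.

The connection setup and the flow look like this: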

/**
    +---------------------------+               +---------------------------+
    | Flow                      |               | tlsConnectionFlow         |
    |                           |               |                           |
    | +------+        +------+  |               |  +------+        +------+ |
    | | SRC  | ~Out~> |      | ~~> O2   --  I1 ~~> |      |  ~O1~> |      | |
    | |      |        | LOGG |  |               |  | TLS  |        | CONN | |
    | | SINK | <~In~  |      | <~~ I2   --  O2 <~~ |      | <~I2~  |      | |
    | +------+        +------+  |               |  +------+        +------+ |
    +---------------------------+               +---------------------------+
**/
// the tcp connection to the server 
val connection = Tcp().outgoingConnection(address, port) 

// ignore the received data for now. There are different actions to implement the Sink. 
val sink = Sink.ignore 

// create a source as an actor reference 
val source = Source.actorRef(1000, OverflowStrategy.fail) 

// join the TLS BidiFlow (see below) with the connection 
val tlsConnectionFlow = tlsStage(TLSRole.client).join(connection) 

// run the source through the TLS connection flow, joined with a logging step that prints the bytes sent to and received from the connection
val sourceActor = tlsConnectionFlow.join(logging).to(sink).runWith(source) 

// send a message to the sourceActor; it is emitted by the Source and written to the connection
sourceActor ! ByteString("<message>") 
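To tie this back to the question, the identification frame from the Node.js snippet could be built roughly as follows. This is only a sketch: the object name `IdentificationFrame` and the method `build` are made up for illustration, and it assumes the `id_size` field counts UTF-8 bytes and must fit into a single byte, which is what the one-byte `writeUIntBE` call in the question suggests.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, not part of Akka or of the original answer.
object IdentificationFrame {
  // Frame type and version, "1I" in the question's protocol
  private val FrameTypeAndVersion: Array[Byte] = "1I".getBytes(UTF_8)

  // Layout: 1 | I | id_size (one byte) | id
  def build(clientId: String): Array[Byte] = {
    val id = clientId.getBytes(UTF_8)
    // Assumption: the size field is a single unsigned byte
    require(id.length <= 255, "client id must fit in a one-byte length field")
    FrameTypeAndVersion ++ Array(id.length.toByte) ++ id
  }
}
```

With the stream above running, the frame could then be sent with `sourceActor ! ByteString(IdentificationFrame.build("foorbar"))`.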

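The TLS connection flow is a BidiFlow. My first simple example ignores all certificates and avoids managing trust and key stores. An example of how to do that properly can be found in the .java example mentioned above.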

def tlsStage(role: TLSRole)(implicit system: ActorSystem) = { 
    val sslConfig = AkkaSSLConfig.get(system) 
    val config = sslConfig.config 

    // create an SSL context that accepts every certificate without validation (including self-signed ones); for testing only
    implicit val sslContext: SSLContext = { 
     object WideOpenX509TrustManager extends X509TrustManager { 
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String) =() 
      override def checkServerTrusted(chain: Array[X509Certificate], authType: String) =() 
      override def getAcceptedIssuers = Array[X509Certificate]() 
     } 

     val context = SSLContext.getInstance("TLS") 
     context.init(Array[KeyManager](), Array(WideOpenX509TrustManager), null) 
     context 
    } 
    // protocols 
    val defaultParams = sslContext.getDefaultSSLParameters() 
    val defaultProtocols = defaultParams.getProtocols() 
    val protocols = sslConfig.configureProtocols(defaultProtocols, config) 
    defaultParams.setProtocols(protocols) 

    // ciphers 
    val defaultCiphers = defaultParams.getCipherSuites() 
    val cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, config) 
    defaultParams.setCipherSuites(cipherSuites) 

    val firstSession = new TLSProtocol.NegotiateNewSession(None, None, None, None) 
     .withCipherSuites(cipherSuites: _*) 
     .withProtocols(protocols: _*) 
     .withParameters(defaultParams) 

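    val clientAuth = getClientAuth(config.sslParametersConfig.clientAuth)
    // NegotiateNewSession is immutable, so keep the copy returned by withClientAuth
    val session = clientAuth.fold(firstSession)(auth => firstSession.withClientAuth(auth))

    val tls = TLS.apply(sslContext, session, role)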

    val pf: PartialFunction[TLSProtocol.SslTlsInbound, ByteString] = { 
     case TLSProtocol.SessionBytes(_, sb) => ByteString.fromByteBuffer(sb.asByteBuffer) 
    } 

    val tlsSupport = BidiFlow.fromFlows(
     Flow[ByteString].map(TLSProtocol.SendBytes), 
     Flow[TLSProtocol.SslTlsInbound].collect(pf)); 

    tlsSupport.atop(tls); 
    } 

    def getClientAuth(auth: ClientAuth) = { 
    if (auth.equals(ClientAuth.want)) { 
     Some(TLSClientAuth.want) 
    } else if (auth.equals(ClientAuth.need)) { 
     Some(TLSClientAuth.need) 
    } else if (auth.equals(ClientAuth.none)) { 
     Some(TLSClientAuth.none) 
    } else { 
     None 
    } 
    } 
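For anything beyond testing, the wide-open trust manager above would normally be replaced by one backed by a trust store. A minimal sketch using only the standard JSSE classes might look like this; the object name `TrustStoreContext` and its method names are made up for illustration, and the trust store path and password are whatever your deployment provides.

```scala
import java.io.FileInputStream
import java.security.KeyStore
import javax.net.ssl.{SSLContext, TrustManagerFactory}

// Hypothetical helper, not part of Akka or of the original answer.
object TrustStoreContext {

  // Loads a key store from disk; path and password are supplied by the caller
  def loadTrustStore(path: String, password: Array[Char]): KeyStore = {
    val store = KeyStore.getInstance(KeyStore.getDefaultType)
    val in = new FileInputStream(path)
    try store.load(in, password) finally in.close()
    store
  }

  // Builds an SSLContext whose trust decisions come from the given store
  def sslContextFrom(trustStore: KeyStore): SSLContext = {
    val factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    factory.init(trustStore)
    val context = SSLContext.getInstance("TLS")
    // No key managers: this sketch does not present a client certificate
    context.init(null, factory.getTrustManagers, null)
    context
  }
}
```

The resulting context could be used in place of the `sslContext` value inside `tlsStage`.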

And for completeness, here is the logging stage, which is also implemented as a BidiFlow.

def logging: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { 
    // function that takes a ByteString chunk, prints it as UTF-8 text behind a fixed prefix, and returns the chunk unchanged
    def logger(prefix: String) = (chunk: ByteString) => { 
     println(prefix + chunk.utf8String) 
     chunk 
    } 

    val inputLogger = logger("> ") 
    val outputLogger = logger("< ") 

    // create BidiFlow with a separate logger function for each of both streams 
    BidiFlow.fromFunctions(outputLogger, inputLogger) 
} 
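Because the protocol in the question is binary (the identification frame contains a raw length byte), printing chunks with `utf8String` may produce unreadable output. One option is to log a hex rendering instead. The sketch below is an assumption about what such a helper could look like; `HexDump` and `hex` are hypothetical names.

```scala
// Hypothetical helper, not part of Akka or of the original answer.
object HexDump {
  // Renders each byte as two lowercase hex digits, separated by spaces
  def hex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString(" ")
}
```

Inside `logger`, the print statement could then become `println(prefix + HexDump.hex(chunk.toArray))`.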

I will try to improve and update this answer further. Hope that helps.

+0

Welcome to Stack Overflow! Thanks for sharing your answer. It is too specific for me, but it may help others.

1

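I really like Jeremias Werner's answer, as it got me where I needed to be. However, I would like to offer the code below (heavily influenced by his answer) as a "one cut and paste" solution that talks to an actual TLS server with as little code as possible.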

import javax.net.ssl.SSLContext 

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.TLSProtocol.NegotiateNewSession 
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, TLS, Tcp} 
import akka.stream.{ActorMaterializer, OverflowStrategy, TLSProtocol, TLSRole} 
import akka.util.ByteString 

object TlsClient { 

    // Flow needed for TLS as well as mapping the TLS engine's flow to ByteStrings 
    def tlsClientLayer = { 

    // Default SSL context supporting most protocols and ciphers. Embellish this as you need 
    // by constructing your own SSLContext and NegotiateNewSession instances. 
    val tls = TLS(SSLContext.getDefault, NegotiateNewSession.withDefaults, TLSRole.client) 

    // Maps the TLS stream to a ByteString 
    val tlsSupport = BidiFlow.fromFlows(
     Flow[ByteString].map(TLSProtocol.SendBytes), 
     Flow[TLSProtocol.SslTlsInbound].collect { 
     case TLSProtocol.SessionBytes(_, sb) => ByteString.fromByteBuffer(sb.asByteBuffer) 
     }) 

    tlsSupport.atop(tls) 
    } 

    // Very simple logger 
    def logging: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { 

    // function that takes a ByteString chunk, prints it as UTF-8 text behind a fixed prefix, and returns the chunk unchanged
    def logger(prefix: String) = (chunk: ByteString) => { 
     println(prefix + chunk.utf8String) 
     chunk 
    } 

    val inputLogger = logger("> ") 
    val outputLogger = logger("< ") 

    // create BidiFlow with a separate logger function for each of both streams 
    BidiFlow.fromFunctions(outputLogger, inputLogger) 
    } 

    def main(args: Array[String]): Unit = { 
    implicit val system: ActorSystem = ActorSystem("sip-client") 
    implicit val materializer: ActorMaterializer = ActorMaterializer() 

    val source = Source.actorRef(1000, OverflowStrategy.fail) 
    val connection = Tcp().outgoingConnection("www.google.com", 443) 
    val tlsFlow = tlsClientLayer.join(connection) 
    val srcActor = tlsFlow.join(logging).to(Sink.ignore).runWith(source) 

    // I show HTTP here but send/receive your protocol over this actor 
    // Should respond with a 302 (Found) and a small explanatory HTML message 
    srcActor ! ByteString("GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n")
    } 
}