2016-11-07 62 views
0

我正在使用這個example作爲我的出發點。如何使用不響應的Akka Java API創建TCP接收器

在這個例子中,服務器響應修改過的文本,但在我的情況下,服務器不需要響應。

看着其他類似的問題,我知道我可以傳遞Empty ByteString或使用過濾器(p - > false)不發回任何東西。但是,在這種情況下,問題是我的whenComplete塊沒有得到執行。即異常被吞噬。有沒有辦法避免這種情況?幫助讚賞!

connections.runForeach(connection -> { 
System.out.println("New connection from: " + connection.remoteAddress()); 

final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class) 
.via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) 
.map(ByteString::utf8String) 
.map(s -> s + "!!!\n") 
.map(ByteString::fromString); 

connection.handleWith(echo, mat); 
}, mat).whenComplete((done,throwable) -> 
    { 
    //exception handling 
    } 
); 
+0

在進一步的分析,我認爲這個問題是我想爲每個連接completionStage而不是Source(連接)的末尾。 – tapasvi

回答

0

您的分析是正確的,現在您在服務器關閉時發生反應,而不是每次連接。

起反應的各個連接完成將在傳遞到連接的流動,像這樣的東西來完成:

final Tcp tcp = Tcp.get(system); 
tcp.bind("127.0.0.1", 6789).runForeach((connection) -> { 

    final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class) 
    .via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) 
    .map(ByteString::utf8String) 
    .map(s -> s + "!!!\n") 
    .map(ByteString::fromString) 
    .watchTermination((notUsed, completionStage) -> { 
     completionStage.whenComplete((done, exception) -> { 
     System.out.println("Connection from " + connection.remoteAddress() + " completed"); 
     }); 
     return notUsed; 
    }); 

    connection.handleWith(echo, materializer); 
}, materializer); 
相關問題