2016-04-03 52 views
2

我有一段代碼(見下文),它產生了一個服務器,它迴應從端口6001接收到的每一個ByteString流。該示例還定義了一個客戶端,該客戶端連接到服務器併發送包含從字母'a'到'z'的字符列表的ByteString流。Akka通過http的對象流

我現在的問題是,akka提供了一種方法來發送和接收對象的流,而不是通過http的ByStreams?例如,類Client的對象。

如果是這樣,我該如何發送和接收這樣的對象流?你能否給我提供一個展示如何實施的片段?

阿卡文檔不是用戶友好的非玩具的例子...

感謝您的幫助

公共類TcpEcho {

/** 
* Use without parameters to start both client and server. 
* 
* Use parameters `server 0.0.0.0 6001` to start server listening on port 
* 6001. 
* 
* Use parameters `client 127.0.0.1 6001` to start client connecting to 
* server on 127.0.0.1:6001. 
* 
*/ 
public static void main(String[] args) throws IOException { 
    if (args.length == 0) { 
     ActorSystem system = ActorSystem.create("ClientAndServer"); 
     InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     server(system, serverAddress); 
     client(system, serverAddress); 
    } else { 
     InetSocketAddress serverAddress; 
     if (args.length == 3) { 
      serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2])); 
     } else { 
      serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     } 
     if (args[0].equals("server")) { 
      ActorSystem system = ActorSystem.create("Server"); 
      server(system, serverAddress); 
     } else if (args[0].equals("client")) { 
      ActorSystem system = ActorSystem.create("Client"); 
      client(system, serverAddress); 
     } 
    } 
} 

public static void server(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> { 
     System.out.println("Client connected from: " + conn.remoteAddress()); 
     conn.handleWith(Flow.<ByteString> create(), materializer); 
    }); 

    final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system) 
      .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer); 

    bindingFuture.whenComplete((binding, throwable) -> { 
     System.out.println("Server started, listening on: " + binding.localAddress()); 
    }); 

    bindingFuture.exceptionally(e -> { 
     System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage()); 
     system.terminate(); 
     return null; 
    }); 

} 

public static void client(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final List<ByteString> testInput = new ArrayList<>(); 
    for (char c = 'a'; c <= 'z'; c++) { 
     testInput.add(ByteString.fromString(String.valueOf(c))); 
    } 

    Source<ByteString, NotUsed> responseStream = Source.from(testInput) 
      .via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort())); 

    CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in), 
      materializer); 

    result.whenComplete((success, failure) -> { 

     if (failure != null) { 
      System.err.println("Failure: " + failure.getMessage()); 
     } else { 
      System.out.println("Result: " + success.utf8String()); 
     } 
     System.out.println("Shutting down client"); 
     system.terminate(); 

    }); 
} 

}

+0

你見過[這個例子](http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-graphs.html#Bidirectional_Flows)如何創建雙向流,我認爲它或多或少的,你要求什麼? – lpiepiora

+0

我沒有看,但我會。無論如何,你有一個更好的想法/建議的代碼片段?謝謝 – broga

回答

2

akka.stream.{javadsl,scaladsl}.Framing包含水電費幫助您建立一致的信息。例如,您可以通過Framing.simpleFramingProtocolEncoder(maxLength)發送消息來自動向他們添加長度信息。另一方面,Framing.simpleFramingProtocolDecoder(maxLength)將根據其封閉的長度信息來解碼消息。

如果您想操作普通對象,您只需在將它們發送到ByteString之前將它們發送到編碼器,並在從解碼器接收它們的表示之後將它們從ByteString反序列化。