2016-03-04 234 views
1

我試圖創建一個Tcp服務器,它接受入站連接,並異步向連接的客戶端發送消息。 有一個Tcp服務器的示例,但它使用網關,它是請求/響應,不支持異步。與Spring集成框架的Tcp連接

我的目標,

  1. 服務器聽插座,例如9000
  2. 一個tcp客戶端連接到9000
  3. 服務器接受連接和接收消息。 (使用TcpReceivingChannelAdapter?)
  4. 服務器保持連接/套接字並記下ip_connectId標頭。
  5. 當某個事件或計劃任務爲客戶端產生消息時,它查找ip_connectId並向該客戶端發送消息。 (使用TcpSendingMessageHandler?)

從參考文檔中,我應該使用協作出站和入站通道適配器。但沒有java配置示例。我不明白如何用java配置來做到這一點,尤其是如何以及在哪裏尋找客戶端發送。

我需要兩個通道嗎?一個用於入站,一個用於出站? inboundAdapter-> fromTcpChannel->消費 生產者 - > outboundAdapter-> toTcpChannel

做我創建ServiceActivator或端點充當生產者/消費者? 默認彈簧集成是否保持連接活着?並且當我需要發送消息給它時,只需將ip_connectId頭添加到消息中? 我是否使用TcpSendingMessageHandler將消息發送給客戶端,還是需要實施gateway

清理我的代碼並在Gary的幫助後再次測試,這是我的代碼。

@EnableIntegration 
@IntegrationComponentScan 
@Configuration 
public class IntegrationConfig implements 
     ApplicationListener<TcpConnectionEvent> { 
    @Value("${listen.port:8000}") 
    private int port; 

    @Bean //for accepting text message from TCP, putty 
    public MessageChannel fromTcp() { 
     return new DirectChannel(); 
    } 

    @Bean //for sending text message to TCP client, outbound 
    public MessageChannel toTcp() { 
     return new DirectChannel(); 
    } 

    // receive from MVC controller 
    @Bean 
    public MessageChannel invokeChannel() { 
     return new DirectChannel(); 
    } 

    @Bean //inbound, it is working, I could read the inbound message while debugging 
    public TcpReceivingChannelAdapter in(
      AbstractServerConnectionFactory connectionFactory) { 
     TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
     adapter.setOutputChannel(fromTcp()); 
     adapter.setConnectionFactory(connectionFactory); 
     return adapter; 
    } 

    //transform TCP bytes to string message, working 
    @Transformer(inputChannel = "fromTcp", outputChannel = "toCollaborate") 
    public String convert(byte[] bytes) { 

     return new String(bytes); 
    } 

    MessageHeaders staticheader; //save ip_connectinId, use this to collaborate outbound message later, for testing purpose only 
    @ServiceActivator(inputChannel = "toCollaborate", outputChannel = "toTcp") 
    public Message<String> handleTcpMessage(Message<String> stringMsg) { 
     staticheader = stringMsg.getHeaders(); 
     return stringMsg; 
     // save the header, collaborate to output channel 
    } 

    //collaborate message from REST API invokeChannel to a outbound tcp client, this fail 
    @Transformer(inputChannel = "invokeChannel", outputChannel = "toTcp") 
    public Message<String> headerBeforeSend(String test) { 
     GenericMessage<String> msg = new GenericMessage<String>(
       "from rest api"); 
     if (staticheader != null) {   
      MessageBuilder 
        .fromMessage(msg) 
        .setHeader("ip_connectionId", 
          staticheader.get("ip_connectionId")).build(); 
     } 
     return msg; 
    } 

    @ServiceActivator(inputChannel = "toTcp") 
    @Bean 
    public TcpSendingMessageHandler out(
      AbstractServerConnectionFactory connectionFactory) { 
     TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler(); 
     tcpOutboundAdp.setConnectionFactory(connectionFactory); 


     return tcpOutboundAdp; 
    } 

    // should need only 1 factory? and keep connectin alive 
    // server for in coming connection 
    @Bean 
    public AbstractServerConnectionFactory serverCF() { 
     return new TcpNetServerConnectionFactory(this.port); 
    } 

    @Override 
    public void onApplicationEvent(TcpConnectionEvent tcpEvent) { 
     // TODO Auto-generated method stub 
     TcpConnection source = (TcpConnection) tcpEvent.getSource(); 

    } 

} 
//The MVC controller 
@Autowired 
    MessageChannel invokeChannel; 
    @RequestMapping(value="/invoke") 
    public String sayHello() 
    { 
     //trigger gateway to send a message 
     String msg = "hello"; 
     MessagingTemplate template = new MessagingTemplate(); 
     template.send(invokeChannel, new GenericMessage<String>(msg));  
     return msg; 
    } 

測試結果: 1.油灰連接確定,發送文本消息 2. SI接收消息確定 3.使用REST API localhost/webappname/rest/invoke將消息發送到invokeChannel,OK 4. transformer組消息頭 5.異常作爲遵循

例外org.springframework.web.util.NestedServletException:請求失敗 處理;嵌套的例外是 org.springframework.messaging.MessageHandlingException:無法找到 出境插座 org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) org.springframework.web.servlet.FrameworkServlet.doGet( FrameworkServlet.java:860) javax.servlet.http.HttpServlet.service(HttpServlet.java:622) org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:845) javax.servlet.http.HttpServlet 。服務(HttpServlet.java:729) org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)

回答

1

ÿ ES;該連接默認保持打開狀態;是的,您可以使用@ServiceActivator來處理請求;是的,你只需要設置連接標頭。

要配置的Java配置出站適配器,添加@ServiceActivator到處理器豆...

@Bean 
public TcpReceivingChannelAdapter() in() { 

    ... 
    adapter.setOutputChannel(newRequests()); 

} 


... 

@ServiceActivator(inputChannel="toClientChannel") 
@Bean 
public TcpSendingMessageHandler out() { 

    ... 

} 
+0

另請參見參考手冊中的'@ Configuration'樣本:http://docs.spring.io /spring-integration/reference/html/ip.html#ip-annotation –

+0

加里,我還是不完全明白。 當我創建'TcpSendingMessageHandler' bean時,我應該爲其參數分配一個'AbstractServerConnectionFactory'對象嗎? 當有消息輸出到'toClientChannel'通道時,如何在out()中獲取這個消息對象? 如何通過此'out()'處理程序中的TCP發送消息? –

+0

如何從'newRequest()'返回一個頻道?我的頻道是在「Endpoint」類的外部定義的。 @EnableIntegration @IntegrationComponentScan @Configuration 公共類IntegrationConfig { @MessageEndpoint \t公共靜態類RelayService { @Bean 公共TcpReceivingChannelAdapter()的(){ ... adapter.setOutputChannel(newRequests() ); } } } –