2016-03-04 139 views
1

我想創建一個TCP客戶端org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory。當工廠創建TCP連接時,我需要將一些數據發送到服務器,例如授權。服務器發送一些數據作爲響應(例如,)。在下一個請求將需要發送到服務器。對於池中的所有連接,必須不同,我認爲連接必須自己存儲春天集成狀態TCP連接

所以,我的實現是...
的Spring Bean XML:

<int-ip:tcp-connection-factory id="client" 
            type="client" 
            host="localhost" 
            port="12345" 
            interceptor-factory-chain="customInterceptorFactory" 
            mapper="mapper" /> 

    <bean id="customInterceptorFactory" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain"> 
     <property name="interceptors"> 
      <bean class="ru.example.gateway.StatefulTcpConnectionFactory" /> 
     </property> 
    </bean> 
    <bean id="cachedClient" class="org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory"> 
     <constructor-arg ref="client" /> 
     <constructor-arg value="5" /> 
    </bean> 

    <int:channel id="clientRequestChannel"/> 

    <int-ip:tcp-outbound-gateway id="clientCrLf" 
           connection-factory="cachedClient" 
           request-channel="clientRequestChannel"/> 

    <int:converter ref="byteArrayToStringConverter" /> 
    <bean id="byteArrayToStringConverter" class="ru.example.gateway.ByteArrayToStringConverter"> 
     <property name="charset" value="windows-1251" /> 
    </bean> 
    <bean id="mapper" class="org.springframework.integration.ip.tcp.connection.TcpMessageMapper"> 
     <property name="charset" value="windows-1251" /> 
    </bean> 

TCP連接攔截:

package ru.example.gateway; 

import lombok.Getter; 
import lombok.extern.slf4j.Slf4j; 
import org.springframework.context.ApplicationEventPublisher; 
import org.springframework.core.serializer.Serializer; 
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport; 
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.support.GenericMessage; 

import java.net.Socket; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 

@Slf4j 
@Getter 
public class StatefulTcpConnection extends TcpConnectionInterceptorSupport { 

    private volatile String nextSalt = ""; 
    private final AtomicBoolean initialized = new AtomicBoolean(false); 
    private final CountDownLatch latch = new CountDownLatch(1); 

    public StatefulTcpConnection() { 
    } 

    public StatefulTcpConnection(ApplicationEventPublisher applicationEventPublisher) { 
     super(applicationEventPublisher); 
    } 

    @Override 
    public void send(Message<?> message) throws Exception { 
     if (!initialized.get()) { 
      throw new Exception("Connection not initialized"); 
     } 
     Message<?> newMessage = message; 
     Object payload = message.getPayload(); 
     log.debug("Send Payload({})", payload.getClass()); 
     if (payload instanceof String) { 
      new GenericMessage<String>((String) payload + ";Salt=" + nextSalt + ";", message.getHeaders()); 
     } 
     super.send(newMessage); 
    } 

    /** 
    * Invoke initialize after connection wrapped in ConnectionFactory4Interceptors 
    * @see org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory#initializeConnection(TcpConnectionSupport, Socket) 
    * @param serializer 
    */ 
    @Override 
    public void setSerializer(Serializer<?> serializer) { 
     super.setSerializer(serializer); 
     try { 
      initialize(); 
     } catch (Exception e) { 
      throw new RuntimeException("Connection couldn't initialize", e); 
     } 
    } 

    private void initialize() throws Exception { 
     GenericMessage<String> loginMessage = new GenericMessage<>("Login"); 
     super.send(loginMessage); 
     log.debug("Waiting initializing response"); 
     latch.await(10, TimeUnit.SECONDS); 
    } 

    @Override 
    public boolean onMessage(Message<?> message) { 
     Object messagePayload = message.getPayload(); 
     log.debug("onMessage Payload({})", messagePayload.getClass()); 
     messagePayload = new String((byte[]) messagePayload); 
     if (messagePayload instanceof String) { 
      String payload = (String) messagePayload, 
       salt; 
      int saltStart = payload.indexOf(";Salt="); 
      if (saltStart >= 0) { 
       int saltEnd = payload.indexOf(";", saltStart + 1); 
       if (saltEnd >= 0) { 
        salt = payload.substring(saltStart + 6, saltEnd); 
       } else { 
        salt = payload.substring(saltStart + 6); 
       } 
       nextSalt = salt; 
      } 
      if (!initialized.get()) { 
       initialized.set(true); 
       latch.countDown(); 
       log.debug("Initializing complete."); 
      } 
     } 
     return super.onMessage(message); 
    } 

    @Override 
    public String getConnectionId() { 
     return "Stateful:" + super.getConnectionId(); 
    } 

    @Override 
    public String toString() { 
     return getConnectionId(); 
    } 

} 

網關:

package ru.example.gateway; 

import org.springframework.integration.annotation.MessagingGateway; 

@MessagingGateway(name = "clientOutGateway", 
    defaultRequestChannel = "clientRequestChannel" 
) 
public interface StringGateway { 

    String send(String message); 

} 

和服務的Spring MVC :

package ru.example.service.impl; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 
import ru.example.gateway.StringGateway; 
import ru.example.service.MessageService; 

@Service 
public class MessageServiceImpl implements MessageService { 

    @Autowired 
    private StringGateway clientGateway; 

    public String getMessage(String code) { 
     return clientGateway.send(code); 
    } 

} 

我有一些錯誤。在從MessageServiceImpl第一個請求,我獲得了「認證請求」的響應,需要響應丟失,或在日誌中拋出異常或錯誤信息:

ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply for Cached:Stateful:localhost:12345:56060:6993fc83-7e69-4f18-9300-8553e6d74a4f 

是否有人有狀態連接的解決方案?

謝謝! 對不起我的英語

回答

0

目前沒有由框架提供的有狀態連接管理。您可以爲連接工廠編寫一個包裝,將連接存儲在ThreadLocal中,但是當您完成連接時,您需要一些機制來清理(並釋放它)。

如果只是一個簡單的握手,你可能可以使用連接攔截器;有一個測試案例(見HelloWorldInterceptor),但它有點複雜。

我認爲自定義連接工廠包裝會更簡單。

+0

在我的例子中,每個連接的狀態(field nextSalt)工作良好。我在授權後發送授權請求並收集響應時遇到問題。 – Andrey