我想創建一個TCP客戶端org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory。當工廠創建TCP連接時,我需要將一些數據發送到服務器,例如授權。服務器發送一些數據作爲響應(例如,鹽)。在下一個請求將需要發送鹽到服務器。對於池中的所有連接,鹽必須不同,我認爲連接必須自己存儲鹽。春天集成狀態TCP連接
所以,我的實現是...
的Spring Bean XML:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
interceptor-factory-chain="customInterceptorFactory"
mapper="mapper" />
<bean id="customInterceptorFactory" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<bean class="ru.example.gateway.StatefulTcpConnectionFactory" />
</property>
</bean>
<bean id="cachedClient" class="org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory">
<constructor-arg ref="client" />
<constructor-arg value="5" />
</bean>
<int:channel id="clientRequestChannel"/>
<int-ip:tcp-outbound-gateway id="clientCrLf"
connection-factory="cachedClient"
request-channel="clientRequestChannel"/>
<int:converter ref="byteArrayToStringConverter" />
<bean id="byteArrayToStringConverter" class="ru.example.gateway.ByteArrayToStringConverter">
<property name="charset" value="windows-1251" />
</bean>
<bean id="mapper" class="org.springframework.integration.ip.tcp.connection.TcpMessageMapper">
<property name="charset" value="windows-1251" />
</bean>
TCP連接攔截:
package ru.example.gateway;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Getter
public class StatefulTcpConnection extends TcpConnectionInterceptorSupport {
private volatile String nextSalt = "";
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final CountDownLatch latch = new CountDownLatch(1);
public StatefulTcpConnection() {
}
public StatefulTcpConnection(ApplicationEventPublisher applicationEventPublisher) {
super(applicationEventPublisher);
}
@Override
public void send(Message<?> message) throws Exception {
if (!initialized.get()) {
throw new Exception("Connection not initialized");
}
Message<?> newMessage = message;
Object payload = message.getPayload();
log.debug("Send Payload({})", payload.getClass());
if (payload instanceof String) {
new GenericMessage<String>((String) payload + ";Salt=" + nextSalt + ";", message.getHeaders());
}
super.send(newMessage);
}
/**
* Invoke initialize after connection wrapped in ConnectionFactory4Interceptors
* @see org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory#initializeConnection(TcpConnectionSupport, Socket)
* @param serializer
*/
@Override
public void setSerializer(Serializer<?> serializer) {
super.setSerializer(serializer);
try {
initialize();
} catch (Exception e) {
throw new RuntimeException("Connection couldn't initialize", e);
}
}
private void initialize() throws Exception {
GenericMessage<String> loginMessage = new GenericMessage<>("Login");
super.send(loginMessage);
log.debug("Waiting initializing response");
latch.await(10, TimeUnit.SECONDS);
}
@Override
public boolean onMessage(Message<?> message) {
Object messagePayload = message.getPayload();
log.debug("onMessage Payload({})", messagePayload.getClass());
messagePayload = new String((byte[]) messagePayload);
if (messagePayload instanceof String) {
String payload = (String) messagePayload,
salt;
int saltStart = payload.indexOf(";Salt=");
if (saltStart >= 0) {
int saltEnd = payload.indexOf(";", saltStart + 1);
if (saltEnd >= 0) {
salt = payload.substring(saltStart + 6, saltEnd);
} else {
salt = payload.substring(saltStart + 6);
}
nextSalt = salt;
}
if (!initialized.get()) {
initialized.set(true);
latch.countDown();
log.debug("Initializing complete.");
}
}
return super.onMessage(message);
}
@Override
public String getConnectionId() {
return "Stateful:" + super.getConnectionId();
}
@Override
public String toString() {
return getConnectionId();
}
}
網關:
package ru.example.gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway(name = "clientOutGateway",
defaultRequestChannel = "clientRequestChannel"
)
public interface StringGateway {
String send(String message);
}
和服務的Spring MVC :
package ru.example.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.example.gateway.StringGateway;
import ru.example.service.MessageService;
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private StringGateway clientGateway;
public String getMessage(String code) {
return clientGateway.send(code);
}
}
我有一些錯誤。在從MessageServiceImpl第一個請求,我獲得了「認證請求」的響應,需要響應丟失,或在日誌中拋出異常或錯誤信息:
ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply for Cached:Stateful:localhost:12345:56060:6993fc83-7e69-4f18-9300-8553e6d74a4f
是否有人有狀態連接的解決方案?
謝謝! 對不起我的英語
在我的例子中,每個連接的狀態(field nextSalt)工作良好。我在授權後發送授權請求並收集響應時遇到問題。 – Andrey