2016-09-23 134 views
1

我需要與在給定端口上作爲服務器運行的C++應用程序對話。它公開了一個二進制API(協議緩衝區)以獲得更好的性能。我的RESTful服務是在Spring MVC和Jersey中開發的,並希望使用這個新功能。我已經能夠成功地使用和生成協議緩衝區消息。Java Web應用程序和C++服務器之間的套接字通信

在我的spring web應用程序中,我最初創建了一個Apache Commons Pool來創建一個套接字連接池。這是我在讀/寫插槽

更新1:添加PooledObjectFactory實施

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<Socket> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public Socket create() throws Exception { 
     return new Socket(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(Socket socket) { 
     return new DefaultPooledObject<>(socket); 
    } 

    @Override 
    public void destroyObject(final PooledObject<Socket> p) throws Exception { 
     final Socket socket = p.getObject(); 
     socket.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<Socket> p) { 
     final Socket socket = p.getObject(); 
     return socket != null && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 
} 

@Service 
@Scope("prototype") 
public class Gateway { 
    @Autowired 
    private GenericObjectPool pool; 

    public Response sendAndReceive(Request request) throws CommunicationException { 
     Response response = null; 
     final Socket socket = pool.borrowObject(); 
     try { 
      request.writeDelimitedTo(socket.getOutputStream()); 
      response = Response.parseDelimitedFrom(socket.getInputStream()); 
     } catch (Exception ex) { 
      LOGGER.error("Gateway error", ex); 
      throw new CommunicationException("Gateway error", ex); 
     } finally { 
      pool.returnObject(socket); 
     } 
     return response; 
    } 
} 

這適用於第一個請求,並在池中返回任何以前使用的插座被發現套接字已經關閉。這可能是因爲不同的請求正在連接到相同的輸入和輸出流。如果我在閱讀響應後關閉套接字,那麼它會勝過共享的目的。如果我使用單例套接字並注入它,它能夠處理第一個請求,然後超時。

如果我在每個實例上創建套接字,那麼它可以工作,並且對於每個請求,性能大約爲2500微秒。我的目標是在500微秒內獲得性能。

考慮到要求,最好的方法是什麼?

更新2:添加服務器和客戶端

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.ServerSocket; 
import java.net.Socket; 

public class TcpServer1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpServer1.class.getName()); 

    public static void main(String[] args) throws Exception { 
     ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[0])); 
     Socket socket = null; 
     while (true) { 
      try { 
       socket = serverSocket.accept(); 
      } catch (IOException e) { 
       LOGGER.warn("Could not listen on port"); 
       System.exit(-1); 
      } 

      Thread thread = new Thread(new ServerConnection1(socket)); 
      thread.start(); 
     } 
    } 
} 

class ServerConnection1 implements Runnable { 

    static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class.getName()); 

    private Socket socket = null; 

    ServerConnection1(Socket socket) { 
     this.socket = socket; 
    } 

    public void run() { 
     try { 
      serveRequest(socket.getInputStream(), socket.getOutputStream()); 
      //socket.close(); 
     } catch (IOException ex) { 
      LOGGER.warn("Error", ex); 
     } 
    } 

    public void serveRequest(InputStream inputStream, OutputStream outputStream) { 
     try { 
      read(inputStream); 
      write(outputStream); 
     } catch (IOException ex) { 
      LOGGER.warn("ERROR", ex); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Response.Builder builder = Response.newBuilder(); 
     Response response = builder.setStatus("SUCCESS").setPing("PING").build(); 
     response.writeDelimitedTo(outputStream); 
     LOGGER.info("Server sent {}", response.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Request request = Request.parseDelimitedFrom(inputStream); 
     LOGGER.info("Server received {}", request.toString()); 
    } 
} 

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.Socket; 

public class TcpClient1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpClient1.class.getName()); 

    private Socket openConnection(final String hostName, final int port) { 
     Socket clientSocket = null; 
     try { 
      clientSocket = new Socket(hostName, port); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while connecting to server", e); 
     } 
     return clientSocket; 
    } 

    private void closeConnection(Socket clientSocket) { 
     try { 
      LOGGER.info("Closing the connection"); 
      clientSocket.close(); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while closing the connection", e); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Request.Builder builder = Request.newBuilder(); 
     Request request = builder.setPing("PING").build(); 
     request.writeDelimitedTo(outputStream); 
     LOGGER.info("Client sent {}", request.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Response response = Response.parseDelimitedFrom(inputStream); 
     LOGGER.info("Client received {}", response.toString()); 
    } 

    public static void main(String args[]) throws Exception { 
     TcpClient1 client = new TcpClient1(); 
     try { 
      Socket clientSocket = null; 

      LOGGER.info("Scenario 1 --> One socket for each call"); 
      for (int i = 0; i < 2; i++) { 
       clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
       OutputStream outputStream = clientSocket.getOutputStream(); 
       InputStream inputStream = clientSocket.getInputStream(); 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
       client.closeConnection(clientSocket); 
      } 

      LOGGER.info("Scenario 2 --> One socket for all calls"); 
      clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
      OutputStream outputStream = clientSocket.getOutputStream(); 
      InputStream inputStream = clientSocket.getInputStream(); 
      for (int i = 0; i < 2; i++) { 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
      } 
      client.closeConnection(clientSocket); 
     } catch (Exception e) { 
      LOGGER.warn("Exception occurred", e); 
      System.exit(1); 
     } 
    } 
} 

這裏請求和響應協議緩衝區類。在方案1中,它能夠處理兩個調用,而在方案2中,它不會從第二個讀取返回。似乎協議緩衝區API正在處理不同的流。下面的示例輸出

17:03:10.508 [main] INFO c.d.e.socket.TcpClient1 - Scenario 1 --> One socket for each call 
17:03:10.537 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.698 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.731 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.732 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Scenario 2 --> One socket for all calls 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.735 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
+0

上沒有足夠的信息了'PooledObjectFactory'(由'GenericObjectPool'使用)的行爲 - 也許'passivateObject'方法關閉套接字? –

+0

它是否也可能不是關閉連接的C++應用程序,比如說在閒置的特定時間段之後? – Gimby

+0

@Adrian新增工廠類別 – user2459396

回答

0

經過很大的痛苦,我才解決了這個問題。正在處理對套接字的讀/寫的類被定義爲原型。所以一旦檢索到套接字的引用,它就不會被清除(由Tomcat管理)。隨後對套接字的後續調用被排隊,然後超時並且Apache Commons Pool銷燬該對象。

爲了解決這個問題,我使用Socket的ThreadLocal創建了類SocketConnection。在處理方面,我創建了一個Callback來處理對套接字的讀/寫操作。下面的示例代碼段:

class SocketConnection { 

    final private String identity; 
    private boolean alive; 
    final private ThreadLocal<Socket> threadLocal; 

    public SocketConnection(final String hostname, final int port) throws IOException { 
     this.identity = UUID.randomUUID().toString(); 
     this.alive = true; 
     threadLocal = ThreadLocal.withInitial(rethrowSupplier(() -> new Socket(hostname, port))); 
    } 

} 

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<SocketConnection> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 
    private SocketConnection connection = null; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public SocketConnection create() throws Exception { 
     LOGGER.info("Creating Socket"); 
     return new SocketConnection(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(SocketConnection socketConnection) { 
     return new DefaultPooledObject<>(socketConnection); 
    } 

    @Override 
    public void destroyObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
     socketConnection.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<SocketConnection> p) { 
     final SocketConnection connection = p.getObject(); 
     final Socket socket = connection.get(); 
     return connection != null && connection.isAlive() && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(true); 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
    } 

} 

class SocketCallback implements Callable<Response> { 

    private SocketConnection socketConnection; 
    private Request request; 

    public SocketCallback() { 
    } 

    public SocketCallback(SocketConnection socketConnection, Request request) { 
     this.socketConnection = socketConnection; 
     this.request = request; 
    } 

    public Response call() throws Exception { 
     final Socket socket = socketConnection.get(); 
     request.writeDelimitedTo(socket.getOutputStream()); 
     Response response = Response.parseDelimitedFrom(socket.getInputStream()); 
     return response; 
    } 

} 

@Service 
@Scope("prototype") 
public class SocketGateway { 

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketGateway.class); 

    @Autowired 
    private GenericObjectPool<SocketConnection> socketPool; 
    @Autowired 
    private ExecutorService executorService; 

    public Response eligibility(Request request) throws DataException { 
     EligibilityResponse response = null; 
     SocketConnection connection = null; 
     if (request != null) { 
      try { 
       connection = socketPool.borrowObject(); 
       Future<Response> future = executorService.submit(new SocketCallback(connection, request)); 
       response = future.get(); 
      } catch (Exception ex) { 
       LOGGER.error("Gateway error {}"); 
       throw new DataException("Gateway error", ex); 
      } finally { 
       socketPool.returnObject(connection); 
      } 
     } 

     return response; 
    } 

} 
相關問題