2012-03-03 57 views
3

我有一個屬於單個線程的套接字列表。但我不知道是否有一種可行的方式與那些客戶進行溝通(讀/寫/來)?我不想爲每個客戶端創建一個線程,因爲可能有太多的用戶,並且爲他們每個創建一個線程可能代價太大。Java:如何在單個線程中與多個客戶端進行通信

+0

創建線程相當便宜。在擔心性能之前,專注於使您的程序正常工作。 – artbristol 2012-03-03 08:53:36

回答

3

Netty的 - 在Java NIO客戶端服務器套接字框架 http://www.jboss.org/netty

+0

netty的新網站是http://netty.io – 2012-03-05 17:21:54

1

您可以使用NIO的方式在JRE。另一種解決方案是使用Space Architecture。在此體系結構中存在具有Space名稱的全局空間,並在此空間中寫入任何請求,然後另一個線程從此空間讀取並處理它,並將處理結果寫入另一個空間,並在最後一步請求線程從指定空間讀取自己的結果。

你可以看到下面的鏈接瞭解更多信息:

http://en.wikipedia.org/wiki/Space_architecture

+1

您可以'使用NIO方法'* *不使用* JBoss。它內置於JRE中。 OP對JBoss一無所知:介紹它是無關緊要的。 – EJP 2012-03-03 08:56:48

+0

對不起,你是對的,我編輯我的答案。謝謝。 – MJM 2012-03-03 09:02:35

3

只需使用標準的Java NIO 最好的文檔的Java主 http://docs.oracle.com/javase/6/docs/technotes/guides/io/index.html頁上寫的。 有API文檔,樣本,教程。一切。 我向你保證它是唯一的 - 我有經驗編寫10k客戶端連接到一個客戶端(幾個線程)的軟件。您只需記住操作系統限制即可在配置中更改它。

1

我不得不連接多個服務器IP:PORT並做請求 - 響應消息。用多線程實現傳統的IO後,一個看門狗被阻塞的插槽被放棄。我製作了NIO,這是我的測試應用程序,供將來參考。

我可以用超時打開N個連接,用超時讀取回復,用簡單的單線程「遊戲循環」寫超時的所有命令。如果我需要併發性,我可以產生工作線程,但如果應用程序邏輯不需要它,則不是必需的。

服務器是一個自定義的telnet應用程序,客戶端編寫命令並讀取文本行,直到找到終結者行提示符。終結者標記了end_of_response_packet。

import java.util.*; 
import java.io.*; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 

public class ClientSocketNIO { 
    private String host; 
    private int port; 
    private String charset; 
    private ByteArrayOutputStream inBuffer; 
    private ByteBuffer buf; 
    private Selector selector; 
    private SocketChannel channel; 

    public ClientSocketNIO(String host, int port, String charset) { 
     this.charset = charset==null || charset.equals("") ? "UTF-8" : charset; 
     this.host = host; 
     this.port = port; 
    } 

    public void open(long timeout) throws IOException { 
     selector = Selector.open(); 
     channel = SocketChannel.open(); 
     channel.configureBlocking(false); 
     channel.register(selector, SelectionKey.OP_CONNECT); 
     channel.connect(new InetSocketAddress(host, port)); 
     inBuffer = new ByteArrayOutputStream(1024); 
     buf = ByteBuffer.allocate(1*1024); 
     long sleep = Math.min(timeout, 1000); 
     while(timeout > 0) { 
      if (selector.select(sleep) < 1) { 
       timeout-=sleep; 
       continue; 
      } 
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); 
      while(keys.hasNext()) { 
       SelectionKey key = keys.next(); 
       keys.remove(); 
       if (!key.isValid() || !key.isConnectable()) continue; 
       SocketChannel channel = (SocketChannel)key.channel(); 
       if (channel.isConnectionPending()) { 
        channel.finishConnect(); 
        channel.configureBlocking(false); 
        return; // we are ready to receive bytes 
       } 
      } 
     } 
     throw new IOException("Connection timed out"); 
    } 

    public void close() { 
     try { channel.close(); } catch(Exception ex) { } 
     try { selector.close(); } catch(Exception ex) { } 
     inBuffer=null; 
     buf=null; 
    } 

    public List<String> readUntil(String terminator, long timeout, boolean trimLines) throws IOException { 
     return readUntil(new String[]{terminator}, timeout, trimLines); 
    } 

    public List<String> readUntil(String[] terminators, long timeout, boolean trimLines) throws IOException { 
     List<String> lines = new ArrayList<String>(12); 
     inBuffer.reset(); 

     // End of packet terminator strings, line startsWith "aabbcc" string. 
     byte[][] arrTerminators = new byte[terminators.length][]; 
     int[] idxTerminators = new int[terminators.length]; 
     for(int idx=0; idx < terminators.length; idx++) { 
      arrTerminators[idx] = terminators[idx].getBytes(charset); 
      idxTerminators[idx] = 0; 
     } 
     int idxLineByte=-1; 

     channel.register(selector, SelectionKey.OP_READ); 
     long sleep = Math.min(timeout, 1000); 
     while(timeout>0) { 
      if (selector.select(sleep) < 1) { 
       timeout-=sleep; 
       continue; 
      } 
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); 
      while(keys.hasNext()) { 
       SelectionKey key = keys.next(); 
       keys.remove(); 
       if (!key.isValid() || !key.isReadable()) continue; 
       SocketChannel channel = (SocketChannel)key.channel(); 
       buf.clear(); 
       int len = channel.read(buf); 
       System.out.println("read " + len); 
       if (len == -1) throw new IOException("Socket disconnected"); 
       buf.flip(); 
       for(int idx=0; idx<len; idx++) { 
        byte cb = buf.get(idx); 
        if (cb!='\n') { 
         idxLineByte++; 
         inBuffer.write(cb); 
         for(int idxter=0; idxter < arrTerminators.length; idxter++) { 
          byte[] arrTerminator = arrTerminators[idxter]; 
          if (idxLineByte==idxTerminators[idxter] 
            && arrTerminator[ idxTerminators[idxter] ]==cb) { 
           idxTerminators[idxter]++; 
           if (idxTerminators[idxter]==arrTerminator.length) 
            return lines; 
          } else idxTerminators[idxter]=0; 
         } 
        } else { 
         String line = inBuffer.toString(charset); 
         lines.add(trimLines ? line.trim() : line); 
         inBuffer.reset(); 
         idxLineByte=-1; 
         for(int idxter=0; idxter<arrTerminators.length; idxter++) 
          idxTerminators[idxter]=0; 
        } 
       } 
      } 
     } 
     throw new IOException("Read timed out"); 
    } 

    public void write(String data, long timeout) throws IOException { 
     ByteBuffer outBuffer = ByteBuffer.wrap(data.getBytes(charset)); 
     channel.register(selector, SelectionKey.OP_WRITE); 
     long sleep = Math.min(timeout, 1000); 
     while(timeout > 0) { 
      if (selector.select(sleep) < 1) { 
       timeout-=sleep; 
       continue; 
      } 
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); 
      while(keys.hasNext()) { 
       SelectionKey key = keys.next(); 
       keys.remove(); 
       if (!key.isValid() || !key.isWritable()) continue; 
       SocketChannel channel = (SocketChannel)key.channel(); 
       int len = channel.write(outBuffer); 
       System.out.println("write " + len); 
       if (outBuffer.remaining()<1) 
        return; 
      } 
     } 
     throw new IOException("Write timed out"); 
    } 

    public static void main(String[] args) throws Exception { 
     ClientSocketNIO client = new ClientSocketNIO("11.22.33.44", 1234, "UTF-8"); 
     try { 
      client.open(15000); 

      // read prompting for username 
      List<String> reply = client.readUntil("User: ", 15000, true); 
      for(int idx=0; idx<reply.size(); idx++) 
       System.out.println("|"+reply.get(idx)+"|");   

      // write username and read a success or failed prompt(asks username once again), 
      // this one may return two different terminator prompts so listen for both 
      client.write("myloginname\n", 15000); 
      reply = client.readUntil(new String[]{"> ", "User: "}, 15000, true); 
      for(int idx=0; idx<reply.size(); idx++) 
       System.out.println("|"+reply.get(idx)+"|"); 
      if (!reply.get(reply.size()-1).startsWith("Welcome ")) return; // Access denied 

      System.out.println("-----"); 
      client.write("help\n", 15000); 
      reply = client.readUntil("> ", 15000, true); 
      for(int idx=0; idx<reply.size(); idx++) 
       System.out.println("|"+reply.get(idx)+"|"); 

      System.out.println("-----"); 
      client.write("get status\n", 15000); 
      reply = client.readUntil("> ", 15000, true); 
      for(int idx=0; idx<reply.size(); idx++) 
       System.out.println("|"+reply.get(idx)+"|"); 

      System.out.println("-----"); 
      client.write("get list\n", 15000); 
      reply = client.readUntil("> ", 15000, true); 
      for(int idx=0; idx<reply.size(); idx++) 
       System.out.println("|"+reply.get(idx)+"|"); 

      client.write("quit\n", 15000); 
     } finally { 
      client.close(); 
     } 
    } 

} 
相關問題