2011-05-02 51 views
4

在傳統的阻塞線程的服務器,我會做這樣的事情爪哇 - 的readObject與NIO

class ServerSideThread { 

    ObjectInputStream in; 
    ObjectOutputStream out; 
    Engine engine; 

    public ServerSideThread(Socket socket, Engine engine) { 
     in = new ObjectInputStream(socket.getInputStream()); 
     out = new ObjectOutputStream(socket.getOutputStream()); 
     this.engine = engine; 
    } 

    public void sendMessage(Message m) { 
     out.writeObject(m); 
    } 

    public void run() { 
     while(true) { 
      Message m = (Message)in.readObject(); 
      engine.queueMessage(m,this); // give the engine a message with this as a callback 
     } 
    } 
} 

現在,對象可以被預期是相當大的。在我的nio循環中,我不能簡單地等待對象通過,所有其他連接(工作量更小)都會等待我。

如何纔可以通知連接具有整個對象,然後它會告訴我的nio通道它已準備好?

回答

7

您可以將對象寫入ByteArrayOutputStream,以便在發送對象之前給出長度。在接收端,嘗試解碼之前需要讀取所需的數據量。

然而,你可能會發現它更簡單,更有效地使用與對象*流阻塞的IO(而不是NIO)


編輯這樣的事情

public static void send(SocketChannel socket, Serializable serializable) throws IOException { 
    ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
    for(int i=0;i<4;i++) baos.write(0); 
    ObjectOutputStream oos = new ObjectOutputStream(baos); 
    oos.writeObject(serializable); 
    oos.close(); 
    final ByteBuffer wrap = ByteBuffer.wrap(baos.toByteArray()); 
    wrap.putInt(0, baos.size()-4); 
    socket.write(wrap); 
} 

private final ByteBuffer lengthByteBuffer = ByteBuffer.wrap(new byte[4]); 
private ByteBuffer dataByteBuffer = null; 
private boolean readLength = true; 

public Serializable recv(SocketChannel socket) throws IOException, ClassNotFoundException { 
    if (readLength) { 
     socket.read(lengthByteBuffer); 
     if (lengthByteBuffer.remaining() == 0) { 
      readLength = false; 
      dataByteBuffer = ByteBuffer.allocate(lengthByteBuffer.getInt(0)); 
      lengthByteBuffer.clear(); 
     } 
    } else { 
     socket.read(dataByteBuffer); 
     if (dataByteBuffer.remaining() == 0) { 
      ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array())); 
      final Serializable ret = (Serializable) ois.readObject(); 
      // clean up 
      dataByteBuffer = null; 
      readLength = true; 
      return ret; 
     } 
    } 
    return null; 
} 
+0

阻塞IO不會縮放。由於堆棧線程大小在整個JVM中都是不變的,因此每個連接線程將與我的引擎線程(這將需要相當大)相同的堆棧大小。因此,即使只有3000個連接,這也使我處於384Mb只是爲了連接。這不會爲1Gb盒子上的系統,堆和數據庫留下太多空間。 cpu的需求將會足夠小,以至於我可以輕鬆處理超過10000個連接,從而限制內存使用量。 – corsiKa 2011-05-02 22:18:14

+0

因此,在我發送大量數據首先讀取的情況下,我怎麼知道我排隊了多少數據直到...我讀了它? – corsiKa 2011-05-02 22:19:41

+1

對每個對象(或對象組)使用一個新的ObjectOutputStream(新的ByteArrayOutputStream)這樣你就沒有數據排隊了,你把字節數組的長度發送爲一個int,接着是字節數組。 ,你需要讀取4字節的最小值,這將給你對於該對象的數據的其餘部分的長度 – 2011-05-02 22:25:23

2

的靈感上面的代碼我創建了一個(GoogleCode project

它包括一個簡單的單元測試:

SeriServer server = new SeriServer(6001, nthreads); 
final SeriClient client[] = new SeriClient[nclients]; 

//write the data with multiple threads to flood the server 

for (int cnt = 0; cnt < nclients; cnt++) { 
    final int counterVal = cnt; 
    client[cnt] = new SeriClient("localhost", 6001); 
    Thread t = new Thread(new Runnable() { 
     public void run() { 
      try { 
       for (int cnt2 = 0; cnt2 < nsends; cnt2++) { 
        String msg = "[" + counterVal + "]";      
        client[counterVal].send(msg); 
       } 
      } catch (IOException e) { 
       e.printStackTrace(); 
       fail(); 
      } 
     } 
     }); 
    t.start(); 
} 

HashMap<String, Integer> counts = new HashMap<String, Integer>(); 
    int nullCounts = 0; 
    for (int cnt = 0; cnt < nsends * nclients;) { 
     //read the data from a vector (that the server pool automatically fills 
     SeriDataPackage data = server.read(); 
     if (data == null) { 
       nullCounts++; 
       System.out.println("NULL"); 
       continue; 
     } 

     if (counts.containsKey(data.getObject())) { 
       Integer c = counts.get(data.getObject()); 
       counts.put((String) data.getObject(), c + 1); 
     } else { 
       counts.put((String) data.getObject(), 1); 
     } 
     cnt++; 
     System.out.println("Received: " + data.getObject()); 
    } 

    // asserts the results 
    Collection<Integer> values = counts.values(); 
    for (Integer value : values) { 
     int ivalue = value; 
     assertEquals(nsends, ivalue); 
     System.out.println(value); 
    } 
    assertEquals(counts.size(), nclients); 
    System.out.println(counts.size()); 
    System.out.println("Finishing"); 
    server.shutdown();