2017-07-19 53 views
0

我必須在JBoss中構建一個JAVA NIO服務器應用程序以從10-200個傳感器盒中讀取數據。他們打開一個流,並始終向我發送數據。溝通是雙向的。現在,有時會發生這些Box(或服務器)出現內部錯誤。爲了檢測這種問題,觀察者線程每5秒檢查一次數據塊是否自上次檢查後進入。如果我的盒子都沒有發送數據,那麼發生了一些不好的事情,我想重新啓動整個套接字通信。JAVA NIO服務器:如何重置所有連接

現在,很好地闡述瞭如何建立與NIO的套接字連接,但很難找到複雜的例子來清除重置它們。這是我的問題:當我的看門狗檢測到最近5秒沒有數據時,它調用close()然後startEngine()。但之後,仍然沒有數據到達。有些東西似乎被阻止,一些資源仍然相關或類似。如果我重新啓動我的JBoss,數據再次到達。有人可以給我一個提示嗎?

謝謝你的時間! 斯特凡

public class TestServer 
{ 
    private NIOServer server; 
    private HashMap<String, SocketChannel> clientsList = new HashMap<String, SocketChannel>(); 

    class NIOServer extends Thread 
    { 
     class MessageBuffer 
     { 
       int [] msgAsByte = new int[msgSize]; 
       int pos = 0; 
       int lastSign = 0;          
       int bytesRead = 0; 
     } 
     private ByteBuffer readBuffer = ByteBuffer.allocate(256); 
     private Selector selector; 
     private boolean stop = false; 
     private int[] ports; 
     private int msgSize = 48; 
     private HashMap<String,MessageBuffer> buffer = new HashMap<String, MessageBuffer>(); 

     private List<ServerSocketChannel> channels; 
     // Maps a SocketChannel to a list of ByteBuffer instances 
     private Map<SocketChannel, List<ByteBuffer>> pendingDataToWrite = new HashMap<SocketChannel, List<ByteBuffer>>(); 

     public NIOServer(int[] ports) { 
       this.ports = ports; 
     } 

     private void stopAll() 
     { 
       stop = true; 

       try 
       { 
        server.interrupt(); 
        server.join(3000); 
       } 
       catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
       closeConnections(); 
     } 

     public void sendData(SocketChannel socket, byte[] data) 
     { 
       // And queue the data we want written 
       synchronized (this.pendingDataToWrite) { 
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingDataToWrite.get(socket); 
        if (queue == null) { 
          queue = new ArrayList<ByteBuffer>(); 
          this.pendingDataToWrite.put(socket, queue); 
        } 
        queue.add(ByteBuffer.wrap(data)); 
       } 

       SelectionKey key = socket.keyFor(this.selector); 
       if(key != null) 
        key.interestOps(SelectionKey.OP_WRITE); 
       // Finally, wake up our selecting thread so it can make the required changes 
       this.selector.wakeup(); 
     } 

     public void run() 
     { 
       try 
       { 
        stop = false; 
        selector = Selector.open(); 
        channels = new ArrayList<ServerSocketChannel>(); 
        ServerSocketChannel serverchannel; 
        for (int port : ports) 
        { 
          try 
          { 
           serverchannel = ServerSocketChannel.open(); 
           serverchannel.configureBlocking(false); 
           try 
           { 
             serverchannel.socket().setReuseAddress(true); 
           } 
           catch(SocketException se) 
           { 
             // 
           } 
           serverchannel.socket().bind(new InetSocketAddress(port)); 
           serverchannel.register(selector, SelectionKey.OP_ACCEPT); 
           channels.add(serverchannel); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
        } 
        while (!stop) 
        { 

          SelectionKey key = null; 
          try 
          { 
           selector.select(); 
           Iterator<SelectionKey> keysIterator = selector.selectedKeys() 
              .iterator(); 
           while (keysIterator.hasNext()) 
           { 
             key = keysIterator.next(); 

             if(key.isValid()) 
             { 
              if (key.isAcceptable()) 
              { 
                accept(key); 
              } 
              else if (key.isReadable()) 
              { 
                readData(key); 
              } 
              else if (key.isWritable()) 
              { 
                writeData(key); 
              } 
             } 
             else 
             { 
              SocketChannel sc = (SocketChannel) key.channel(); 
             } 
             keysIterator.remove(); 
           } 
          } 
          catch (Exception e) 
          { 
           if(e instanceof IOException || e instanceof ClosedSelectorException) 
           { 
             try 
             { 
              ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
              channels.remove(ssc); 
              ssc.close(); 
              key.cancel(); 
             } 
             catch(Exception ex) 
             { 
              // 
             } 

           } 
           else 
           { 
             // 
           } 
          } 
        } 
       } 
       catch(Exception e1) 
       { 
        // 
       } 

       closeConnections(); 

     } 

     private void closeConnections() 
     { 
       //if thread is stopped, close all 
       try 
       { 
        try 
        { 
          if(this.selector == null || this.selector.keys() == null) 
          { 
           log.debug("No selectors or keys found to close"); 
          } 
          else 
          { 
           Iterator<SelectionKey> keys = this.selector.keys().iterator(); 
           while(keys.hasNext()) 
           { 
             SelectionKey key = keys.next(); 
             key.cancel(); 
           } 
          } 
        } 
        catch(Exception ex) { 
          // 
        } 
        if(selector != null) 
          selector.close(); 
        if(channels != null) 
        { 
          for(ServerSocketChannel channel:channels) 
          { 
           channel.socket().close(); 
           channel.close(); 
          } 
        } 

        if(clientsList != null) 
        { 
          Iterator<Map.Entry<String, SocketChannel>> hfm = clientsList.entrySet().iterator(); 
          while(hfm.hasNext()) 
          { 
           Map.Entry<String, SocketChannel> s = hfm.next(); 
           s.getValue().close(); 
          } 
        } 
        clientsList=null; 

        selector = null; 
        channels = null; 
        pendingDataToWrite = null; 
       } 
       catch(Exception e) 
       { 
        // 
       } 

     } 

     private void accept(SelectionKey key) throws IOException 
     { 

       ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
       SocketChannel sc = ssc.accept(); 
       sc.configureBlocking(false); 
       sc.register(selector, SelectionKey.OP_READ); 

       String ip = sc.socket().getRemoteSocketAddress().toString(); 
       if(!buffer.containsKey(ip)) 
        buffer.put(ip, new MessageBuffer()); 
     } 

     private void readData(SelectionKey key) throws Exception 
     { 

       SocketChannel sc = (SocketChannel) key.channel();  

       MessageBuffer buf = buffer.get(sc.socket().getRemoteSocketAddress().toString()); 
       try 
       { 
        buf.bytesRead = sc.read(readBuffer); //read into buffer. 
       } 
       catch(Exception e2) 
       { 
        sc.close(); 
        buffer.remove(sc); 
       } 

       //close connection 
       if (buf.bytesRead == -1) 
       { 
        sc.close(); 
        key.cancel(); 
        return; 
       } 

       readBuffer.flip();  //make buffer ready for read 

       while(readBuffer.hasRemaining()) 
       { 
        //Read the data and forward it to another Process... 
       } 

       readBuffer.compact(); //make buffer ready for writing 

     } 

     private void writeData(SelectionKey key) throws Exception 
     { 
       SocketChannel socketChannel = (SocketChannel) key.channel(); 
       synchronized (this.pendingDataToWrite) { 
        List queue = (List) this.pendingDataToWrite.get(socketChannel); 

        // Write until there's not more data ... 
        while (!queue.isEmpty()) { 
          ByteBuffer buf = (ByteBuffer) queue.get(0); 
          try 
          { 
           socketChannel.write(buf); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
          finally 
          { 
           queue.remove(0); 
          } 
          if (buf.remaining() > 0) { 
           // ... or the socket's buffer fills up 
           break; 
          } 
        } 

        key.interestOps(SelectionKey.OP_READ); 
       } 
     } 
    } 



    public void close() { 

     if (server != null && server.isAlive()) 
     {  
        server.stopAll(); 
     } 
     if(clientsList != null) 
     { 
       clientsList.clear(); 
     } 
     server = null; 

    } 

    public void startEngine(int[] ports) { 
     if (ports != null) { 
       for (int port : ports) 
        log.info("Listening on port " + port); 
       server= new NIOServer(ports); 
       server.start(); 
     } 
    } 

} 

回答

1

使用select()超時。

如果超時發生,關閉所有註冊的SocketChannels

如果您希望獲得更精細的信息,請記錄每個通道上的最後I/O時間,並關閉每個select()循環底部已過期的那些時間。

注意您的OP_WRITE技術不正確。這裏有很多答案顯示如何正確使用它。

+0

感謝您的意見,我想你的意思是你的意見,像這樣的線程https://stackoverflow.com/questions/17556901/java-high-load-nio-tcp-server?這意味着我應該在寫入時寫入,並且只有當此操作返回0時,纔會註冊OP_WRITE。正確? select + timeout幫助我識別沒有新數據到達時,但它沒有解決我調用close()和restartEngine()後仍然沒有得到新數據的問題... – user3354754

+0

您不會得到任何東西,直到客戶重新連接。 – EJP

+0

好吧,當我重新啓動我的JBoss,他們重新連接,所有工作正常。但是,如果我只關閉套接字,就不要這樣做。有沒有辦法強制他們重新連接 – user3354754