2013-11-01 68 views
3

我使用netty編寫客戶端以高速發送消息。 By jConsole我看到「老一代」正在增加,最後它拋出了 java.lang.OutOfMemoryError:超出了GC開銷限制。 有沒有一些方法或配置來避免此異常 以下是我的測試代碼:使用netty以高速發送消息時獲取OOM異常

import io.netty.bootstrap.Bootstrap; 
    import io.netty.channel.Channel; 
    import io.netty.channel.ChannelFuture; 
    import io.netty.channel.ChannelInitializer; 
    import io.netty.channel.ChannelOption; 
    import io.netty.channel.ChannelPipeline; 
    import io.netty.channel.EventLoopGroup; 
    import io.netty.channel.nio.NioEventLoopGroup; 
    import io.netty.channel.socket.SocketChannel; 
    import io.netty.channel.socket.nio.NioSocketChannel; 
    import io.netty.handler.codec.string.StringEncoder; 

    import java.io.IOException; 
    import java.net.UnknownHostException; 
    import java.util.concurrent.ExecutionException; 
    import java.util.concurrent.TimeoutException; 

    public class TestNettyTcp { 
     private EventLoopGroup group; 

     private Channel ch; 

     /** 
     * @param args 
     * @throws SyslogSenderException 
     * @throws TimeoutException 
     * @throws ExecutionException 
     * @throws IOException 
     * @throws InterruptedException 
     * @throws UnknownHostException 
     */ 
     public static void main(String[] args) 
      throws UnknownHostException, InterruptedException, IOException, ExecutionException, TimeoutException { 
      new TestNettyTcp().testSendMessage(); 
     } 

     public TestNettyTcp() 
      throws InterruptedException { 
      group = new NioEventLoopGroup(); 
      Bootstrap b = new Bootstrap(); 
      b.group(group).channel(NioSocketChannel.class) 
       // .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024) 
       .option(ChannelOption.SO_RCVBUF, 1048576).option(ChannelOption.SO_SNDBUF, 1048576) 
       .option(ChannelOption.TCP_NODELAY, true).handler(new NettyTcpSyslogSenderInitializer()); 
      // Connect to a server. 
      ChannelFuture future = b.connect("192.168.22.70", 514); 
      future.awaitUninterruptibly(); 
      // Now we are sure the future is completed. 
      assert future.isDone(); 

      if (!future.isSuccess()) { 
       future.cause().printStackTrace(); 
      } 
      else { 
       ch = future.sync().channel(); 
      } 
     } 

     public void testSendMessage() 
      throws InterruptedException, UnknownHostException, IOException, ExecutionException, TimeoutException { 

      ThreadGroup threadGroup = new ThreadGroup("SendMessage"); 
      for (int k = 0; k < 10; k++) { 
       Thread thread = new Thread(threadGroup, new Runnable() { 

        @Override 
        public void run() { 

         String payLoad = "key=\"value\" key2=\"value2\" key3=\"value3\" Count:"; 

         try { 
          for (int j = 0; j < 100; j++) { 
           long a = System.currentTimeMillis(); 
           for (int i = 0; i < 20000; i++) { 

            ch.writeAndFlush(payLoad + j + "_" + i + "\n"); 
           } 
           System.out.println("\r<br>Excuted time : " + (System.currentTimeMillis() - a)/1000f 
            + "seconde"); 
          } 

         } 
         catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
         finally { 
          if (ch != null) { 
           ch.close(); 
          } 
         } 
        } 

       }); 
       thread.start(); 
      } 

      while (threadGroup.activeCount() > 0) { 
       try { 
        Thread.sleep(1000); 
       } 
       catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

    class NettyTcpSyslogSenderInitializer 
     extends ChannelInitializer<SocketChannel> { 

     public NettyTcpSyslogSenderInitializer() { 
      super(); 
     } 

     @Override 
     public void initChannel(SocketChannel ch) 
      throws Exception { 
      ChannelPipeline pipeline = ch.pipeline(); 
      pipeline.addLast(new StringEncoder()); 
     } 
    } 

的代碼可以重現該問題很快

回答

5

你正在寫快於網絡堆棧可以處理。請注意,它全部是異步的...一旦Channel.isWritable()返回false,並且一旦它再次返回true,就會停止寫入。您可以通過覆蓋ChannelInboundHandler中的channelWritabilityChanged(...)方法來通知此更改。