2014-09-24 869 views
0

我有一個簡單的ECHO服務器和使用Netty編寫的客戶端。服務器和客戶端位於同一臺計算機上。我期待的平均延遲時間爲幾毫秒,然而,無論我嘗試什麼,我都無法將延遲降至亞毫秒的持續時間。任何幫助將不勝感激。在Netty中設置低延遲客戶端/服務器示例

更新:即使使用System.nanoTime,我也會看到25-30ms左右的延遲。

EchoClient

import org.jboss.netty.bootstrap.ClientBootstrap; 
import org.jboss.netty.channel.*; 
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 
import org.jboss.netty.handler.execution.ExecutionHandler; 
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; 

import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 

public class EchoClient { 

    public static void main(String[] args) { 

     if (args.length != 1) { 
      System.err.println(String.format("usage: %s <num-msgs>", EchoClient.class.getCanonicalName())); 
      System.exit(1); 
     } 

     final long NUM_MSGS = Integer.parseInt(args[0]); 

     final EchoClientHandler echoClientHandler = new EchoClientHandler(); 

     final ExecutionHandler e = 
       new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(4, 128 * 1024L, 128 * 1024L)); 
     ChannelFactory factory = 
       new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 
                Executors.newCachedThreadPool()); 

     ClientBootstrap bootstrap = new ClientBootstrap(factory); 
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      @Override 
      public ChannelPipeline getPipeline() throws Exception { 
       return Channels.pipeline(new TestPayloadEncoder(), 
             new TestPayloadDecoder(), 
             e, 
             echoClientHandler); 
      } 
     }); 
     bootstrap.setOption("tcpNoDelay", true); 
     bootstrap.setOption("keepAlive", false); 
     bootstrap.setOption("child.keepAlive", false); 
     bootstrap.setOption("sendBufferSize", 128 * 1024L); 
     bootstrap.setOption("receiveBufferSize", 128 * 1024L); 

     for (int i = 0; i < NUM_MSGS; i++) { 
      final InetSocketAddress serverAddr = 
        new InetSocketAddress("localhost", 8080); 

      bootstrap.connect(serverAddr).addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture f) throws Exception { 
        if (f.isSuccess()) { 
         f.getChannel().write(new TestPayload()); 
        } 
       } 
      }); 
     } 

     while (echoClientHandler.numMsgs.get() < NUM_MSGS); 

     System.out.println(echoClientHandler.numMsgs); 
     System.out.println(echoClientHandler.aggTime); 
     System.out.println(String.format("mean transfer time: %.2fms", 
             ((float) echoClientHandler.aggTime.get())/
             echoClientHandler.numMsgs.get())); 
     System.out.flush(); 

     e.releaseExternalResources(); 
     factory.releaseExternalResources(); 
    } 

} 

EchoClientHandler

import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.channel.ExceptionEvent; 
import org.jboss.netty.channel.MessageEvent; 
import org.jboss.netty.channel.SimpleChannelHandler; 

import java.util.concurrent.atomic.AtomicLong; 

public class EchoClientHandler extends SimpleChannelHandler { 

    public final AtomicLong numMsgs = new AtomicLong(0); 
    public final AtomicLong aggTime = new AtomicLong(0); 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     long recvTime = System.currentTimeMillis(); 
     TestPayload m = (TestPayload) e.getMessage(); 
     aggTime.addAndGet(recvTime - m.getTime()); 
     numMsgs.incrementAndGet(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     e.getChannel().close(); 
    } 

} 

EchoServer的

import org.jboss.netty.bootstrap.ServerBootstrap; 
import org.jboss.netty.channel.ChannelFactory; 
import org.jboss.netty.channel.ChannelPipeline; 
import org.jboss.netty.channel.ChannelPipelineFactory; 
import org.jboss.netty.channel.Channels; 
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
import org.jboss.netty.handler.execution.ExecutionHandler; 
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; 

import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 

public class EchoServer { 

    public static void main(String[] args) { 
     ChannelFactory factory = 
       new NioServerSocketChannelFactory(Executors.newFixedThreadPool(4), 
                Executors.newFixedThreadPool(32), 
                32); 

     ServerBootstrap bootstrap = new ServerBootstrap(factory); 
     final ExecutionHandler e = 
       new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(4, 128 * 1024L, 128 * 1024L)); 
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      @Override 
      public ChannelPipeline getPipeline() throws Exception { 
       return Channels.pipeline(e, new EchoServerHandler()); 
      } 
     }); 
     bootstrap.setOption("reuseAddr", true); 
     bootstrap.setOption("keepAlive", false); 
     bootstrap.setOption("child.reuseAddr", true); 
     bootstrap.setOption("child.soLinger", 0); 
     bootstrap.setOption("child.keepAlive", false); 
     bootstrap.setOption("child.tcpNoDelay", true); 
     bootstrap.setOption("child.sendBufferSize", 128 * 1024L); 
     bootstrap.setOption("child.receiveBufferSize", 128 * 1024L); 
     bootstrap.bind(new InetSocketAddress("localhost", 8080)); 
    } 

} 

EchoServerHandler

import org.jboss.netty.channel.*; 

public class EchoServerHandler extends SimpleChannelHandler { 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     e.getChannel().write(e.getMessage()); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     e.getChannel().close(); 
    } 

} 

TestPayload

import org.jboss.netty.buffer.ChannelBuffer; 

import java.util.Date; 
import java.util.Random; 

public class TestPayload { 

    private static final int PREAMBLE_LEN = (Long.SIZE + Integer.SIZE)/8; 

    private static final Random RNG; 
    static { 
     RNG = new Random(); 
     RNG.setSeed(new Date().getTime()); 
    } 

    private final int paddingLen; 
    private final byte[] padding; 
    private final long time; 

    public TestPayload() { 
     this(65536); 
    } 

    public TestPayload(int sizeInBytes) { 
     this.paddingLen = sizeInBytes; 
     this.padding = new byte[this.paddingLen]; 
     RNG.nextBytes(this.padding); 
     this.time = System.currentTimeMillis(); 
    } 

    private TestPayload(long time, int paddingLen, byte[] padding) { 
     this.paddingLen = paddingLen; 
     this.padding = padding; 
     this.time = time; 
    } 

    public long getTime() { 
     return this.time; 
    } 

    public void writeTo(ChannelBuffer buf) { 
     buf.writeLong(this.time); 
     buf.writeInt(this.paddingLen); 
     buf.writeBytes(this.padding); 
    } 

    public static TestPayload readFrom(ChannelBuffer buf) { 
     if (buf.readableBytes() < PREAMBLE_LEN) { 
      return null; 
     } 

     buf.markReaderIndex(); 

     long time = buf.readLong(); 
     int paddingLen = buf.readInt(); 

     if (buf.readableBytes() < paddingLen) { 
      buf.resetReaderIndex(); 
      return null; 
     } 

     byte[] padding = new byte[paddingLen]; 
     buf.readBytes(padding); 

     return new TestPayload(time, paddingLen, padding); 
    } 

    public int getLength() { 
     return PREAMBLE_LEN + this.paddingLen; 
    } 
+0

如果你想測量亞毫米,然後使用System.nanoTime()這是一個計時器。 System.currentTimeMillis()是一個「掛鐘 – ssedano 2014-09-24 21:08:23

+0

編輯後...謝謝! – Bala 2014-09-24 21:33:43

回答

0

你運行你的客戶和不同的JVM您的服務器?如果是這樣,跨JVM邊界測量時間並不像您想象的那麼直截了當。例如使用System.nanoTime()不一定會根據oracle java doc工作:

通過此方法返回的值變爲有意義只有當兩個這種值之間的差,一個Java的相同實例內獲得計算虛擬機。

假設你可以找到衡量跨JVM中,如果你的目標是孤立它需要多長時間Netty的客戶端發送到Netty的服務器然後簡化你的使用情況來隔離儘可能多的時間的可靠途徑。例如,在上面的代碼中,您正在計算髮送/接收65536字節數組的時間。從時間實驗中刪除這個以幫助找出瓶頸所在的位置。

您從中收集了多少次運行?您是否排除Netty本身的初始化時間(在進行計時之前,在客戶端/服務器之間運行幾條消息)?

另外如何調整您的配置影響性能?有很多旋鈕可以調整(線程池大小,發送/接收緩衝區大小等)。

您使用的是哪種版本的Netty,並且在寫入後是否有強制刷新的選項?

我看不到EchoClient的代碼。它看起來像複製/粘貼代碼EchoClientHandler,其中EchoClient的代碼應該是。

+0

是的,客戶端和服務器在不同的JVMs。我試圖測量從我發送開始時間到接收完成,這個計時器可能比我想測量的更精確,但大部分我相信它應該沒問題。 – Bala 2014-09-25 05:51:17

+0

我不知道要配置什麼來獲得亞毫秒級的延遲。實際上,如果我能達到這個目標,4-6毫米以下的任何東西都會很棒。 – Bala 2014-09-25 05:51:55

+0

你不知道這是確定的,直到你試驗。你可以發佈數字,減少你的測試用例複雜性,並且用這些數字爲你如何進行時間交叉JVM添加一些理由?你也將不得不嘗試你設置的不同選項。例如修改OrderedMemoryAwareThreadPoolExecutor參數有什麼優點/缺點?你需要使用與你一樣多的線程嗎? – 2014-09-26 20:41:47

相關問題