我有一個簡單的ECHO服務器和使用Netty編寫的客戶端。服務器和客戶端位於同一臺計算機上。我期待的平均延遲時間爲幾毫秒,然而,無論我嘗試什麼,我都無法將延遲降至亞毫秒的持續時間。任何幫助將不勝感激。在Netty中設置低延遲客戶端/服務器示例
更新:即使使用System.nanoTime,我也會看到25-30ms左右的延遲。
EchoClient
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
public class EchoClient {
public static void main(String[] args) {
if (args.length != 1) {
System.err.println(String.format("usage: %s <num-msgs>", EchoClient.class.getCanonicalName()));
System.exit(1);
}
final long NUM_MSGS = Integer.parseInt(args[0]);
final EchoClientHandler echoClientHandler = new EchoClientHandler();
final ExecutionHandler e =
new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(4, 128 * 1024L, 128 * 1024L));
ChannelFactory factory =
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new TestPayloadEncoder(),
new TestPayloadDecoder(),
e,
echoClientHandler);
}
});
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", false);
bootstrap.setOption("child.keepAlive", false);
bootstrap.setOption("sendBufferSize", 128 * 1024L);
bootstrap.setOption("receiveBufferSize", 128 * 1024L);
for (int i = 0; i < NUM_MSGS; i++) {
final InetSocketAddress serverAddr =
new InetSocketAddress("localhost", 8080);
bootstrap.connect(serverAddr).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
f.getChannel().write(new TestPayload());
}
}
});
}
while (echoClientHandler.numMsgs.get() < NUM_MSGS);
System.out.println(echoClientHandler.numMsgs);
System.out.println(echoClientHandler.aggTime);
System.out.println(String.format("mean transfer time: %.2fms",
((float) echoClientHandler.aggTime.get())/
echoClientHandler.numMsgs.get()));
System.out.flush();
e.releaseExternalResources();
factory.releaseExternalResources();
}
}
EchoClientHandler
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import java.util.concurrent.atomic.AtomicLong;
public class EchoClientHandler extends SimpleChannelHandler {
public final AtomicLong numMsgs = new AtomicLong(0);
public final AtomicLong aggTime = new AtomicLong(0);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
long recvTime = System.currentTimeMillis();
TestPayload m = (TestPayload) e.getMessage();
aggTime.addAndGet(recvTime - m.getTime());
numMsgs.incrementAndGet();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
EchoServer的
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
public class EchoServer {
public static void main(String[] args) {
ChannelFactory factory =
new NioServerSocketChannelFactory(Executors.newFixedThreadPool(4),
Executors.newFixedThreadPool(32),
32);
ServerBootstrap bootstrap = new ServerBootstrap(factory);
final ExecutionHandler e =
new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(4, 128 * 1024L, 128 * 1024L));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(e, new EchoServerHandler());
}
});
bootstrap.setOption("reuseAddr", true);
bootstrap.setOption("keepAlive", false);
bootstrap.setOption("child.reuseAddr", true);
bootstrap.setOption("child.soLinger", 0);
bootstrap.setOption("child.keepAlive", false);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.sendBufferSize", 128 * 1024L);
bootstrap.setOption("child.receiveBufferSize", 128 * 1024L);
bootstrap.bind(new InetSocketAddress("localhost", 8080));
}
}
EchoServerHandler
import org.jboss.netty.channel.*;
public class EchoServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
TestPayload
import org.jboss.netty.buffer.ChannelBuffer;
import java.util.Date;
import java.util.Random;
public class TestPayload {
private static final int PREAMBLE_LEN = (Long.SIZE + Integer.SIZE)/8;
private static final Random RNG;
static {
RNG = new Random();
RNG.setSeed(new Date().getTime());
}
private final int paddingLen;
private final byte[] padding;
private final long time;
public TestPayload() {
this(65536);
}
public TestPayload(int sizeInBytes) {
this.paddingLen = sizeInBytes;
this.padding = new byte[this.paddingLen];
RNG.nextBytes(this.padding);
this.time = System.currentTimeMillis();
}
private TestPayload(long time, int paddingLen, byte[] padding) {
this.paddingLen = paddingLen;
this.padding = padding;
this.time = time;
}
public long getTime() {
return this.time;
}
public void writeTo(ChannelBuffer buf) {
buf.writeLong(this.time);
buf.writeInt(this.paddingLen);
buf.writeBytes(this.padding);
}
public static TestPayload readFrom(ChannelBuffer buf) {
if (buf.readableBytes() < PREAMBLE_LEN) {
return null;
}
buf.markReaderIndex();
long time = buf.readLong();
int paddingLen = buf.readInt();
if (buf.readableBytes() < paddingLen) {
buf.resetReaderIndex();
return null;
}
byte[] padding = new byte[paddingLen];
buf.readBytes(padding);
return new TestPayload(time, paddingLen, padding);
}
public int getLength() {
return PREAMBLE_LEN + this.paddingLen;
}
如果你想測量亞毫米,然後使用System.nanoTime()這是一個計時器。 System.currentTimeMillis()是一個「掛鐘 – ssedano 2014-09-24 21:08:23
編輯後...謝謝! – Bala 2014-09-24 21:33:43