2013-04-29 42 views
0

我不知道如何做到這一點,我認爲我的方法是錯誤的 - 有人可以給我一個提示嗎?Netty 4.0 - 將數據發送到不同的事件迴路上的通道

我已經做了一個代理服務器非常相似的例子 HexDumpProxy。 目的不是轉儲流量,而是通過代理操縱數據解析,這部分工作完美。讓我們稱之爲代理部分。

在同一個程序中,我啓動了另一個線程,使用另一個ServerBootstrap在另一個端口上偵聽,這有它自己的eventloop等等。當我在這個偵聽端口上收到一些東西時,我想將這些數據發送到代理部分,我想要動態地改變這個頻道。當我將數據發送到其中一個代理通道時,出現此錯誤:

2013年4月29日10:05:10 PM BackendListenHandler exceptionCaught 警告:來自下游的意外異常。 java.lang.IllegalStateException:nextOutboundByteBuffer()從事件循環

@Sharable 
public class BackendListenHandler extends ChannelInboundByteHandlerAdapter { 

private Channel outboundChannel; 

@Override 
public void channelActive(ChannelHandlerContext ctx) throws Exception { 
    ctx.read(); 
    ctx.flush(); 
} 

@Override 
public void inboundBufferUpdated(final ChannelHandlerContext ctx, ByteBuf in) throws Exception { 

    outboundChannel = Proxy.connectionTable.frontendListenChannel; 

    if (!outboundChannel.isActive()) { 
     System.out.println("channel id=" + outboundChannel.id() + " is NOT active..."); 
    } else if (outboundChannel.isActive()) { 
     ByteBuf out = outboundChannel.outboundByteBuffer(); 
     out.writeBytes(in); 
     outboundChannel.flush().addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       if (future.isSuccess()) { 
        // was able to flush out data, start to read the next chunk 
        ctx.channel().read(); 
       } else { 
        future.channel().close(); 
       } 
      } 
     }); 
    } 
} 

public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
    if (outboundChannel != null) { 
     closeOnFlush(outboundChannel); 
    } 
} 

@Override 
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
    logger.log(
     Level.WARNING, 
     "Unexpected exception from downstream.", cause); 
    ctx.close(); 
} 

static void closeOnFlush(Channel ch) { 
    if (ch.isActive()) { 
     ch.flush().addListener(ChannelFutureListener.CLOSE); 
    } 
} 
} 

爲了測試我保持和改變我的代理渠道在這個靜態變量外叫:

Proxy.connectionTable.frontendListenChannel; 

回答

0

代替使用的 「outboundChannel.outboundByteBuffer()」 - 我使用的 「Unpooled.copiedBuffer()」 要寫入的字節從另一個事件循環到信道。

@Override 
public void inboundBufferUpdated(final ChannelHandlerContext ctx, ByteBuf in) throws Exception { 
outboundChannel = null; 
if (Proxy.connectionTable.channelId != 0) { 
    outboundChannel = Proxy.allChannels.find(Proxy.connectionTable.channelId); 
    if (outboundChannel.isActive()) {    
     System.out.println("NOTIFY channel id=" + outboundChannel.id() + " is active..."); 
     Rewrite rewrite = new Rewrite(byteBufConverter.byteBufToString(in), 2); 
     in.clear(); 
     in = byteBufConverter.stringToByteBuf(rewrite.getFixedMessage()); 
     outboundChannel.write(in); 
     outboundChannel.flush().addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       if (future.isSuccess()) { 
        ctx.channel().read(); 
       } else { 
        future.channel().close(); 
       } 
      } 
     }); 
    } 
} 
} 

其中 「byteBufConverter.stringToByteBuf(rewrite.getFixedMessage())」 收益 「Unpooled.copiedBuffer(strFixed.getBytes())」

0

當創建使用相同的事件循環「出站」連接的引導程序,如HexDump示例中所示。這將確保一切都從相同的IO線程處理。 參見[1]。

[1] https://github.com/netty/netty/blob/master/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java#L46

+0

我不能channelActive在HexDumpProxyFrontendHandler.java開始新ServerBootstrap因爲那時我會在每次有人試圖連接到HexDumpProxyFrontendHandler時啓動ServerBootstrap。我不認爲這是可能的? – Martin 2013-05-01 14:07:56

+0

我的意思是,我不需要每次在HexDumpProxyFrontendHandler上建立連接時都啓動一個新的ServerBootstrap。 – Martin 2013-05-02 01:21:45

+0

我不遵循...每次都不啓動新的ServerBootstrap。你所做的是每次構建一個新的Bootstrap,因爲它是輕量級的,並且與ServerBootstrap共享EventLoopGroup。這樣不需要額外的資源。 – 2013-05-02 05:39:46

相關問題