2014-10-28 258 views
2

我使用EmbeddedChannel來測試我的handlerscodecs在下面的格式處理消息:的Netty ByteToMessageCodec <ByteBuf>解碼消息兩次(部分)

+------------+------------------+----------------+  
| Header | Payload Length | Payload  | 
| 16 bytes |  2 bytes  | "Some data" |  
+------------+------------------+----------------+ 

首先,我想達到的目標:

  1. 通過創建一個對象來存儲標題詳細信息並將解碼後的標題對象添加到ChannelHandlerContextAttributeMap中以供將來使用;
  2. 等待/檢索整個有效載荷數據;
  3. 將Header對象和整個有效載荷作爲ByteBuf用於路由消息的最終處理程序。

我使用以下處理:

  1. ByteToMessageCodec<ByteBuf>提取頭信息,並將其添加到屬性列表。
  2. LengthFieldBasedFrameDecoder讀取有效載荷長度並等待/檢索整個幀。
  3. SimpleChannelInboundHandler這將使用從屬性列表檢索到的頭對象相應地路由有效載荷。

當一個消息被傳遞給ByteToMessageCodecdecode方法,所述報頭處理和正確地提取。然後,我繼續將頭對象添加到AttributeMap並添加ByteBuf(其有一個可讀字節= 2字節(有效負載長度指示符)+有效負載長度)。

假設有效載荷長度是1020字節。該消息最初由codec收到,將有readableBytes = 16 bytes + 2 bytes + 1020 bytes。通過decode方法讀取頭部,然後將餘下的可用字節(1022)添加到List<Object> out

如果我的理解是正確的,該字節的其餘部分將現在被傳遞到下一個處理是LengthFieldBasedFrameDecoder將讀取的長度指標和有效載荷(1020個字節)傳遞給SimpleChannelHanlder,但我一定是搞錯了。

decode方法被稱爲再次,與List<Object> out相同的1022字節。

在解碼方法那裏的JavaDoc如下:

Decode the from one ByteBuf to an other. This method will be called till either the input ByteBuf 
has nothing to read when return from this method or till nothing was read from the input ByteBuf. 

這是否意味着decode將被調用,直到readableBytes == 0

將其餘消息傳遞給LengthFieldBasedFrameDecoder的最有效方法是什麼?

我承擔LengthFieldBasedFrameDecoder需要ByteBuf作爲輸入,所以這是否意味着我需要設置readerIndex = 0和ByteBuf的副本添加到List<Object> out

任何幫助/建議/批評將不勝感激,我想以最乾淨的方式做到這一點。

這裏是我的decode方法:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
    byte [] headerBytes = new byte[HEADER_LENGTH]; 
    in.readBytes(headerBytes, 0, HEADER_LENGTH); 

    Header header = new Header(headerBytes); 
    System.out.println("Decoded Header: \n" + header); 

    //Set the header attribute so it can be used by routing handlers 
    ctx.attr(ChannelAttributes.HEADER).getAndSet(header); 
    //pass to next handler 
    out.add(in); 
} 

注:我在讀Netty in Action MEAP v8

回答

2

這是否意味着解碼將被調用,直到readableBytes == 0?

基本上,是的。一個ByteToMessageDecoder的簡化圖看起來像這樣

while (in.isReadable()) { 
    int outputSizeBefore = out.size(); 
    int readableBytesBefore = in.readableBytes(); 

    callYourDecodeImpl(ctx, in, out); 

    int outputSizeAfter = out.size(); 
    int readableBytesAfter = in.readableBytes(); 

    boolean didNotDecodeAnything = outputSizeBefore == outputSizeAfter; 
    boolean didNotReadAnything = readableBytesBefore == readableBytesAfter; 

    if(didNotDecodeAnything && didNotReadAnything) { 
     break; 
    } 

    // next iteration, continue with decoding 
} 

所以,你的解碼器將繼續,直到輸入緩衝耗盡讀頭。

爲了得到你想要的行爲,您必須將isSingleDecode標誌設置爲true:

class MyDecoder extends ByteToMessageDecoder { 

    MyDecoder() { 
     setSingleDecode(true); 
    } 

    // your decode impl as before 
} 

MyDecoder decoder = new MyDecoder(); 
decoder.setSingleDecode(true); 

這將導致循環停止後您的解碼實現解碼東西。 現在您的LengthFieldBasedFrameDecoder將與您加入out列表中的ByteBuf一起調用。 幀解碼按照您所描述的方式工作,無需將副本添加到列表中。 你的SimpleChannelInboundHandler將被稱爲有效載荷幀作爲msg

但是,您將無法讀取來自AttributeMap的頭在你的SimpleChannelInboundHandler 因爲ChannelHandlerContext是不同的,每個通道一個處理器中,atrributes不共享。

解決此問題的一種方法是爲此使用事件。 在你decoder,而不是添加的HeaderAttributeMap,把它作爲一個事件:

// instead of 
// ctx.attr(Header.ATTRIBUTE_KEY).getAndSet(header); 
// do this 
ctx.fireUserEventTriggered(ChannelAttributes.HEADER); 

然後,寫你的SimpleChannelInboundHandler像這樣

class MyMessageHandler extends SimpleChannelInboundHandler<ByteBuf> { 

    private Header header = null; 

    MyMessageHandler() { 
      super(true); 
    } 

    @Override 
    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception { 
     if (evt instanceof Header) { 
      header = (Header) evt; 
     } else { 
      super.userEventTriggered(ctx, evt); 
     } 
    } 

    @Override 
    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception { 
     if (header != null) { 
      System.out.println("header = " + header); 
      // continue with header, such as routing... 
     } 
     header = null; 
    } 
} 

另一種方法是,以這兩個對象發下來管道和使用 ChannelInboundHandlerAdapter而不是SimpleChannelInboundHandler。 在你decoder,而不是添加的HeaderAttributeMap,將它添加到out

// ... 
out.add(header); 
out.add(in); 

然後,寫你的ChannelInboundHandler像這樣

class MyMessageHandler extends ChannelInboundHandlerAdapter { 
    private Header header = null; 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { 
     if (msg instanceof Header) { 
      header = (Header) msg; 
      System.out.println("got the header " + header); 
     } else if (msg instanceof ByteBuf) { 
      ByteBuf byteBuf = (ByteBuf) msg; 
      System.out.println("got the message " + msg); 
      try { 
       // continue with header, such as routing... 
      } finally { 
       ReferenceCountUtil.release(msg); 
      } 
     } else { 
      super.channelRead(ctx, msg); 
     } 
    } 
} 

LengthFieldBasedFrameDecoder簡單地忽略消息,是不是ByteBuf s, 所以您的標題只會通過它(因爲它不實施ByteBuf)和 到達您的ChannelInboundHandler。然後,該消息將被解碼爲 有效載荷幀並傳遞給您的ChannelInboundHandler

+1

謝謝! 'setSingleDecode(true)'正是我所需要的。但是我似乎無法在實現'ByteToMessageCodec'時找到方法,只有'ByteToMessageDecoder'。無論如何,將分解編碼和解碼過程。並感謝有關將'header'傳遞給下一個處理程序的提示。然而,我通過執行'ctx.channel()。attr(ChannelAttributes.HEADER).set(header)'來設置和檢索'header',並將它添加到頻道的'AttributeMap'中。這樣做有什麼缺點? – Ian2thedv 2014-10-29 07:33:05

+1

啊,是的,'setSingleDecode'只存在於'ByteToMessageDecoder'上。至於頻道上的屬性地圖,我沒有看到任何缺點,這當然是一個可行的解決方案。 – knutwalker 2014-10-29 08:56:32