2012-03-22 181 views
5

首先,讓我解釋一下上下文:在Netty客戶端發送多個異步請求

我必須創建一個客戶端,它會發送很多HTTP請求來下載圖像。這些請求必須是異步的,因爲只要圖像完成,它就會被添加到隊列中,然後打印到屏幕上。因爲圖像可能很大並且響應被分塊,所以我的處理程序必須將它聚合到一個緩衝區中。

所以我遵循Netty示例代碼(HTTP spoon example)。

目前,我有三個靜態地圖爲每個通道存儲通道ID和緩衝區/塊布爾/我的最終對象。

private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>(); 
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>(); 
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>(); 

之後,我創建了我的啓動客戶端和計數器countDown掛起的請求數。當響應圖像完成時,最終隊列和計數器被傳遞給我的處理程序。

final ClientBootstrap bootstrap = new ClientBootstrap(
      new NioClientSocketChannelFactory(
      Executors.newCachedThreadPool(), 
      Executors.newCachedThreadPool())); 
    bootstrap.setOption("keepAlive", true); 
    bootstrap.setOption("tcpNoDelay", true); 
    bootstrap.setOption("reuseAddress", true); 
    bootstrap.setOption("connectTimeoutMillis", 30000); 


    final CountDownLatch latch = new CountDownLatch(downloadList.size()) { 

     @Override 
     public void countDown() { 
      super.countDown(); 
      if (getCount() <= 0) { 
       try { 
        queue.put(END_OF_QUEUE); 
        bootstrap.releaseExternalResources(); 
       } catch (InterruptedException ex) { 
        LOGGER.log(Level.WARNING, ex.getMessage(), ex); 
       } 
      } 
     } 
    }; 
    bootstrap.getPipeline().addLast("codec", new HttpClientCodec()); 
    bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch)); 

之後,我爲每個要下載的圖像創建一個通道,當通道連接時,請求將被創建併發送。主機和端口已經被提取過。

for (final ImagePack pack : downloadList) { 

     final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); 

     future.addListener(new ChannelFutureListener() { 

      public void operationComplete(ChannelFuture cf) throws Exception { 

       final Channel channel = future.getChannel(); 

       PACK_MAP.put(channel.getId(), pack); 

       final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url); 
       request.setHeader(HttpHeaders.Names.HOST, host); 
       request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
       request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES); 

       if (channel.isWritable()) { 
        channel.write(request); 
       } 
      } 
     }); 
    } 

現在,這是我的ChannelHandler,它是一個擴展SimpleChannelUpstreamHandler的內部類。當通道連接時,會創建BUFFER_MAPCHUNKS_MAP中的新條目。 BUFFER_MAP包含處理程序用於聚合來自通道的圖像塊的所有圖像緩衝區,並且CHUNKS_MAP包含響應塊狀布爾值。響應完成後,圖像InputSteam被添加到隊列中,鎖存器倒計時並關閉通道。

private class TileClientHandler extends SimpleChannelUpstreamHandler { 

    private CancellableQueue<Object> queue; 
    private CountDownLatch latch; 

    public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) { 
     this.queue = queue; 
     this.latch = latch; 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ 
      BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); 
     } 
     if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ 
      CHUNKS_MAP.put(ctx.getChannel().getId(), false); 
     } 
    } 

    @Override 
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { 
     super.writeComplete(ctx, e); 
     if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ 
      BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); 
     } 
     if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ 
      CHUNKS_MAP.put(ctx.getChannel().getId(), false); 
     } 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     final Integer channelID = ctx.getChannel().getId(); 
     if (!CHUNKS_MAP.get(channelID)) { 
      final HttpResponse response = (HttpResponse) e.getMessage(); 

      if (response.isChunked()) { 
       CHUNKS_MAP.put(channelID, true); 

      } else { 
       final ChannelBuffer content = response.getContent(); 
       if (content.readable()) { 
        final ChannelBuffer buf = BUFFER_MAP.get(channelID); 
        buf.writeBytes(content); 
        BUFFER_MAP.put(channelID, buf); 
        messageCompleted(e); 

       } 
      } 
     } else { 
      final HttpChunk chunk = (HttpChunk) e.getMessage(); 
      if (chunk.isLast()) { 
       CHUNKS_MAP.put(channelID, false); 
       messageCompleted(e); 
      } else { 
       final ChannelBuffer buf = BUFFER_MAP.get(channelID); 
       buf.writeBytes(chunk.getContent()); 
       BUFFER_MAP.put(channelID, buf); 
      } 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     latch.countDown(); 
     e.getChannel().close(); 
    } 

    private void messageCompleted(MessageEvent e) { 
     final Integer channelID = e.getChannel().getId(); 
     if (queue.isCancelled()) { 
      return; 
     } 

     try { 
      final ImagePack p = PACK_MAP.get(channelID); 
      final ChannelBuffer b = BUFFER_MAP.get(channelID); 

      p.setBuffer(new ByteArrayInputStream(b.array())); 
      queue.put(p.getTile()); 
     } catch (Exception ex) { 
      LOGGER.log(Level.WARNING, ex.getMessage(), ex); 
     } 
     latch.countDown(); 
     e.getChannel().close(); 
    } 
} 

我的問題是,當我執行這個代碼,我有這些異常:

java.lang.IllegalArgumentException: invalid version format: 3!}@ 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108) 
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) 
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

java.lang.IllegalArgumentException: invalid version format: 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108) 
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) 
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) 
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) 
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) 
    at org.jboss.netty.channel.Channels.close(Channels.java:720) 
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline 
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format: 
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread. 
    at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171) 
    at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324) 
    at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) 
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) 
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) 
    at org.jboss.netty.channel.Channels.close(Channels.java:720) 
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

,也有一些NPE顯得有些倍。

java.lang.NullPointerException 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 

所有這些代碼對於一個請求工作正常,但當許多請求在哪裏發送時,一些奇怪的東西附加在緩衝區上。

任何想法我在這裏失蹤?謝謝。

在我的第一個版本中,我爲每個請求的圖像重複引導程序/處理程序,它工作正常,但不是非常優化。

回答

5

問題是你在所有的頻道之間共享一個HttpClientCodec。在引導程序中指定的默認管道針對所有通道進行了克隆,因此每個通道都會看到每個處理程序的相同實例。 http編解碼器是有狀態的,所以您可以看到不同響應混合在一起的效果。

最簡單的解決方案是將ChannelPipelineFactory傳遞給引導程序。這將針對每個新通道調用,並且可以使用HttpClientCodec的新實例創建一個管道。沒有什麼可以阻止你爲每個你創建的管道使用相同的TileClientHandler實例,如果這是它的工作目的。

雖然我很好奇。鑑於您正在同時創建每個請求,只需在HttpClientCodec的上游添加HttpChunkAggregator並讓Netty將所有區塊聚合成單個HttpResponse,會不會更輕鬆。那麼你只需從那裏獲取重新組裝的內容?

+0

嗨johnstlr,感謝這個快速有用的答案,我現在使用ChannelPipelineFactory實例化HTTPCodec dand我的Tile處理程序。它工作正常,但我仍然得到了'java.lang.IllegalStateException:執行程序不能從自己獲取的線程關閉。請確保您未從I/O工作線程中調用releaseExternalResources()異常。你有想法嗎?對於這些信息,我沒有使用HttpChunkAggregator的原因是您必須將緩衝區大小設置爲HttpChunkAggregator構造函數。 – qboileau 2012-03-23 10:44:13

+1

您正在從CountDownLatch.countDown中調用bootstrap.releaseExternalResources,該函數是從您的處理程序方法中的IO線程調用的。不幸的是你不能這樣做。您需要從不在Netty使用的線程池中的線程調用releaseExternalResources。一種選擇可能是在完成處理隊列後從內部隊列中讀取的線程中調用releaseExternalResources。另外,你對HttpChunkAggregator完全正確。抱歉! – johnstlr 2012-03-23 18:09:56