2016-07-30 108 views
0

我是netty.io的新手。我試圖編寫一個應用程序,在客戶端第一次連接時插入超過200行。問題是一次只插入10行,看起來像netty只是打斷了我的代碼。我無法弄清楚如何爲我的任務設置最大執行時間。如何在netty中執行長任務?

這是怎麼了,我開始服務器:

public void start() throws InterruptedException{ 
     EventLoopGroup group = new NioEventLoopGroup(); 
     EventExecutorGroup sqlExecutorGroup = new DefaultEventExecutorGroup(10); 
     ServerBootstrap server = new ServerBootstrap(); 
     server.group(group); 
     server.channel(NioServerSocketChannel.class); 
     server.localAddress(new InetSocketAddress(port)); 
     server.childHandler(new ChannelInitializer<SocketChannel>(){ 
      @Override 
      protected void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast(new HttpServerCodec()); 
        ch.pipeline().addLast(new ChunkedWriteHandler()); 
        ch.pipeline().addLast(new HttpObjectAggregator(65536)); 
        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/")); 
        ch.pipeline().addLast(new WebSocketFrameToByteBuf()); 
        ch.pipeline().addLast(new ProtobufEncoder()); 
        ch.pipeline().addLast(new ProtobufDecoder(Chat.MsgWrapper.getDefaultInstance())); 
        ch.pipeline().addLast(new MsgUserLogin(prop.getProperty("sessions_prefix"))); 
      } 
     }); 
     ChannelFuture f = server.bind().sync(); 
     f.channel().closeFuture().sync(); 
    } 

這是IM執行任務的處理程序中的功能:

private void register(ChannelHandlerContext ctx, Chat.User u){ 
     ctx.executor().execute(new Runnable() { 
      @Override 
      public void run() { 
       ctx.channel().attr(Main.KeyUser).set(u); 
       ConversationUserMember member = ConversationPool.getOrCacheConversationUserMember(u); 
       ConversationPool.UpdateConversations(member); 
       Main.loggedUsers.add(ctx.channel()); 
       member.newChannel(ctx.channel()); 
       returnConversations(member,ctx); //this function is the one getting cut by netty. 
      // fireNewUser(u); 
      } 
     }); 
    } 

回答

0

多一點研究,我發現所有的後我正在做的錯誤:

  1. 我沒有添加EventExecutorGroup到我的處理程序。
  2. 執行插入的代碼沒有正確釋放數據庫連接,這是主要問題。
  3. 在我使用的處理程序ctx.executor()。執行執行我的代碼後,將EventExecutorGroup添加到處理程序,這不是必需的。

工作的代碼如下所示:

public void start() throws InterruptedException{ 
     EventLoopGroup group = new NioEventLoopGroup(); 
     EventExecutorGroup sqlExecutorGroup = new DefaultEventExecutorGroup(10); 
     ServerBootstrap server = new ServerBootstrap(); 
     server.group(group); 
     server.channel(NioServerSocketChannel.class); 
     server.localAddress(new InetSocketAddress(port)); 
     server.childHandler(new ChannelInitializer<SocketChannel>(){ 
      @Override 
      protected void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast(new HttpServerCodec()); 
        ch.pipeline().addLast(new ChunkedWriteHandler()); 
        ch.pipeline().addLast(new HttpObjectAggregator(65536)); 
        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/")); 
        ch.pipeline().addLast(new WebSocketFrameToByteBuf()); 
        ch.pipeline().addLast(new ProtobufEncoder()); 
        ch.pipeline().addLast(new ProtobufDecoder(Chat.MsgWrapper.getDefaultInstance())); 
        ch.pipeline().addLast(sqlExecutorGroup,new MsgUserLogin(prop.getProperty("sessions_prefix"))); 
      } 
     }); 
     ChannelFuture f = server.bind().sync(); 
     f.channel().closeFuture().sync(); 
    } 

和:

private void register(ChannelHandlerContext ctx, Chat.User u){ 
       ctx.channel().attr(Main.KeyUser).set(u); 
       ConversationUserMember member =       ConversationPool.getOrCacheConversationUserMember(u); 
       ConversationPool.UpdateConversations(member); 
       Main.loggedUsers.add(ctx.channel()); 
       member.newChannel(ctx.channel()); 
       returnConversations(member,ctx); //this function is the one getting cut by netty. 
      // fireNewUser(u); 
    }