java - 使用Netty Demo报错

 桥之西海_744 发布于 2022-10-25 17:42
public class TimeServer {
    public void bind(int port) {
        try {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(SocketChannel arg0) throws Exception {
                    System.out.println("初始化");
                    arg0.pipeline().addLast(new TimeHandler());
                }
            });
            ChannelFuture future = b.bind(port).sync();
            System.out.println("执行这里");
            future.channel().closeFuture().sync();
            System.out.println("执行这里");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
      public static void main(String[] args) {
        new  TimeServer().bind(10000);
    }
}
public class TimeHandler extends ChannelInboundHandlerAdapter {
    

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    ByteBuf buf=(ByteBuf) msg;
////        byte[] butfs = buf.array();//报错
    System.out.println(buf.readableBytes());
        byte[] butfs = new byte[buf.readableBytes()];
        buf.readBytes(butfs);
        System.out.println(new String(butfs,"UTF-8"));
        System.out.println(msg);
    }


}

客户端使用的是BIO的模型:

public static void main(String[] args) throws Exception {
        final int port = 10000;
//        NioServer server = new NioServer(port);
//        server.init();
        /// ========================================================
        // 接下来模拟3个Client并发访问服务器
        int poolsize = 1;
        ExecutorService pool = Executors.newFixedThreadPool(poolsize);
        Collection tasks = new ArrayList(10);
        final String clientname = "clientThread";
        for (int i = 0; i < poolsize; i++) {
            final int n = i;
            // 若每一个Client都保持使用BIO方式发送数据到Server,并读取数据。
            tasks.add(new Callable() {
                @Override
                public Object call() throws Exception {
                    Socket socket = new Socket("127.0.0.1", port);
                    final InputStream input = socket.getInputStream();
                    final OutputStream out = socket.getOutputStream();
                    final String clientname_n = clientname + "_" + n;
                    // BIO读取数据线程
                    new Thread(clientname_n + "_read") {
                        @Override
                        public void run() {
                            byte[] bs = new byte[1024];
                            while (true) {
                                try {

                                    Thread.sleep(1000);

                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                int len = 0;
                                try {
                                    while ((len = input.read(bs)) != -1) {

                                        System.out.println("Clinet thread " + Thread.currentThread().getName()
                                                + " read: " + new String(bs, 0, len));
                                    }

                                } catch (IOException e) {

                                    e.printStackTrace();
                                }
                            }
                        }

                    }.start();
                    // BIO写数据线程
                    new Thread(clientname_n + "_write") {
                        @Override
                        public void run() {
                            int a = 0;
                            while (true) {
                                try {
                                    Thread.sleep(100);

                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }

                                String str = Thread.currentThread().getName() + " hello, " + a;
                                try {
                                    out.write(str.getBytes());
                                    out.flush();
                                    a++;

                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }

                    }.start();

                    return null;

                }

            });

        }
        pool.invokeAll((Collection>) tasks);
//        server.go();

    }

结果运行的时候出现了以下错误:

月 13, 2017 5:52:56 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1408)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1394)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1383)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858)
    at test.netty.TimeHandler.channelRead(TimeHandler.java:17)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

请问这是为什么呢?

2 个回答
  • An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
    缺少exceptionCaught(),在在server端最后一个Handler中增加exceptionCaught()

    2022-10-26 23:42 回答
  • 在netty4中,对象的生命周期由引用计数器控制,ByteBuf就是如此,每个对象的初始化引用计数为1,调用一次release方法,引用计数器会减1,当尝试访问计数器为0的,对象时,会抛出IllegalReferenceCountException,正如ensureAccessible的实现,更加详细的解释可以参考官方文档

    AbstractByteBuf.java

    protected final void ensureAccessible() {
            if (refCnt() == 0) {
                throw new IllegalReferenceCountException(0);
            }
        }

    注意TZ的TimeHandler类中的 super.channelRead(ctx, msg);这行代码。追踪调用路径,

    private void invokeChannelRead(Object msg) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        }

    最终调用的代码是:ReferenceCountUtil.release(msg)

    public static boolean release(Object msg) {
            if (msg instanceof ReferenceCounted) {
                return ((ReferenceCounted) msg).release();
            }
            return false;
        }

    也就是每次super.channelRead(ctx, msg);后,ByteBuf就会调用release()方法,计数器减一,然后在 buf.readBytes(butfs);这行代码就会校验ensureAccessible(),计数器为0,netty认为ByteBuf对象已经释放,就抛出异常。

    解决方案:

    去掉TimeHandler中这行代码 super.channelRead(ctx, msg);
    ByteBuf对象谁处理谁释放。

    2022-10-26 23:42 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有