public class TimeServer { public void bind(int port) { try { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel arg0) throws Exception { System.out.println("初始化"); arg0.pipeline().addLast(new TimeHandler()); } }); ChannelFuture future = b.bind(port).sync(); System.out.println("执行这里"); future.channel().closeFuture().sync(); System.out.println("执行这里"); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new TimeServer().bind(10000); } } public class TimeHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); ByteBuf buf=(ByteBuf) msg; //// byte[] butfs = buf.array();//报错 System.out.println(buf.readableBytes()); byte[] butfs = new byte[buf.readableBytes()]; buf.readBytes(butfs); System.out.println(new String(butfs,"UTF-8")); System.out.println(msg); } }
客户端使用的是BIO的模型:
public static void main(String[] args) throws Exception { final int port = 10000; // NioServer server = new NioServer(port); // server.init(); /// ======================================================== // 接下来模拟3个Client并发访问服务器 int poolsize = 1; ExecutorService pool = Executors.newFixedThreadPool(poolsize); Collectiontasks = new ArrayList (10); final String clientname = "clientThread"; for (int i = 0; i < poolsize; i++) { final int n = i; // 若每一个Client都保持使用BIO方式发送数据到Server,并读取数据。 tasks.add(new Callable() { @Override public Object call() throws Exception { Socket socket = new Socket("127.0.0.1", port); final InputStream input = socket.getInputStream(); final OutputStream out = socket.getOutputStream(); final String clientname_n = clientname + "_" + n; // BIO读取数据线程 new Thread(clientname_n + "_read") { @Override public void run() { byte[] bs = new byte[1024]; while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } int len = 0; try { while ((len = input.read(bs)) != -1) { System.out.println("Clinet thread " + Thread.currentThread().getName() + " read: " + new String(bs, 0, len)); } } catch (IOException e) { e.printStackTrace(); } } } }.start(); // BIO写数据线程 new Thread(clientname_n + "_write") { @Override public void run() { int a = 0; while (true) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } String str = Thread.currentThread().getName() + " hello, " + a; try { out.write(str.getBytes()); out.flush(); a++; } catch (IOException e) { e.printStackTrace(); } } } }.start(); return null; } }); } pool.invokeAll((Collection extends Callable
结果运行的时候出现了以下错误:
月 13, 2017 5:52:56 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException 警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1408) at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1394) at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1383) at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850) at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858) at test.netty.TimeHandler.channelRead(TimeHandler.java:17) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)
请问这是为什么呢?
An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
缺少exceptionCaught(),在在server端最后一个Handler中增加exceptionCaught()
在netty4中,对象的生命周期由引用计数器控制,ByteBuf就是如此,每个对象的初始化引用计数为1,调用一次release方法,引用计数器会减1,当尝试访问计数器为0的,对象时,会抛出IllegalReferenceCountException,正如ensureAccessible的实现,更加详细的解释可以参考官方文档
AbstractByteBuf.java
protected final void ensureAccessible() { if (refCnt() == 0) { throw new IllegalReferenceCountException(0); } }
注意TZ的TimeHandler类中的 super.channelRead(ctx, msg);这行代码。追踪调用路径,
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } }
最终调用的代码是:ReferenceCountUtil.release(msg)
public static boolean release(Object msg) { if (msg instanceof ReferenceCounted) { return ((ReferenceCounted) msg).release(); } return false; }
也就是每次super.channelRead(ctx, msg);后,ByteBuf就会调用release()方法,计数器减一,然后在 buf.readBytes(butfs);这行代码就会校验ensureAccessible(),计数器为0,netty认为ByteBuf对象已经释放,就抛出异常。
去掉TimeHandler中这行代码 super.channelRead(ctx, msg);
ByteBuf对象谁处理谁释放。