2019独角兽企业重金招聘Python工程师标准>>>
在Netty中解决拆包和粘包的问题,我们只需要将解码器添加到ChannelPipeline中就可以了。
LineBasedFrameDecoder的工作原理就是它依次遍历ByteBuf中的可读字节,如果有\n和\r\n,就以此为结束位置,它是以换行符为结束标志的解码器。如果读取到行的最大长度还没有发现换行,就会抛出异常,同时忽略掉之前读到的异常码流。
StringDecoder就是讲收到的对象转换成字符串。
上面这两种解码器结合起来其实就是换行切换的文本解码器。
后续还会涉及到分隔符的解码器和定长解码器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/*** @FileName TimeServer.java* @Description:** @Date 2016年3月2日* @author Administroter* @version 1.0* */
public class TimeServer {public void bind(int port) throws Exception {// 配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {/*** 创建ServerBootstrap,它是Netty用于启动NIO服务端的辅助启动类*/ServerBootstrap b = new ServerBootstrap();/*** 创建管道NioServerSocketChannel,也就是NIO中的ServerSocketChannel,然后设置TCP的参数* ,设置为1024,最后* 创建ChildChannelHandler,也就是Reactor模式中的handler类,用于处理网络IO时间* ,比如对消息的编解码。*/b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());/*** 绑定端口,同步等待成功* 绑定完成之后会返回一个ChannelFuture,这里类似于JDK的java.util.concurrent.Future。* 用于异步操作的通知回调*/ChannelFuture f = b.bind(port).sync();// 等待服务端监听端口关闭f.channel().closeFuture().sync();} finally {// 优雅退出,释放线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/*** @FileName TimeServerHandler.java* @Description:用于对网络事件读写操作** @Date 2016年3月2日* @author Administroter* @version 1.0* */
public class TimeServerHandler extends ChannelHandlerAdapter {private int counter;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//添加解码器后不需要考虑处理读半包的问题,也不需要对客户端请求的消息msg进行编码String body = (String) msg;System.out.println("The time server receive order : " + body +";记录数 -----: " + ++counter);String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString(): "BAD ORDER";currentTime = currentTime + System.getProperty("line.separator");ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());ctx.writeAndFlush(resp);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/*** @FileName TimeClient.java* @Description: ** @Date 2016年3月2日 * @author Administroter* @version 1.0* */
public class TimeClient {public void connect(int port, String host) throws Exception {// 配置客户端NIO线程组EventLoopGroup group = new NioEventLoopGroup();try {//客户端辅助启动类Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer
}
import java.util.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/*** @FileName TimeClientHandler.java* @Description:** @Date 2016年3月2日* @author Administroter* @version 1.0* */
public class TimeClientHandler extends ChannelHandlerAdapter {private static final Logger logger &#61; Logger.getLogger(TimeClientHandler.class.getName());private int counter;private byte[] request;/*** Creates a client-side handler.*/public TimeClientHandler() {request &#61; ("QUERY TIME ORDER"&#43;System.getProperty("line.separator")).getBytes();}/*** TCP链路建立成功后&#xff0c;调用这个方法发送查询指令给服务器*/&#64;Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf message &#61; null;for (int i &#61; 0; i < 100; i&#43;&#43;) {message &#61; Unpooled.buffer(request.length);message.writeBytes(request);ctx.writeAndFlush(message);}}/*** 服务器你响应结果后调用这个方法&#xff0c;获取服务器响应的结果*/&#64;Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String body &#61; (String)msg;System.out.println("服务器器响应结果 : " &#43; body &#43; "; the counter is : " &#43; &#43;&#43;counter);}/*** 链路建立失败&#xff0c;释放资源*/&#64;Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 释放资源logger.warning("Unexpected exception from downstream : " &#43; cause.getMessage());ctx.close();}
}