热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty协议应用和设计

一、协议的重要性客户端和服务器端在传递消息的过程中,必然需要约定消息,否则双方是没有办法理解相互之间传递的信息。redis的协议redis协议遵循的规则是位数-命令,位数-命令,位

一、协议的重要性

客户端和服务器端在传递消息的过程中,必然需要约定消息,否则双方是没有办法理解相互之间传递的信息。

redis的协议

redis协议遵循的规则是 位数-命令,位数-命令,位数-命令....

例子:

package com.test.netty.c7; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @Slf4j public class RedisClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); //按照redis的协议 final byte[] LINE = {'\r', '\n'}; try { ChannelFuture cOnnect= new Bootstrap().group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf =ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$4".getBytes()); buf.writeBytes(LINE); buf.writeBytes("name".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$5".getBytes()); buf.writeBytes(LINE); buf.writeBytes("bbbbb".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug(((ByteBuf) msg).toString(Charset.defaultCharset())); } }).connect(new InetSocketAddress("localhost", 6379)); connect.sync(); connect.channel().close().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } }

写完之后,在redis端查询"name":

HTTP协议

HTTP协议相对比较复杂,Netty已经共了服务器端编码和解码的工具类HttpServerCodec来处理HTTP请求。

// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder // Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器 public final class HttpServerCodec extends CombinedChannelDuplexHandler implements HttpServerUpgradeHandler.SourceCodec

服务器端代码:

package com.test.netty.c7; import com.test.nio.c3.block.Server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; @Slf4j public class HttpServer { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { log.debug("请求地址:{}",msg.uri()); //设置响应内容,版本、状态码 DefaultFullHttpResponse respOnse= new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); //设置响应内容 //设置长度,否则浏览器一直在读取中 byte[] bytes = "

Hello,MG!

".getBytes(StandardCharsets.UTF_8); response.headers().setInt(CONTENT_LENGTH, bytes.length); //设置响应体 response.content().writeBytes(bytes); //写会响应 ctx.writeAndFlush(response); } }); } }).bind(8080); } }

浏览器访问:

二、自定义协议

自定义协议的要素

  • 魔术:协议双方约定的开头,这样双方能够第一时间知道是否可以解析传递过来的消息
  • 版本号:协议的版本
  • 序列化算法:用什么方式类序列化传递的信息
  • 指令类型:跟业务相关的操作
  • 请求序号:为了双工通信,提供异步的能力
  • 正文长度:正文的长度
  • 消息正文:消息体

实例:

package com.test.netty.c8; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import lombok.extern.slf4j.Slf4j; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.List; @Slf4j public class MessageCodec extends ByteToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { //设置魔数 4个字节 out.writeBytes(new byte[]{'m', 'g', '2' ,'1'}); //设置版本号 out.writeByte(1); //设置序列化方式 out.writeByte(1); //设置指令类型 - 业务 out.writeByte(msg.getMessageType()); //设置请求序号 out.writeInt(msg.getSequenceId()); //补齐 out.writeByte(0xff); //序列化 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); objectOutputStream.writeObject(msg); byte[] bytes = byteArrayOutputStream.toByteArray(); //获得并设置政委长度 out.writeInt(bytes.length); out.writeBytes(bytes); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { //魔数 int magicNum = in.readInt(); //版本号 byte version = in.readByte(); //序列化方式 byte seqType = in.readByte(); //指令类型 byte messageType = in.readByte(); //获取请求编号 int sequenceId = in.readInt(); //补齐 in.readByte(); //获取正文长度 int length = in.readInt(); byte[] bytes = new byte[length]; in.readBytes(bytes, 0, length); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); Message message = (Message) ois.readObject(); out.add(message); log.debug("魔数 {}", magicNum); log.debug("版本 {}", version); log.debug("序列化方式 {}", seqType); log.debug("消息类型 {}", messageType); log.debug("请求序列 {}", sequenceId); log.debug("长度 {}", length); log.debug("消息 {}", message); } }

  • 自定义编码解码handler,实现了 ByteToMessage 接口
  • 在对应的编码、解码方法里面,按照之前介绍的内容进行按顺序进行编写即可

测试类:

package com.test.netty.c8; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class TestCodec { public static void main(String[] args) throws Exception{ EmbeddedChannel channel = new EmbeddedChannel( new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0), new LoggingHandler(LogLevel.DEBUG), new MessageCodec() ); LoginRequestMessage loginRequestMessage = new LoginRequestMessage("liming", "2292123a"); ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null, loginRequestMessage, byteBuf); channel.writeInbound(byteBuf); } }

运行结果:

 三、Sharable 注解

为了提高handler的使用率,netty使用了注解@Sharable注解对handler进行标识,其实就是一个handler是否是线程安全的(也就是无状态的),如果是线程安全的就可以使用@Sharable进行标识,表明这个handler可以共用,否则每次都需要new 一个新的handler。

之前写的 MessageCodec handler理论上市线程安全的,但是它的父类是线程不安全的,所以使用

@ChannelHandler.Sharable public class MessageSharableCodec extends MessageToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Message msg, List out) throws Exception { ... } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { ... } }

MessageToMessageCodec 这个父类即可。


推荐阅读
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • 多维数组的使用
    本文介绍了多维数组的概念和使用方法,以及二维数组的特点和操作方式。同时还介绍了如何获取数组的长度。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 数组的排序:数组本身有Arrays类中的sort()方法,这里写几种常见的排序方法。(1)冒泡排序法publicstaticvoidmain(String[]args ... [详细]
  • 面向对象之3:封装的总结及实现方法
    本文总结了面向对象中封装的概念和好处,以及在Java中如何实现封装。封装是将过程和数据用一个外壳隐藏起来,只能通过提供的接口进行访问。适当的封装可以提高程序的理解性和维护性,增强程序的安全性。在Java中,封装可以通过将属性私有化并使用权限修饰符来实现,同时可以通过方法来访问属性并加入限制条件。 ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文详细介绍了使用C#实现Word模版打印的方案。包括添加COM引用、新建Word操作类、开启Word进程、加载模版文件等步骤。通过该方案可以实现C#对Word文档的打印功能。 ... [详细]
author-avatar
gogo迷失的大G
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有