热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty拆包粘包问题解决——特殊结束符

本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。

客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

服务端

server

package com.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;//服务器端
public class MyServer {//监听线程组,监听客户端请求private EventLoopGroup acceptorGroup = null;//处理客户端相关操作线程组,负责处理与客户端的数据通信private EventLoopGroup clientGroup = null;//服务启动相关配置信息,服务端Bootstrap带serverprivate ServerBootstrap serverBootstrap = null;public MyServer(){init();}private void init(){//初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量acceptorGroup = new NioEventLoopGroup();clientGroup = new NioEventLoopGroup();//初始化服务的配置serverBootstrap = new ServerBootstrap();//绑定线程组,acceptorGroup监听信息,clientGroup客户端信息serverBootstrap.group(acceptorGroup,clientGroup);//设定通信模式为NIO,同步非阻塞serverBootstrap.channel(NioServerSocketChannel.class);//设定缓冲区大小,缓冲区单位是字节serverBootstrap.option(ChannelOption.SO_BACKLOG,1024);//SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)serverBootstrap.option(ChannelOption.SO_SNDBUF,16*1024).option(ChannelOption.SO_RCVBUF,16*1024).option(ChannelOption.SO_KEEPALIVE,true);}public ChannelFuture doAccept(int port) throws InterruptedException {/*** childHandler是服务的bootstrap独有的方法,用于提供处理对象* 可以一次性增加若干个处理逻辑,是类似责任链模式的处理方式* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理*/serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符,定义的数据分隔符一定是一个ByteBuf类型的数据对象ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] acceptorHandlers = new ChannelHandler[3];//处理固定结束标记符号的Handler,这个Handler没有@Sharabler注解修饰//必须每次初始化通道时创建一个新的对象//使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度,Netty建议数据有最大长度acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换成字符串acceptorHandlers[1] = new StringDecoder(Charset.forName("utf-8"));acceptorHandlers[2] = new MyServerHandler();socketChannel.pipeline().addLast(acceptorHandlers);}});/*** bind方法 - 绑定监听端口,serverBootstrap可以绑定多个监听端口,多次调用即可* sync - 开始监听逻辑,返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果* 可以使用ChannelFuture实现后续的服务器和客户端的交互*/ChannelFuture future = serverBootstrap.bind(port).sync();/*绑定多个端口serverBootstrap.bind(port);serverBootstrap.bind(port);*/return future;}/*** shutdownGracefully - 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求*/public void release(){this.acceptorGroup.shutdownGracefully();this.clientGroup.shutdownGracefully();}public static void main(String[] args){ChannelFuture future = null;MyServer myServer = null;try {myServer = new MyServer();future = myServer.doAccept(9999);System.out.println("server started");//关闭连接future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != myServer){myServer.release();}}}
}

serverHandler

package com.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;import java.io.UnsupportedEncodingException;/*** @Sharable代表当前Handler是一个可以分享的处理器,可以分享给多个客户端同时使用* 如不使用注解类型,每次客户请求时,必须为客户重新创建一个新的Handler对象*/
@Sharable
public class MyServerHandler extends ChannelHandlerAdapter{/*** 业务处理逻辑* 用于处理读取数据请求的逻辑* ctx - 上下文对象,其中包含于客户端建立连接的所有资源,如:对应的Channel* msg - 读取到的数据,默认类型是ByteBuf,是Netty自定义的,是对ByteBuffer的封装,不用考虑复位问题*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {String message = msg.toString();System.out.println("from client :"+message);String line = "server message $E$ test delimiter handler!! $E$ second message $E$";if ("exit".equals(message)){ctx.close();return;}//写操作自动释放缓存,避免内存溢出ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));/*如果调用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行ctx.write(Unpooled.copiedBuffer(line.getBytes("utf-8")));ctx.close();*/}/*** 异常处理逻辑,当客户端异常退出时也会执行* ChannelHandlerContext关闭,也代表当前与客户端连接资源关闭*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){System.out.println("server exceptionCaught method run..");ctx.close();}
}

客户端

client

package com.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.Timer;
import java.util.concurrent.TimeUnit;/*** 客户端是请求的发起者,不需要监听* 只需要定义唯一的一个线程组即可*/
public class CustorClient {//处理请求和处理服务端响应的线程组private EventLoopGroup group = null;//客户端服务启动相关配置信息private Bootstrap bootstrap = null;public CustorClient(){init();}private void init(){group = new NioEventLoopGroup();bootstrap = new Bootstrap();//绑定线程组bootstrap.group(group);//设定通讯模式为NIObootstrap.channel(NioSocketChannel.class);}public ChannelFuture doRequest(String host, int port) throws InterruptedException {/*** 客户端的bootstrap没有childHandler方法,只有handler方法* 方法含义等同于ServerBootstrap中的childHandler* 在客户端必须绑定处理器(必须调用handler方法)* 服务器必须绑定处理器(必须调用childHandler方法)*/this.bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] handlers = new ChannelHandler[3];handlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器handlerhandlers[1] = new StringDecoder(Charset.forName("utf-8"));handlers[2] = new CustorClientHandler();socketChannel.pipeline().addLast(handlers);}});//建立连接ChannelFuture future = this.bootstrap.connect(host,port).sync();return future;}public void release(){this.group.shutdownGracefully();}public static void main(String[] atgs){CustorClient client = null;ChannelFuture future = null;try {client = new CustorClient();future = client.doRequest("localhost", 9999);Scanner s = null;while (true) {s = new Scanner(System.in);System.out.println("enter message send to server(enter 'exit' for close client)");String line = s.nextLine();if ("exit".equals(line)) {/*** addListener - 增加监听,当条件满足时候,出发监听器* ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接*/future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))).addListener(ChannelFutureListener.CLOSE);break;}//Unpooled工具类用来做buffer转换future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));//睡一秒读取信息TimeUnit.SECONDS.sleep(1);}}catch (Exception e){e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != client){client.release();}}}
}

clientHandler

package com.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;import java.io.UnsupportedEncodingException;public class CustorClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext cxt, Object msg) throws UnsupportedEncodingException {try {String message = msg.toString();System.out.println("form server:"+message);} finally {//释放资源,避免内存溢出ReferenceCountUtil.release(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext cxt,Throwable cause){System.out.println("client exceptionCaught method run..");cxt.close();}
}

测试

 


推荐阅读
  • 流数据流和IO流的使用及应用
    本文介绍了流数据流和IO流的基本概念和用法,包括输入流、输出流、字节流、字符流、缓冲区等。同时还介绍了异常处理和常用的流类,如FileReader、FileWriter、FileInputStream、FileOutputStream、OutputStreamWriter、InputStreamReader、BufferedReader、BufferedWriter等。此外,还介绍了系统流和标准流的使用。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 该楼层疑似违规已被系统折叠隐藏此楼查看此楼*madebyebhrz*#include#include#include#include#include#include#include ... [详细]
  • 初识java关于JDK、JRE、JVM 了解一下 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • 本文介绍了如何使用go语言实现一个一对一的聊天服务器和客户端,包括服务器开启、等待客户端连接、关闭连接等操作。同时提供了一个相关的多人聊天的链接供参考。 ... [详细]
  • 本文介绍了使用C++Builder实现获取USB优盘序列号的方法,包括相关的代码和说明。通过该方法,可以获取指定盘符的USB优盘序列号,并将其存放在缓冲中。该方法可以在Windows系统中有效地获取USB优盘序列号,并且适用于C++Builder开发环境。 ... [详细]
  • Ihavebeenworkingwithbufferingafileonmylocaldrivetoparseandobtaincertaindata.Forte ... [详细]
  • 图片添加二维码水印教程
    本博客介绍一下用jdkawt实现图片加文字水印和图片水印的方法一、图片文字水印原来图片加上文字水印后图片二、图片加图片水印原来图片:水印图片:添加水印后的图片: ... [详细]
author-avatar
琳琳小朋友m
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有