热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty拆包粘包问题解决——特殊结束符

本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。

客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

服务端

server

package com.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;//服务器端
public class MyServer {//监听线程组,监听客户端请求private EventLoopGroup acceptorGroup = null;//处理客户端相关操作线程组,负责处理与客户端的数据通信private EventLoopGroup clientGroup = null;//服务启动相关配置信息,服务端Bootstrap带serverprivate ServerBootstrap serverBootstrap = null;public MyServer(){init();}private void init(){//初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量acceptorGroup = new NioEventLoopGroup();clientGroup = new NioEventLoopGroup();//初始化服务的配置serverBootstrap = new ServerBootstrap();//绑定线程组,acceptorGroup监听信息,clientGroup客户端信息serverBootstrap.group(acceptorGroup,clientGroup);//设定通信模式为NIO,同步非阻塞serverBootstrap.channel(NioServerSocketChannel.class);//设定缓冲区大小,缓冲区单位是字节serverBootstrap.option(ChannelOption.SO_BACKLOG,1024);//SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)serverBootstrap.option(ChannelOption.SO_SNDBUF,16*1024).option(ChannelOption.SO_RCVBUF,16*1024).option(ChannelOption.SO_KEEPALIVE,true);}public ChannelFuture doAccept(int port) throws InterruptedException {/*** childHandler是服务的bootstrap独有的方法,用于提供处理对象* 可以一次性增加若干个处理逻辑,是类似责任链模式的处理方式* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理*/serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符,定义的数据分隔符一定是一个ByteBuf类型的数据对象ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] acceptorHandlers = new ChannelHandler[3];//处理固定结束标记符号的Handler,这个Handler没有@Sharabler注解修饰//必须每次初始化通道时创建一个新的对象//使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度,Netty建议数据有最大长度acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换成字符串acceptorHandlers[1] = new StringDecoder(Charset.forName("utf-8"));acceptorHandlers[2] = new MyServerHandler();socketChannel.pipeline().addLast(acceptorHandlers);}});/*** bind方法 - 绑定监听端口,serverBootstrap可以绑定多个监听端口,多次调用即可* sync - 开始监听逻辑,返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果* 可以使用ChannelFuture实现后续的服务器和客户端的交互*/ChannelFuture future = serverBootstrap.bind(port).sync();/*绑定多个端口serverBootstrap.bind(port);serverBootstrap.bind(port);*/return future;}/*** shutdownGracefully - 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求*/public void release(){this.acceptorGroup.shutdownGracefully();this.clientGroup.shutdownGracefully();}public static void main(String[] args){ChannelFuture future = null;MyServer myServer = null;try {myServer = new MyServer();future = myServer.doAccept(9999);System.out.println("server started");//关闭连接future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != myServer){myServer.release();}}}
}

serverHandler

package com.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;import java.io.UnsupportedEncodingException;/*** @Sharable代表当前Handler是一个可以分享的处理器,可以分享给多个客户端同时使用* 如不使用注解类型,每次客户请求时,必须为客户重新创建一个新的Handler对象*/
@Sharable
public class MyServerHandler extends ChannelHandlerAdapter{/*** 业务处理逻辑* 用于处理读取数据请求的逻辑* ctx - 上下文对象,其中包含于客户端建立连接的所有资源,如:对应的Channel* msg - 读取到的数据,默认类型是ByteBuf,是Netty自定义的,是对ByteBuffer的封装,不用考虑复位问题*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {String message = msg.toString();System.out.println("from client :"+message);String line = "server message $E$ test delimiter handler!! $E$ second message $E$";if ("exit".equals(message)){ctx.close();return;}//写操作自动释放缓存,避免内存溢出ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));/*如果调用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行ctx.write(Unpooled.copiedBuffer(line.getBytes("utf-8")));ctx.close();*/}/*** 异常处理逻辑,当客户端异常退出时也会执行* ChannelHandlerContext关闭,也代表当前与客户端连接资源关闭*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){System.out.println("server exceptionCaught method run..");ctx.close();}
}

客户端

client

package com.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.Timer;
import java.util.concurrent.TimeUnit;/*** 客户端是请求的发起者,不需要监听* 只需要定义唯一的一个线程组即可*/
public class CustorClient {//处理请求和处理服务端响应的线程组private EventLoopGroup group = null;//客户端服务启动相关配置信息private Bootstrap bootstrap = null;public CustorClient(){init();}private void init(){group = new NioEventLoopGroup();bootstrap = new Bootstrap();//绑定线程组bootstrap.group(group);//设定通讯模式为NIObootstrap.channel(NioSocketChannel.class);}public ChannelFuture doRequest(String host, int port) throws InterruptedException {/*** 客户端的bootstrap没有childHandler方法,只有handler方法* 方法含义等同于ServerBootstrap中的childHandler* 在客户端必须绑定处理器(必须调用handler方法)* 服务器必须绑定处理器(必须调用childHandler方法)*/this.bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] handlers = new ChannelHandler[3];handlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器handlerhandlers[1] = new StringDecoder(Charset.forName("utf-8"));handlers[2] = new CustorClientHandler();socketChannel.pipeline().addLast(handlers);}});//建立连接ChannelFuture future = this.bootstrap.connect(host,port).sync();return future;}public void release(){this.group.shutdownGracefully();}public static void main(String[] atgs){CustorClient client = null;ChannelFuture future = null;try {client = new CustorClient();future = client.doRequest("localhost", 9999);Scanner s = null;while (true) {s = new Scanner(System.in);System.out.println("enter message send to server(enter 'exit' for close client)");String line = s.nextLine();if ("exit".equals(line)) {/*** addListener - 增加监听,当条件满足时候,出发监听器* ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接*/future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))).addListener(ChannelFutureListener.CLOSE);break;}//Unpooled工具类用来做buffer转换future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));//睡一秒读取信息TimeUnit.SECONDS.sleep(1);}}catch (Exception e){e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != client){client.release();}}}
}

clientHandler

package com.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;import java.io.UnsupportedEncodingException;public class CustorClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext cxt, Object msg) throws UnsupportedEncodingException {try {String message = msg.toString();System.out.println("form server:"+message);} finally {//释放资源,避免内存溢出ReferenceCountUtil.release(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext cxt,Throwable cause){System.out.println("client exceptionCaught method run..");cxt.close();}
}

测试

 


推荐阅读
  • 流数据流和IO流的使用及应用
    本文介绍了流数据流和IO流的基本概念和用法,包括输入流、输出流、字节流、字符流、缓冲区等。同时还介绍了异常处理和常用的流类,如FileReader、FileWriter、FileInputStream、FileOutputStream、OutputStreamWriter、InputStreamReader、BufferedReader、BufferedWriter等。此外,还介绍了系统流和标准流的使用。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 终于在课设的闲时间把netty实战的四五章给解决了这里来记录一下第四章里面所讲的IO首先说到IO,我想,必须要先了解阻塞,非阻塞,同步和异步这四个词看到一个讲的很易懂的例子:&am ... [详细]
  • Netty(三)
    开发十年,就只剩下这套架构体系了!>>>  熟悉TCP编程的读者可能都会知道,无论是服务端 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
author-avatar
琳琳小朋友m
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有