热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty拆包粘包问题解决——特殊结束符

本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。

客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

服务端

server

package com.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;//服务器端
public class MyServer {//监听线程组,监听客户端请求private EventLoopGroup acceptorGroup = null;//处理客户端相关操作线程组,负责处理与客户端的数据通信private EventLoopGroup clientGroup = null;//服务启动相关配置信息,服务端Bootstrap带serverprivate ServerBootstrap serverBootstrap = null;public MyServer(){init();}private void init(){//初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量acceptorGroup = new NioEventLoopGroup();clientGroup = new NioEventLoopGroup();//初始化服务的配置serverBootstrap = new ServerBootstrap();//绑定线程组,acceptorGroup监听信息,clientGroup客户端信息serverBootstrap.group(acceptorGroup,clientGroup);//设定通信模式为NIO,同步非阻塞serverBootstrap.channel(NioServerSocketChannel.class);//设定缓冲区大小,缓冲区单位是字节serverBootstrap.option(ChannelOption.SO_BACKLOG,1024);//SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)serverBootstrap.option(ChannelOption.SO_SNDBUF,16*1024).option(ChannelOption.SO_RCVBUF,16*1024).option(ChannelOption.SO_KEEPALIVE,true);}public ChannelFuture doAccept(int port) throws InterruptedException {/*** childHandler是服务的bootstrap独有的方法,用于提供处理对象* 可以一次性增加若干个处理逻辑,是类似责任链模式的处理方式* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理*/serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符,定义的数据分隔符一定是一个ByteBuf类型的数据对象ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] acceptorHandlers = new ChannelHandler[3];//处理固定结束标记符号的Handler,这个Handler没有@Sharabler注解修饰//必须每次初始化通道时创建一个新的对象//使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度,Netty建议数据有最大长度acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换成字符串acceptorHandlers[1] = new StringDecoder(Charset.forName("utf-8"));acceptorHandlers[2] = new MyServerHandler();socketChannel.pipeline().addLast(acceptorHandlers);}});/*** bind方法 - 绑定监听端口,serverBootstrap可以绑定多个监听端口,多次调用即可* sync - 开始监听逻辑,返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果* 可以使用ChannelFuture实现后续的服务器和客户端的交互*/ChannelFuture future = serverBootstrap.bind(port).sync();/*绑定多个端口serverBootstrap.bind(port);serverBootstrap.bind(port);*/return future;}/*** shutdownGracefully - 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求*/public void release(){this.acceptorGroup.shutdownGracefully();this.clientGroup.shutdownGracefully();}public static void main(String[] args){ChannelFuture future = null;MyServer myServer = null;try {myServer = new MyServer();future = myServer.doAccept(9999);System.out.println("server started");//关闭连接future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != myServer){myServer.release();}}}
}

serverHandler

package com.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;import java.io.UnsupportedEncodingException;/*** @Sharable代表当前Handler是一个可以分享的处理器,可以分享给多个客户端同时使用* 如不使用注解类型,每次客户请求时,必须为客户重新创建一个新的Handler对象*/
@Sharable
public class MyServerHandler extends ChannelHandlerAdapter{/*** 业务处理逻辑* 用于处理读取数据请求的逻辑* ctx - 上下文对象,其中包含于客户端建立连接的所有资源,如:对应的Channel* msg - 读取到的数据,默认类型是ByteBuf,是Netty自定义的,是对ByteBuffer的封装,不用考虑复位问题*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {String message = msg.toString();System.out.println("from client :"+message);String line = "server message $E$ test delimiter handler!! $E$ second message $E$";if ("exit".equals(message)){ctx.close();return;}//写操作自动释放缓存,避免内存溢出ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));/*如果调用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行ctx.write(Unpooled.copiedBuffer(line.getBytes("utf-8")));ctx.close();*/}/*** 异常处理逻辑,当客户端异常退出时也会执行* ChannelHandlerContext关闭,也代表当前与客户端连接资源关闭*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){System.out.println("server exceptionCaught method run..");ctx.close();}
}

客户端

client

package com.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.Timer;
import java.util.concurrent.TimeUnit;/*** 客户端是请求的发起者,不需要监听* 只需要定义唯一的一个线程组即可*/
public class CustorClient {//处理请求和处理服务端响应的线程组private EventLoopGroup group = null;//客户端服务启动相关配置信息private Bootstrap bootstrap = null;public CustorClient(){init();}private void init(){group = new NioEventLoopGroup();bootstrap = new Bootstrap();//绑定线程组bootstrap.group(group);//设定通讯模式为NIObootstrap.channel(NioSocketChannel.class);}public ChannelFuture doRequest(String host, int port) throws InterruptedException {/*** 客户端的bootstrap没有childHandler方法,只有handler方法* 方法含义等同于ServerBootstrap中的childHandler* 在客户端必须绑定处理器(必须调用handler方法)* 服务器必须绑定处理器(必须调用childHandler方法)*/this.bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//数据分隔符ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());ChannelHandler[] handlers = new ChannelHandler[3];handlers[0] = new DelimiterBasedFrameDecoder(1024,delimiter);//字符串解码器handlerhandlers[1] = new StringDecoder(Charset.forName("utf-8"));handlers[2] = new CustorClientHandler();socketChannel.pipeline().addLast(handlers);}});//建立连接ChannelFuture future = this.bootstrap.connect(host,port).sync();return future;}public void release(){this.group.shutdownGracefully();}public static void main(String[] atgs){CustorClient client = null;ChannelFuture future = null;try {client = new CustorClient();future = client.doRequest("localhost", 9999);Scanner s = null;while (true) {s = new Scanner(System.in);System.out.println("enter message send to server(enter 'exit' for close client)");String line = s.nextLine();if ("exit".equals(line)) {/*** addListener - 增加监听,当条件满足时候,出发监听器* ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接*/future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))).addListener(ChannelFutureListener.CLOSE);break;}//Unpooled工具类用来做buffer转换future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));//睡一秒读取信息TimeUnit.SECONDS.sleep(1);}}catch (Exception e){e.printStackTrace();}finally {if (null != future){try {future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}if (null != client){client.release();}}}
}

clientHandler

package com.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;import java.io.UnsupportedEncodingException;public class CustorClientHandler extends ChannelHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext cxt, Object msg) throws UnsupportedEncodingException {try {String message = msg.toString();System.out.println("form server:"+message);} finally {//释放资源,避免内存溢出ReferenceCountUtil.release(msg);}}@Overridepublic void exceptionCaught(ChannelHandlerContext cxt,Throwable cause){System.out.println("client exceptionCaught method run..");cxt.close();}
}

测试

 


推荐阅读
  • 本文介绍如何通过Windows批处理脚本定期检查并重启Java应用程序,确保其持续稳定运行。脚本每30分钟检查一次,并在需要时重启Java程序。同时,它会将任务结果发送到Redis。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 本文介绍如何解决在 IIS 环境下 PHP 页面无法找到的问题。主要步骤包括配置 Internet 信息服务管理器中的 ISAPI 扩展和 Active Server Pages 设置,确保 PHP 脚本能够正常运行。 ... [详细]
  • Java 中 Writer flush()方法,示例 ... [详细]
  • Java 中的 BigDecimal pow()方法,示例 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 主要用了2个类来实现的,话不多说,直接看运行结果,然后在奉上源代码1.Index.javaimportjava.awt.Color;im ... [详细]
  • 前言--页数多了以后需要指定到某一页(只做了功能,样式没有细调)html ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • Python自动化处理:从Word文档提取内容并生成带水印的PDF
    本文介绍如何利用Python实现从特定网站下载Word文档,去除水印并添加自定义水印,最终将文档转换为PDF格式。该方法适用于批量处理和自动化需求。 ... [详细]
  • 将Web服务部署到Tomcat
    本文介绍了如何在JDeveloper 12c中创建一个Java项目,并将其打包为Web服务,然后部署到Tomcat服务器。内容涵盖从项目创建、编写Web服务代码、配置相关XML文件到最终的本地部署和验证。 ... [详细]
  • Android LED 数字字体的应用与实现
    本文介绍了一种适用于 Android 应用的 LED 数字字体(digital font),并详细描述了其在 UI 设计中的应用场景及其实现方法。这种字体常用于视频、广告倒计时等场景,能够增强视觉效果。 ... [详细]
  • 掌握远程执行Linux脚本和命令的技巧
    本文将详细介绍如何利用Python的Paramiko库实现远程执行Linux脚本和命令,帮助读者快速掌握这一实用技能。通过具体的示例和详尽的解释,让初学者也能轻松上手。 ... [详细]
  • 本文探讨了如何在给定整数N的情况下,找到两个不同的整数a和b,使得它们的和最大,并且满足特定的数学条件。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
author-avatar
琳琳小朋友m
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有