热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty学习之TCP粘包/拆包

一.TCP粘包拆包问题说明,如图二.未考虑TCP粘包导致功能异常案例按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间1.服务端类packag
一.TCP粘包/拆包问题说明,如图



二.未考虑TCP粘包导致功能异常案例
    按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间

1.服务端类

package com.phei.netty.s2016042302;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* 服务端
* @author renhj
*
*/
public class TimeServer {

public void bind(int port) throws Exception{

//第一个用户服务器接收客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//第二个用户SocketChannel的网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//创建ServerBootstrap对象,启动 NIO服务端的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).
//设置为NIO
channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务器监听端口关闭
f.channel().closeFuture().sync();

}finally{
//释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

private class ChildChannelHandler extends ChannelInitializer{

@Override
protected void initChannel(SocketChannel arg0) throws Exception {

arg0.pipeline().addLast(new TimeServerHandler());

}
}

public static void main(String[] args) throws Exception {

int port = 8080;
if(args != null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){
//采用默认值
}
}
new TimeServer().bind(port);
}

}

2.服务端核心处理类

package com.phei.netty.s2016042302;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 服务端核心处理类
* @author renhj
*
*/
public class TimeServerHandler extends ChannelHandlerAdapter {

private int counter;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8").substring(0,req.length-System.getProperty("line.separator").length());
System.out.println("The time server receive order :"+ body+" ; the counter is :"+ ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?
new Date(System.currentTimeMillis()).toString():"BAD ORDER";
currentTime = currentTime +System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}





}

3.客户端类

package com.phei.netty.s2016042302;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* 客服端
* @author renhj
*
*/

public class TimeClient {

public void connect(int port , String host) throws Exception{

//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer(){

@Override
public void initChannel(SocketChannel ch) throws Exception{

ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();

//等待客户端链路关闭
f.channel().closeFuture().sync();

}finally{
//释放NIO线程
group.shutdownGracefully();
}

}
public static void main(String[] args) throws Exception{

int port = 8080;
if(args !=null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){

}
}
new TimeClient().connect(port, "127.0.0.1");
}
}


4.客户端核心处理类


package com.phei.netty.s2016042302;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* 客户端核心处理类
* @author renhj
*
*/
public class TimeClientHandler extends ChannelHandlerAdapter {

private int counter;
private byte[] req;

public TimeClientHandler(){
req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

ByteBuf message = null;
for(int i=0;i<100;i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("Now is : "+body +" ; the counter is : "+ ++counter);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

//释放资源
ctx.close();
}


}

5.运行结果





三.利用LineBasedFrameDecoder+StringDecoder(换行符的方式)解决TCP粘包问题


1.服务端类


package com.phei.netty.s20160423;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* 服务端
* @author renhj
*
*/
public class TimeServer {

public void bind(int port) throws Exception{

//第一个用户服务器接收客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//第二个用户SocketChannel的网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//创建ServerBootstrap对象,启动 NIO服务端的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).
//设置为NIO
channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务器监听端口关闭
f.channel().closeFuture().sync();

}finally{
//释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

private class ChildChannelHandler extends ChannelInitializer{

@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//解决TCP粘包问题
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());

arg0.pipeline().addLast(new TimeServerHandler());

}
}

public static void main(String[] args) throws Exception {

int port = 8080;
if(args != null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){
//采用默认值
}
}
new TimeServer().bind(port);
}

}

2.服务端核心处理类


package com.phei.netty.s20160423;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 服务端核心处理类
* @author renhj
*
*/
public class TimeServerHandler extends ChannelHandlerAdapter {

private int counter;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String body = (String)msg;
System.out.println("The time server receive order :"+ body+" ; the counter is :"+ ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?
new Date(System.currentTimeMillis()).toString():"BAD ORDER";
currentTime = currentTime +System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}





}

3.客户端类

  
package com.phei.netty.s20160423;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* 客服端
* @author renhj
*
*/

public class TimeClient {

public void connect(int port , String host) throws Exception{

//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer(){

@Override
public void initChannel(SocketChannel ch) throws Exception{
//解决TCP粘包问题
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());

ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();

//等待客户端链路关闭
f.channel().closeFuture().sync();

}finally{
//释放NIO线程
group.shutdownGracefully();
}

}
public static void main(String[] args) throws Exception{

int port = 8080;
if(args !=null && args.length>0){
try{
port = Integer.valueOf(args[0]);
}catch(Exception e){

}
}
new TimeClient().connect(port, "127.0.0.1");
}
}

4.客户端核心处理类

package com.phei.netty.s20160423;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* 客户端核心处理类
* @author renhj
*
*/
public class TimeClientHandler extends ChannelHandlerAdapter {

private int counter;
private byte[] req;

public TimeClientHandler(){
req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

ByteBuf message = null;
for(int i=0;i<100;i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


String body = (String)msg;
System.out.println("Now is : "+body +" ; the counter is : "+ ++counter);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

//释放资源
ctx.close();
}


}

5.运行结果,完美解决




推荐阅读
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 本文介绍了Java中Hashtable的clear()方法,该方法用于清除和移除指定Hashtable中的所有键。通过示例程序演示了clear()方法的使用。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • java线程池的实现原理源码分析
    这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
  • 使用Flutternewintegration_test进行示例集成测试?回答首先在dev下的p ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • 阅读spring5源码DefaultSingletonBeanRegistry类遇到问题发现SpringBean中存在大量回调机制和aware接口,于是特意去了解 ... [详细]
  • BugDescriptionWhencategoricalvariablehas ... [详细]
author-avatar
中医鸣芳
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有