热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty+Websocket聊天、推送(实战)

文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录语雀版|总目录码云版|总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试

文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 语雀版 | 总目录 码云版| 总目录 博客园版 为您奉上珍贵的学习资源 :

  • 免费赠送 :《尼恩Java面试宝典》持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备

  • 免费赠送 经典图书:《Java高并发核心编程(卷1)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 经典图书:《Java高并发核心编程(卷2)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 经典图书:《Netty Zookeeper Redis 高并发实战》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 经典图书:《SpringCloud Nginx高并发核心编程》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

  • 免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取


推荐:入大厂 、做架构、大力提升Java 内功 的 精彩博文

入大厂 、做架构、大力提升Java 内功 必备的精彩博文 秋招涨薪1W + 必备的精彩博文
1:Redis 分布式锁 (图解-秒懂-史上最全) 2:Zookeeper 分布式锁 (图解-秒懂-史上最全)
3: Redis与MySQL双写一致性如何保证? (面试必备) 4: 面试必备:秒杀超卖 解决方案 (史上最全)
5:面试必备之:Reactor模式 6: 10分钟看懂, Java NIO 底层原理
7:TCP/IP(图解+秒懂+史上最全) 8:Feign原理 (图解)
9:DNS图解(秒懂 + 史上最全 + 高薪必备) 10:CDN图解(秒懂 + 史上最全 + 高薪必备)
11: 分布式事务( 图解 + 史上最全 + 吐血推荐 ) 12:限流:计数器、漏桶、令牌桶
三大算法的原理与实战(图解+史上最全)
13:架构必看:12306抢票系统亿级流量架构
(图解+秒懂+史上最全)
14:seata AT模式实战(图解+秒懂+史上最全)
15:seata 源码解读(图解+秒懂+史上最全) 16:seata TCC模式实战(图解+秒懂+史上最全)

SpringCloud 微服务 精彩博文
nacos 实战(史上最全) sentinel (史上最全+入门教程)
SpringCloud gateway (史上最全) 分库分表sharding-jdbc底层原理与实操(史上最全,5W字长文,吐血推荐)

具体详情,请点击此链接

尼恩Java面试宝典,32个最新pdf,含2000多页不断更新、持续迭代 具体详情,请点击此链接

Netty+Websocket 聊天、推送(实战)


推荐: 地表最强 开发环境 系列

工欲善其事 必先利其器
地表最强 开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解)
地表最强 热部署:java SpringBoot SpringCloud 热部署 热加载 热调试
地表最强 发请求工具(再见吧, PostMan ):IDEA HTTP Client(史上最全)
地表最强 PPT 小工具: 屌炸天,像写代码一样写PPT
无编程不创客,无编程不创客,一大波编程高手正在疯狂创客圈交流、学习中! 找组织,GO

什么是Netty ?

Netty是由Jboss提供的一款著名的开源框架,常用于搭建 RPC中的TCP服务器、Websocket服务器,甚至是类似Tomcat的Web服务器,反正就是各种网络服务器,在处理高并发的项目中,有奇用!功能丰富且性能良好,基于Java中NIO的二次封装,具有比原生NIO更好更稳健的体验。

关于Netty 原理,请参见 《Netty Zookeeper Redis 高并发实战》 一书

为什么要使用 Netty 替代 Tomcat?

很多项目,都需要基于 Websocket 协议做在线客服、在线推送、在线聊天,虽然 Tomcat 内置支持 Websocket 协议,但是由于 Tomcat 的吞吐量、连接数都很低,作为测试是可以的。在生产环境,一定需要使用高吞吐量、高连接数的 Netty 服务器进行替代

Netty+Websocket 聊天、推送(实战)

之所以 Netty 性能高,因为其使用的是 Reactor 反应器模式。关于反应器模式原理,请参见 《Netty Zookeeper Redis 高并发实战》 一书。

Netty+WS 在线聊天(在线推送)功能演示

聊天过程gif 演示:
Netty+Websocket 聊天、推送(实战)

聊天示意图:
Netty+Websocket 聊天、推送(实战)

Springboot+Netty 项目结构

Netty+Websocket 聊天、推送(实战)

Netty 服务启动

Netty搭建的服务器基本上都是差不多的写法:

绑定主线程组和工作线程组,这部分对应架构图中的事件循环组。其原理,,请参见 《Netty Zookeeper Redis 高并发实战》 一书。

重点就是ChannelInitializer的配置,以异步的方式启动,最后是结束的时候关闭线程组。


    /**
     * 启动即时通讯服务器
     */
    public void start()
    {
        final WebSocketServer webSocketServer = new WebSocketServer();


        ChannelFuture channelFuture = null;
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChatServerInitializer());
        InetSocketAddress address = new InetSocketAddress(9999);
        channelFuture = bootstrap.bind(address);
        channelFuture.syncUninterruptibly();

        channel = channelFuture.channel();
        // 返回与当前Java应用程序关联的运行时对象
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                webSocketServer.stop();
            }
        });
        channelFuture.channel().closeFuture().syncUninterruptibly();
    }

    /**
     * 内部类
     */
    class ChatServerInitializer extends ChannelInitializer
    {
        private static final int READ_IDLE_TIME_OUT = 60; // 读超时  s
        private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时
        private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时


        @Override
        protected void initChannel(Channel ch) throws Exception
        {
            ChannelPipeline pipeline = ch.pipeline();
            // HTTP请求的解码和编码
            pipeline.addLast(new HttpServerCodec());
            // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了
            pipeline.addLast(new ChunkedWriteHandler());
            // 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse,
            // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
            // WebSocket数据压缩
            pipeline.addLast(new WebSocketServerCompressionHandler());
            // 协议包长度限制
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024));

            // //当连接在60秒内没有接收到消息时,进会触发一个 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法处理
            pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));
            pipeline.addLast(new TextWebSocketFrameHandler());

        }
    }

报文处理器

/**
 * Created by 尼恩 @ 疯狂创客圈
 * 

* WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧 */ @Slf4j public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //增加消息的引用计数(保留消息),并将他写到 ChannelGroup 中所有已经连接的客户端 ServerSession session = ServerSession.getSession(ctx); Map result = ChatProcesser.inst().onMessage(msg.text(), session); if (result != null && null!=result.get("type")) { switch (result.get("type")) { case "msg": SessionMap.inst().sendToOthers(result, session); break; case "init": SessionMap.inst().addSession(result, session); break; } } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //是否握手成功,升级为 Websocket 协议 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息 // 并把握手成功的 Channel 加入到 ChannelGroup 中 ServerSession session = new ServerSession(ctx.channel()); String echo = ChatProcesser.inst().onOpen(session); SessionMap.inst().sendMsg(ctx, echo); } else if (evt instanceof IdleStateEvent) { IdleStateEvent stateEvent = (IdleStateEvent) evt; if (stateEvent.state() == IdleState.READER_IDLE) { ServerSession session = ServerSession.getSession(ctx); SessionMap.inst().remove(session); session.processError(null); } } else { super.userEventTriggered(ctx, evt); } } }

业务处理器

下面是用websocket做聊天室的逻辑:

  • 使用 Json 传递实体消息;

  • ServerSession 存储了每个会话,保存对 Channel和 User,使用User 表示连接上来用户

  • 前端要求填入用户和房间(群组)后,模拟登录,并返回用户列表。进入后可以发送群组消息。

package com.crazymaker.websocket.processer;

import com.crazymaker.websocket.Model.User;
import com.crazymaker.websocket.session.ServerSession;
import com.crazymaker.websocket.session.SessionMap;
import com.crazymaker.websocket.util.JsonUtil;
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * 业务处理器
 * Created by 尼恩 @ 疯狂创客圈
 */
@Slf4j
public class ChatProcesser
{
    private static final Logger logger = LoggerFactory.getLogger(ChatProcesser.class);


    /**
     * 单例
     */
    private static ChatProcesser singleInstance = new ChatProcesser();

    public static ChatProcesser inst()
    {
        return singleInstance;
    }

    /**
     * 连接建立成功调用的方法
     *
     * @param s 会话
     */
    public String onOpen(ServerSession s) throws IOException
    {
        Map result = new HashMap<>();
        result.put("type", "bing");
        result.put("sendUser", "系统消息");
        result.put("id", s.getId());

        String json = JsonUtil.pojoToJson(result);
        return json;
    }

    /**
     * 连接关闭调用的方法
     */
    public String onClose(ServerSession s)
    {
        User user = s.getUser();
        if (user != null)
        {
            String nick = user.getNickname();
            Map result = new HashMap<>();
            result.put("type", "init");
            result.put("msg", nick + "离开房间");
            result.put("sendUser", "系统消息");
            String json = JsonUtil.pojoToJson(result);
            return json;
        }
        return null;
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 消息内容
     * @param session 会哈
     */
    public Map onMessage(String message, ServerSession session)
    {
        TypeToken typeToken = new TypeToken>()
        {
        };
        Map map = JsonUtil.jsonToPojo(message, typeToken);
        Map result = new HashMap<>();
        User user = null;
        switch (map.get("type"))
        {
            case "msg":
                user = session.getUser();
                result.put("type", "msg");
                result.put("msg", map.get("msg"));
                result.put("sendUser", user.getNickname());
                break;
            case "init":
                String room = map.get("room");
                session.setGroup(room);
                String nick = map.get("nick");
                user = new User(session.getId(), nick);
                session.setUser(user);
                result.put("type", "init");
                result.put("msg", nick + "成功加入房间");
                result.put("sendUser", "系统消息");

                break;
            case "ping":
                break;
        }

        return result;
    }

    /**
     * 连接发生错误时的调用方法
     *
     * @param session 会话
     * @param error   异常
     */
    public String onError(ServerSession session, Throwable error)
    {

        //捕捉异常信息
        if (null != error)
        {
            log.error(error.getMessage());
        }

        User user = session.getUser();
        if (user == null)
        {
            return null;
        }
        String nick = user.getNickname();

        Map result = new HashMap<>();
        result.put("type", "init");
        result.put("msg", nick + "离开房间");
        result.put("sendUser", "系统消息");

        String json = JsonUtil.pojoToJson(result);
        return json;
    }


}

疯狂创客圈▶

疯狂创客圈 - Java高并发研习社群,为大家开启大厂之门


推荐阅读
  • 一面自我介绍对象相等的判断,equals方法实现。可以简单描述挫折,并说明自己如何克服,最终有哪些收获。职业规划表明自己决心,首先自己不准备继续求学了,必须招工作了。希望去哪 ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
  • 原文地址:https:www.cnblogs.combaoyipSpringBoot_YML.html1.在springboot中,有两种配置文件,一种 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • 负载均衡_Nginx反向代理动静分离负载均衡及rewrite隐藏路径详解(Nginx Apache MySQL Redis)–第二部分
    nginx反向代理、动静分离、负载均衡及rewrite隐藏路径详解 ... [详细]
  • 1.脚本功能1)自动替换jar包中的配置文件。2)自动备份老版本的Jar包3)自动判断是初次启动还是更新服务2.脚本准备进入ho ... [详细]
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • 缓存 分布式锁 Redis
    分布式锁现在Redis基本上没家公司都在使用,只是各自使用的场景不以,但Redis最出名的还是做为缓存服务器,提搞服务器的的吞吐量,下面我们来围绕这个作为缓存做一个总结今天的目标其 ... [详细]
  • Java开发实战讲解!字节跳动三场技术面+HR面
    二、回顾整理阿里面试题基本就这样了,还有一些零星的问题想不起来了,答案也整理出来了。自我介绍JVM如何加载一个类的过程,双亲委派模型中有 ... [详细]
  • Java开发面试问题,2021网易Java高级面试题及答案,实战案例
    前言大厂面试真题向来都是各大求职者的最佳练兵场,而今天小编带来的便是“HUAWEI”面经!这是一次真实的面试经历,虽然不是我自己亲身经历 ... [详细]
  • linux下编译安装lnmp
    2019独角兽企业重金招聘Python工程师标准#######################安装依赖#####################安装必要的包:y ... [详细]
author-avatar
山海
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有