文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 语雀版 | 总目录 码云版| 总目录 博客园版 为您奉上珍贵的学习资源 :
免费赠送 :《尼恩Java面试宝典》持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备
免费赠送 经典图书:《Java高并发核心编程(卷1)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷2)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Netty Zookeeper Redis 高并发实战》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《SpringCloud Nginx高并发核心编程》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取
入大厂 、做架构、大力提升Java 内功 必备的精彩博文 | 秋招涨薪1W + 必备的精彩博文 |
---|---|
1:Redis 分布式锁 (图解-秒懂-史上最全) | 2:Zookeeper 分布式锁 (图解-秒懂-史上最全) |
3: Redis与MySQL双写一致性如何保证? (面试必备) | 4: 面试必备:秒杀超卖 解决方案 (史上最全) |
5:面试必备之:Reactor模式 | 6: 10分钟看懂, Java NIO 底层原理 |
7:TCP/IP(图解+秒懂+史上最全) | 8:Feign原理 (图解) |
9:DNS图解(秒懂 + 史上最全 + 高薪必备) | 10:CDN图解(秒懂 + 史上最全 + 高薪必备) |
11: 分布式事务( 图解 + 史上最全 + 吐血推荐 ) | 12:限流:计数器、漏桶、令牌桶 三大算法的原理与实战(图解+史上最全) |
13:架构必看:12306抢票系统亿级流量架构 (图解+秒懂+史上最全) |
14:seata AT模式实战(图解+秒懂+史上最全) |
15:seata 源码解读(图解+秒懂+史上最全) | 16:seata TCC模式实战(图解+秒懂+史上最全) |
SpringCloud 微服务 精彩博文 | |
---|---|
nacos 实战(史上最全) | sentinel (史上最全+入门教程) |
SpringCloud gateway (史上最全) | 分库分表sharding-jdbc底层原理与实操(史上最全,5W字长文,吐血推荐) |
尼恩Java面试宝典,32个最新pdf,含2000多页,不断更新、持续迭代 具体详情,请点击此链接
工欲善其事 必先利其器 |
---|
地表最强 开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解) |
地表最强 热部署:java SpringBoot SpringCloud 热部署 热加载 热调试 |
地表最强 发请求工具(再见吧, PostMan ):IDEA HTTP Client(史上最全) |
地表最强 PPT 小工具: 屌炸天,像写代码一样写PPT |
无编程不创客,无编程不创客,一大波编程高手正在疯狂创客圈交流、学习中! 找组织,GO |
Netty是由Jboss提供的一款著名的开源框架,常用于搭建 RPC中的TCP服务器、Websocket服务器,甚至是类似Tomcat的Web服务器,反正就是各种网络服务器,在处理高并发的项目中,有奇用!功能丰富且性能良好,基于Java中NIO的二次封装,具有比原生NIO更好更稳健的体验。
关于Netty 原理,请参见 《Netty Zookeeper Redis 高并发实战》 一书
很多项目,都需要基于 Websocket 协议做在线客服、在线推送、在线聊天,虽然 Tomcat 内置支持 Websocket 协议,但是由于 Tomcat 的吞吐量、连接数都很低,作为测试是可以的。在生产环境,一定需要使用高吞吐量、高连接数的 Netty 服务器进行替代。
之所以 Netty 性能高,因为其使用的是 Reactor 反应器模式。关于反应器模式原理,请参见 《Netty Zookeeper Redis 高并发实战》 一书。
聊天过程gif 演示:
聊天示意图:
Netty搭建的服务器基本上都是差不多的写法:
绑定主线程组和工作线程组,这部分对应架构图中的事件循环组。其原理,,请参见 《Netty Zookeeper Redis 高并发实战》 一书。
重点就是ChannelInitializer的配置,以异步的方式启动,最后是结束的时候关闭线程组。
/**
* 启动即时通讯服务器
*/
public void start()
{
final WebSocketServer webSocketServer = new WebSocketServer();
ChannelFuture channelFuture = null;
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer());
InetSocketAddress address = new InetSocketAddress(9999);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
// 返回与当前Java应用程序关联的运行时对象
Runtime.getRuntime().addShutdownHook(new Thread()
{
@Override
public void run()
{
webSocketServer.stop();
}
});
channelFuture.channel().closeFuture().syncUninterruptibly();
}
/**
* 内部类
*/
class ChatServerInitializer extends ChannelInitializer
{
private static final int READ_IDLE_TIME_OUT = 60; // 读超时 s
private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时
private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时
@Override
protected void initChannel(Channel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
// HTTP请求的解码和编码
pipeline.addLast(new HttpServerCodec());
// 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了
pipeline.addLast(new ChunkedWriteHandler());
// 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse,
// 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
// WebSocket数据压缩
pipeline.addLast(new WebSocketServerCompressionHandler());
// 协议包长度限制
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024));
// //当连接在60秒内没有接收到消息时,进会触发一个 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法处理
pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
/**
* Created by 尼恩 @ 疯狂创客圈
*
* WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧
*/
@Slf4j
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler
{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception
{
//增加消息的引用计数(保留消息),并将他写到 ChannelGroup 中所有已经连接的客户端
ServerSession session = ServerSession.getSession(ctx);
Map result = ChatProcesser.inst().onMessage(msg.text(), session);
if (result != null && null!=result.get("type"))
{
switch (result.get("type"))
{
case "msg":
SessionMap.inst().sendToOthers(result, session);
break;
case "init":
SessionMap.inst().addSession(result, session);
break;
}
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
{
//是否握手成功,升级为 Websocket 协议
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)
{
// 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息
// 并把握手成功的 Channel 加入到 ChannelGroup 中
ServerSession session = new ServerSession(ctx.channel());
String echo = ChatProcesser.inst().onOpen(session);
SessionMap.inst().sendMsg(ctx, echo);
} else if (evt instanceof IdleStateEvent)
{
IdleStateEvent stateEvent = (IdleStateEvent) evt;
if (stateEvent.state() == IdleState.READER_IDLE)
{
ServerSession session = ServerSession.getSession(ctx);
SessionMap.inst().remove(session);
session.processError(null);
}
} else
{
super.userEventTriggered(ctx, evt);
}
}
}
下面是用websocket做聊天室的逻辑:
使用 Json 传递实体消息;
ServerSession 存储了每个会话,保存对 Channel和 User,使用User 表示连接上来用户
前端要求填入用户和房间(群组)后,模拟登录,并返回用户列表。进入后可以发送群组消息。
package com.crazymaker.websocket.processer;
import com.crazymaker.websocket.Model.User;
import com.crazymaker.websocket.session.ServerSession;
import com.crazymaker.websocket.session.SessionMap;
import com.crazymaker.websocket.util.JsonUtil;
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 业务处理器
* Created by 尼恩 @ 疯狂创客圈
*/
@Slf4j
public class ChatProcesser
{
private static final Logger logger = LoggerFactory.getLogger(ChatProcesser.class);
/**
* 单例
*/
private static ChatProcesser singleInstance = new ChatProcesser();
public static ChatProcesser inst()
{
return singleInstance;
}
/**
* 连接建立成功调用的方法
*
* @param s 会话
*/
public String onOpen(ServerSession s) throws IOException
{
Map result = new HashMap<>();
result.put("type", "bing");
result.put("sendUser", "系统消息");
result.put("id", s.getId());
String json = JsonUtil.pojoToJson(result);
return json;
}
/**
* 连接关闭调用的方法
*/
public String onClose(ServerSession s)
{
User user = s.getUser();
if (user != null)
{
String nick = user.getNickname();
Map result = new HashMap<>();
result.put("type", "init");
result.put("msg", nick + "离开房间");
result.put("sendUser", "系统消息");
String json = JsonUtil.pojoToJson(result);
return json;
}
return null;
}
/**
* 收到客户端消息后调用的方法
*
* @param message 消息内容
* @param session 会哈
*/
public Map onMessage(String message, ServerSession session)
{
TypeToken typeToken = new TypeToken>()
{
};
Map map = JsonUtil.jsonToPojo(message, typeToken);
Map result = new HashMap<>();
User user = null;
switch (map.get("type"))
{
case "msg":
user = session.getUser();
result.put("type", "msg");
result.put("msg", map.get("msg"));
result.put("sendUser", user.getNickname());
break;
case "init":
String room = map.get("room");
session.setGroup(room);
String nick = map.get("nick");
user = new User(session.getId(), nick);
session.setUser(user);
result.put("type", "init");
result.put("msg", nick + "成功加入房间");
result.put("sendUser", "系统消息");
break;
case "ping":
break;
}
return result;
}
/**
* 连接发生错误时的调用方法
*
* @param session 会话
* @param error 异常
*/
public String onError(ServerSession session, Throwable error)
{
//捕捉异常信息
if (null != error)
{
log.error(error.getMessage());
}
User user = session.getUser();
if (user == null)
{
return null;
}
String nick = user.getNickname();
Map result = new HashMap<>();
result.put("type", "init");
result.put("msg", nick + "离开房间");
result.put("sendUser", "系统消息");
String json = JsonUtil.pojoToJson(result);
return json;
}
}
疯狂创客圈 - Java高并发研习社群,为大家开启大厂之门