本文将介绍如何实现一个基于websocket分布式聊天(IM)系统。
使用golang实现websocket通讯,单机可以支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。
本文内容比较长,如果直接想clone项目体验直接进入项目体验 goWebSocket项目下载 ,文本从介绍webSocket是什么开始,然后开始介绍这个项目,以及在Nginx中配置域名做webSocket的转发,然后介绍如何搭建一个分布式系统。
项目地址 gowebsocket
IM-聊天首页 或者在新的窗口打开 http://im.91vh.com/home/index
打开连接以后进入聊天界面
多人群聊可以同时打开两个窗口
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
HTTP和WebSocket在通讯过程的比较
HTTP和webSocket都支持配置证书,ws:// 无证书 wss:// 配置证书的协议标识
浏览器的兼容性,开始支持webSocket的版本
服务端的支持
golang、java、php、node.js、python、nginx 都有不错的支持
Android和IOS的支持
Android可以使用java-webSocket对webSocket支持
iOS 4.2及更高版本具有WebSockets支持
从业务上出发,需要一个主动通达客户端的能力
目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,然后返回结果,不可以服务端主动向某一个客户端主动发送数据
大多数场景我们需要主动通知用户,如:聊天系统、用户完成任务主动告诉用户、一些运营活动需要通知到在线的用户
可以获取用户在线状态
在没有长连接的时候通过客户端主动轮询获取数据
可以通过一种方式实现,多种不同平台(H5/Android/IOS)去使用
客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息
Connection: Upgrade 表明连接需要升级
Upgrade: websocket 需要升级到 websocket协议
Sec-WebSocket-Version: 13 协议的版本为13
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== 这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept对应
# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket
服务端接收到升级协议的请求,如果服务端支持升级协议会做如下响应
返回:
Status Code: 101 Switching Protocols 表示支持切换协议
# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket
websocket需要监听端口,所以需要在golang 成功的 main 函数中用协程的方式去启动程序
main.go 实现启动
go websocket.StartWebSocket()
init_acc.go 启动程序
// 启动程序
func StartWebSocket() {http.HandleFunc("/acc", wsPage)http.ListenAndServe(":8089", nil)
}
func wsPage(w http.ResponseWriter, req *http.Request) {// 升级协议conn, err :&#61; (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])return true}}).Upgrade(w, req, nil)if err !&#61; nil {http.NotFound(w, req)return}fmt.Println("webSocket 建立连接:", conn.RemoteAddr().String())currentTime :&#61; uint64(time.Now().Unix())client :&#61; NewClient(conn.RemoteAddr().String(), conn, currentTime)go client.read()go client.write()// 用户连接事件clientManager.Register <- client
}
// 连接管理
type ClientManager struct {Clients map[*Client]bool // 全部的连接ClientsLock sync.RWMutex // 读写锁Users map[string]*Client // 登录的用户 // appId&#43;uuidUserLock sync.RWMutex // 读写锁Register chan *Client // 连接连接处理Login chan *login // 用户登录处理Unregister chan *Client // 断开连接处理程序Broadcast chan []byte // 广播 向全部成员发送数据
}// 初始化
func NewClientManager() (clientManager *ClientManager) {clientManager &#61; &ClientManager{Clients: make(map[*Client]bool),Users: make(map[string]*Client),Register: make(chan *Client, 1000),Login: make(chan *login, 1000),Unregister: make(chan *Client, 1000),Broadcast: make(chan []byte, 1000),}return
}
// 向客户端写数据
func (c *Client) write() {defer func() {if r :&#61; recover(); r !&#61; nil {fmt.Println("write stop", string(debug.Stack()), r)}}()defer func() {clientManager.Unregister <- cc.Socket.Close()fmt.Println("Client发送数据 defer", c)}()for {select {case message, ok :&#61; <-c.Send:if !ok {// 发送数据错误 关闭连接fmt.Println("Client发送数据 关闭连接", c.Addr, "ok", ok)return}c.Socket.WriteMessage(websocket.TextMessage, message)}}
}
// 读取客户端数据
func (c *Client) read() {defer func() {if r :&#61; recover(); r !&#61; nil {fmt.Println("write stop", string(debug.Stack()), r)}}()defer func() {fmt.Println("读取客户端数据 关闭send", c)close(c.Send)}()for {_, message, err :&#61; c.Socket.ReadMessage()if err !&#61; nil {fmt.Println("读取客户端数据 错误", c.Addr, err)return}// 处理程序fmt.Println("读取客户端数据 处理:", string(message))ProcessData(c, message)}
}
{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"马远","appId":101}}
{"seq":"1565336219141-266129","cmd":"login","response":"code":200,"codeMsg":"Success","data":null}}
/************************ 请求数据 **************************/
// 通用请求数据格式
type Request struct {Seq string &#96;json:"seq"&#96; // 消息的唯一IdCmd string &#96;json:"cmd"&#96; // 请求命令字Data interface{} &#96;json:"data,omitempty"&#96; // 数据 json
}// 登录请求数据
type Login struct {ServiceToken string &#96;json:"serviceToken"&#96; // 验证用户是否登录AppId uint32 &#96;json:"appId,omitempty"&#96;UserId string &#96;json:"userId,omitempty"&#96;
}// 心跳请求数据
type HeartBeat struct {UserId string &#96;json:"userId,omitempty"&#96;
}
/************************ 响应数据 **************************/
type Head struct {Seq string &#96;json:"seq"&#96; // 消息的IdCmd string &#96;json:"cmd"&#96; // 消息的cmd 动作Response *Response &#96;json:"response"&#96; // 消息体
}type Response struct {Code uint32 &#96;json:"code"&#96;CodeMsg string &#96;json:"codeMsg"&#96;Data interface{} &#96;json:"data"&#96; // 数据 json
}
// Websocket 路由
func WebsocketInit() {websocket.Register("login", websocket.LoginController)websocket.Register("heartbeat", websocket.HeartbeatController)
}
定时任务清除超时连接
没有登录的连接和登录的连接6分钟没有心跳则断开连接
client_manager.go
// 定时清理超时连接
func ClearTimeoutConnections() {currentTime :&#61; uint64(time.Now().Unix())for client :&#61; range clientManager.Clients {if client.IsHeartbeatTimeout(currentTime) {fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)client.Socket.Close()}}
}
读写的Goroutine有一个失败&#xff0c;则相互关闭
write()Goroutine写入数据失败&#xff0c;关闭c.Socket.Close()连接&#xff0c;会关闭read()Goroutine
read()Goroutine读取数据失败&#xff0c;关闭close(c.Send)连接&#xff0c;会关闭write()Goroutine
客户端主动关闭
关闭读写的Goroutine
从ClientManager删除连接
监控用户连接、Goroutine数
十个内存溢出有九个和Goroutine有关
添加一个http的接口&#xff0c;可以查看系统的状态&#xff0c;防止Goroutine不回收
查看系统状态
Nginx 配置不活跃的连接释放时间&#xff0c;防止忘记关闭的连接
使用 pprof 分析性能、耗时
js 建立连接&#xff0c;并处理连接成功、收到数据、断开连接的事件处理
ws &#61; new WebSocket("ws://127.0.0.1:8089/acc");ws.onopen &#61; function(evt) {console.log("Connection open ...");
};ws.onmessage &#61; function(evt) {console.log( "Received Message: " &#43; evt.data);data_array &#61; JSON.parse(evt.data);console.log( data_array);
};ws.onclose &#61; function(evt) {console.log("Connection closed.");
};
需要注意:连接建立成功以后才可以发送数据
建立连接以后由客户端向服务器发送数据示例
登录:
ws.send(&#39;{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}&#39;);心跳:
ws.send(&#39;{"seq":"2324","cmd":"heartbeat","data":{}}&#39;);ping 查看服务是否正常:
ws.send(&#39;{"seq":"2325","cmd":"ping","data":{}}&#39;);关闭连接:
ws.close();
4、goWebSocket 项目
本项目是基于webSocket实现的分布式IM系统
客户端随机分配用户名&#xff0c;所有人进入一个聊天室&#xff0c;实现群聊的功能
单台机器(24核128G内存)支持百万客户端连接
支持水平部署&#xff0c;部署的机器之间可以相互通讯
项目架构图
本项目只需要使用 redis 和 golang
本项目使用govendor管理依赖&#xff0c;克隆本项目就可以直接使用
# 主要使用到的包
github.com/gin-gonic/gin&#64;v1.4.0
github.com/go-redis/redis
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf
克隆项目
git clone git&#64;github.com:link1st/gowebsocket.git
# 或
git clone https://github.com/link1st/gowebsocket.git
修改项目配置
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# 修改项目监听端口&#xff0c;redis连接等(默认127.0.0.1:3306)
vim app.yaml
# 返回项目目录&#xff0c;为以后启动做准备
cd ..
配置文件说明
app:logFile: log/gin.log # 日志文件位置httpPort: 8080 # http端口webSocketPort: 8089 # webSocket端口rpcPort: 9001 # 分布式部署程序内部通讯端口httpUrl: 127.0.0.1:8080webSocketUrl: 127.0.0.1:8089redis:addr: "localhost:6379"password: ""DB: 0poolSize: 30minIdleConns: 30
启动项目
go run main.go
进入IM聊天地址
http://127.0.0.1:8080/home/index
到这里&#xff0c;就可以体验到基于webSocket的IM系统
使用nginx实现内外网分离&#xff0c;对外只暴露Nginx的Ip(一般的互联网企业会在nginx之前加一层LVS做负载均衡)&#xff0c;减少入侵的可能
使用Nginx可以利用Nginx的负载功能&#xff0c;前端再使用的时候只需要连接固定的域名&#xff0c;通过Nginx将流量分发了到不同的机器
同时我们也可以使用Nginx的不同的负载策略(轮询、weight、ip_hash)
使用域名 im.91vh.com 为示例&#xff0c;参考配置
一级目录im.91vh.com/acc 是给webSocket使用&#xff0c;是用nginx stream转发功能(nginx 1.3.31 开始支持&#xff0c;使用Tengine配置也是相同的)&#xff0c;转发到golang 8089 端口处理
其它目录是给HTTP使用&#xff0c;转发到golang 8080 端口处理
upstream go-im
{server 127.0.0.1:8080 weight&#61;1 max_fails&#61;2 fail_timeout&#61;10s;keepalive 16;
}upstream go-acc
{server 127.0.0.1:8089 weight&#61;1 max_fails&#61;2 fail_timeout&#61;10s;keepalive 16;
}server {listen 80 ;server_name im.91vh.com;index index.html index.htm ;location /acc {proxy_set_header Host $host;proxy_pass http://go-acc;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection $connection_upgrade;proxy_set_header Connection "";proxy_redirect off;proxy_intercept_errors on;client_max_body_size 10m;}location /{proxy_set_header Host $host;proxy_pass http://go-im;proxy_http_version 1.1;proxy_set_header Connection "";proxy_redirect off;proxy_intercept_errors on;client_max_body_size 30m;}access_log /link/log/nginx/access/im.log;error_log /link/log/nginx/access/im.error.log;
}
运行nginx测试命令&#xff0c;查看配置文件是否正确
/link/server/tengine/sbin/nginx -t
如果出现错误
nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
处理方法
在nginx.com添加
http{fastcgi_temp_file_write_size 128k;
..... # 需要添加的内容#support websocketmap $http_upgrade $connection_upgrade {default upgrade;&#39;&#39; close;}.....gzip on;}
原因:Nginx代理webSocket的时候就会遇到Nginx的设计问题 End-to-end and Hop-by-hop Headers
6、压测设置文件打开句柄数
ulimit -n 1000000
设置sockets连接参数
vim /etc/sysctl.conf
net.ipv4.tcp_tw_reuse &#61; 1
net.ipv4.tcp_tw_recycle &#61; 0
待压测&#xff0c;如果大家有压测的结果欢迎补充
后续会出专门的教程,从申请机器、写压测用例、内核优化、得出压测数据
关于压测请移步 go-stress-testing&#xff0c;从申请机器开始&#xff0c;优化内核&#xff0c;部署项目压测&#xff0c;解释压测的原理
在线用户数 | cpu | 内存 | I/O | net.out |
1W | ||||
10W | ||||
100W |
参考本项目源码
gowebsocket v1.0.0 单机版Im系统
gowebsocket v2.0.0 分布式Im系统
为了方便演示&#xff0c;IM系统和webSocket(acc)系统合并在一个系统中
IM系统接口:
获取全部在线的用户&#xff0c;查询单前服务的全部用户&#43;集群中服务的全部用户
发送消息&#xff0c;这里采用的是http接口发送(微信网页版发送消息也是http接口)&#xff0c;这里考虑主要是两点:
1.服务分离&#xff0c;让acc系统尽量的简单一点&#xff0c;不掺杂其它业务逻辑
2.发送消息是走http接口&#xff0c;不使用webSocket连接&#xff0c;才用收和发送数据分离的方式&#xff0c;可以加快收发数据的效率
项目启动注册和用户连接时序图
其它系统(IM、任务)向webSocket(acc)系统连接的用户发送消息时序图
# app.yaml 配置文件信息
app:logFile: log/gin.loghttpPort: 8080webSocketPort: 8089rpcPort: 9001httpUrl: im.91vh.comwebSocketUrl: im.91vh.com# 在启动项目
go run main.go
# 将第一个项目拷贝一份
cp -rf gowebsocket gowebsocket1
# app.yaml 修改配置文件
app:logFile: log/gin.loghttpPort: 8081webSocketPort: 8090rpcPort: 9002httpUrl: im.91vh.comwebSocketUrl: im.91vh.com# 在启动第二个项目
go run main.go
在之前Nginx配置项中添加第二台机器的Ip和端口
upstream go-im
{server 127.0.0.1:8080 weight&#61;1 max_fails&#61;2 fail_timeout&#61;10s;server 127.0.0.1:8081 weight&#61;1 max_fails&#61;2 fail_timeout&#61;10s;keepalive 16;
}upstream go-acc
{server 127.0.0.1:8089 weight&#61;1 max_fails&#61;2 fail_timeout&#61;10s;server 127.0.0.1:8090 weight&#61;1 max_fails&#61;2 fail_timeout&#61;10s;keepalive 16;
}
查看请求是否落在两个项目上
实验两个用户分别连接不同的项目(gowebsocket和gowebsocket1)是否也可以相互发送消息
本项目只是演示了这个项目如何分布式部署&#xff0c;以及分布式部署以后模块如何进行相互通讯
完全解决系统没有单点的故障&#xff0c;还需 Nginx集群、redis cluster等
IM实现细节:
WebSocket轻松单台服务器5w并发jmeter实测_weixin_34379433的博客-CSDN博客 nginx LVS 的 DR模式实现 nginx 瓶颈突破 2^16 连接数限制
维基百科 WebSocket
阮一峰 WebSocket教程
WebSocket协议&#xff1a;5分钟从入门到精通
go-stress-testing 单台机器100w连接压测实战
github 搜:link1st 查看项目 gowebsocket
https://github.com/link1st/gowebsocket