大家好,
第一次发帖,不要拍砖啊,哈哈。
目前正在设计消息系统这块,已经有了2种方案,都是比较不错的,但是现在纠结在消息持久化这块。
第一种比较传统:收到消息后直接存入数据库,标记未读,当用户刷新页面,或者跳转的时候,会拉取下未读消息,并显示在导航栏那里。
第二种比较前卫:用nodejs+socket.io+redis(sub/pub)做的,优点是不用刷新页面用户就可以直接收到提醒,而且引入scoket.io以后还能做很多事情,个人打算采用第二种。
但是现在纠结在消息的持久化方面,如果在web层做消息入库,那么我发现socket.io除了在用户不刷新页面的时候“弹一下”,真没什么用了。反正消息已经存进去了。
要么就是web只管往redis里面publish一个消息,由node这里统一入库,并通知。我第一次用nodejs,对性能没什么信心,主要是nodejs对mysql的操作方面。
想听听大家的意见和看法,谢谢!!
补充:刚才被一位qq好友一顿批评:
这你还用问吗?如果你采用第二种方案,当然是在nodejs做持久化了,你引入redis的pub/sub干什么用的啊!?不就是为了分布式吗?能从不同的地方发消息吗?难道我发一条消息还要先在你的数据库保存一下,再做redis的publish吗?如果是那样,你的nodejs可以扔了。。。。
还是想听听大家的意见。。。。。谢谢。。。。。
socket.io 其实就是每秒访问一次服务器,看有没有消息,效率不是很高
@土豆2015
其实不太同意以下的看法,不分布式也是需要持久化的
nodejs 的持久化就是依赖 redis 或者 mongodb 引入就是为了,如果只是单纯存在nodejs中的话,node进程停了消息丢了,去哪找回来
还用问吗?如果你采用第二种方案,当然是在nodejs做持久化了,你引入redis的pub/sub干什么用的啊!?不就是为了分布式吗?
@土豆2015 你目前采用的是轮询还是?我目前也在做类似你这种需求, 不过我的需求是 客户端app ,另外咨询下你, 你设置的那个消息, 如果用户读消息之后怎么办? (因为消息要持久化)
使用第一种,客户端轮询的方式进行接收下发的消息;
使用redis做计数器和消息队列,mc用来存储消息内容;
comet + redis
redis的数据落地
性能高,速度快
第一种明显更具有挑战性 高性能 高可用等特点了 何不尝试?
对于第二种方案经过了2天的实践,最终放弃了,还是选择了第一种简单的。首先说明:第二种方案是可行的
我的系统对nodejs定位只是用到websocket这块,不提供http服务(ngxin提供),所以没用到任何框架express,sails什么的,再说我喜欢从最基本的代码来了解一个新东西,好了,先看下package.json:
"dependencies": { "log4js": "0.6.21", "cookie": "0.1.2", "redis": "0.12.1", "mysql": "2.5.3", "generic-pool": "2.1.1", "async": "0.9.0", "string-template": "0.2.0", "socket.io": "0.9.17" }
string-template是用来输出消息的,别的大家应该很熟悉了。先看下app.js
global.CHANNEL = 'test.socket' //redis的订阅频道 var Test = require('./test'); //自定义的一个module var test = new Test(); var sub = redis.createClient(6379, '127.0.0.1', {}); //负责订阅的一个redis链接 sub.subscribe(CHANNEL) //订阅频道 sub.on('message', function(channel, message) { //收到频道消息的事件通知,消息格式json var json = null try { json = JSON.parse(message) test.emit(io, json) //处理消息 } catch (e) { LOG.error('exception in emit message: ' + e) } }) //mysql连接池 global.MYSQL_POOL = poolModule.Pool({ name: 'mysql', create: function(callback) { var connection = mysql.createConnection({ host: 'localhost', user: 'root', password: '', charset: 'utf8_general_ci', database: 'test' }); callback(null, connection); }, destroy: function(connection) { connection.end(); }, max: 5, min: 2, idleTimeoutMillis: 30000, log: false }); var io = require('socket.io').listen(3000, { 'log level': 1 }); //设置socket验证机制 io.configure(function() { io.set('authorization', function(handshake, accept) { test.authorize(handshake, accept) }); }); //链接进来的事件 io.sockets.on('connection', function(socket) { test.connect(socket) }) /* shutdown event for pm2 */ process.on('SIGTERM', function() { sub.quit(); LOG.info('destroied subscribe redis') MYSQL_POOL.destroyAllNow(); LOG.info('destroied mysql pool') process.exit(0) }); /* shutdown event for Ctrl+C */ process.on('SIGINT', function() { sub.quit(); LOG.info('destroied subscribe redis') MYSQL_POOL.destroyAllNow(); LOG.info('destroied mysql pool') process.exit(0) });
test.js 的authorize方法,判断链接是否可信,我用的cookie做的验证
this.authorize = function(handshake, accept) { var cookies = cookie.parse(handshake.headers.cookie) //检查cookie if(ok) { handshake.userId = userId //把userId放到握手信息里面,后面的connect才可以拿到 } .... //允许accept(null, true)-200,拒绝accept(null, false)-403,服务端错误accept(err, false)--500 };
test.js 的connect方法,用户通过验证后的链接逻辑:
this.connect = function(socket) { //用户通过验证后链接进来的事件 socket.join('user'); //默认加入所有用户频道 socket.join('user:' + socket.handshake.userId); //默认加入该用户的专用频道 async.waterfall([ //用到了async库,配合连接池,避免callback地狱 function(callback) { MYSQL_POOL.acquire(function(err, client) { callback(err, client) }); }, function(client, callback) { //取出该用户数据库中所有的订阅频道 client.query('select room from room where userid = ?', [socket.handshake.userId], function(err, rooms) { MYSQL_POOL.release(client)//回收链接 callback(err, rooms) }); }, function(rooms, callback) { rooms.forEach(function(room) { //join用户频道 LOG.debug('user ' + socket.handshake.userId + ' joined room ' + room.room) socket.join(room.room) }); callback(null, null) } ], function(err, result) { if (err) { LOG.error(err) } }) };
到这里用户可以正常拦截,链接,并join自己订阅的room了。下面就是怎么从redis的channel里面取到消息通知客户端了,我这里简答的做了一个新用户注册后的消息通知:
this.emit = function(io, json) { LOG.debug(json) var event = json.event if (event == 'user.register') {//用户注册消息 sayWelcome(io, json) } else { throw new Error('undifined event: ' + event) } };
sayWelcome私有方法:
var WELCOME = '欢迎你的加入 SF 社区!<a href="#">{0}</a>' //一个新用户在http服务端注册成功后,直接往redis的test.socket频道里面publish一个 //{"userId":"4ab1h89hkk58", "nickname": "土豆2015", "event": "user.register"} //userId是在http端生成的 sayWelcome = function(io, json) { var userId = json.userId if (userId == null) { throw new Error('user id is null') } var nickname = json.nickname if (nickname == null) { throw new Error('user nickname is null') } async.waterfall([ function(callback) { MYSQL_POOL.acquire(function(err, client) { callback(err, client) }); }, function(client, callback) { var msg = format(WELCOME, [nickname]) //格式化欢迎消息 //往消息库里面插入一个用户消息 client.query('insert into message(userid, msg) values(?, ?)', [userId, msg], function(err, result) { MYSQL_POOL.release(client) callback(err) }); }, function(callback) { //向该用户room发送一个消息 io.sockets.in('user:' + userId).emit('message', ''); callback(null, null) } ], function(err, result) { if (err) { LOG.error(err) } }) }
客户端,实时通知:
socket.on('message', function (msg) { var count = parseInt($('#message').text()); count = count + 1//在原有未读消息数目上+1 $('#message').text(count) });
用户在导航栏点消息的时候,进入消息页面。
还有好多用法,向所有用户发实时一条消息:
伪代码:
[REDIS] publish test.channel {"event":"ALL", "msg":"全体通知"} sayToAll : function(io, json) { //消息入库json.msg io.sockets.in('user').emit('message', ''); }
当帖子有了新回复时,向关注帖子的用户发一条消息:
[REDIS] publish test.channel {"event":"topic.notification", "topicId":"hy6123kd"} var TOPIC_REPLY = '您关注的帖子<a href={0}>{1}</a>有了新回复' sayToTopic : function(io, json) { var topicId = json.topicId var topicTitle = //根据id取到title var msg = format(TOPIC_REPLY,['topic/'+ topicId, 'topicTitle']) //消息入库msg io.sockets.in('topic:' + topicId).emit('message', ''); }
好了,到了这里,好像都很ok,消息系统从原来的http移植到了nodejs,但是我发现了一个很严重的问题:
引入redis的pub/sub就是为了分布式设计的,现在我在另一个端口启动一个同样的nodejs,2个nodejs同时监听 test.socket频道。那么,同样的一个消息会被写入数据库2次,N个nodejs,那么同样的消息会被写入N次。因为大家都在监听同样的频道。
这个问题也能解决,在redis端先生成一个新消息的id,然后用redis的setnx命令去锁这个id,只可能被一个nodejs线程锁到,抢到的进行消息入库,没锁到的只负责通知。
但是这么一来,如此简单的一个消息系统被设计的这么复杂。。。。。。。。
我个人对设计一个理解:
需求本身有一定复杂程度,为了实现这个需求我们要引入另外一些复杂度,当被引入的复杂程度大于需求本身的复杂程度时,那么这个设计是失败的。
于是打算回归第一种简单的设计,也许我对nodejs的这个使用场景根本就是错误的,欢迎大家一起聊聊。