node.js - web消息系统设计-持久化

 lk神密勇士 发布于 2022-11-05 19:26

大家好,

第一次发帖,不要拍砖啊,哈哈。

目前正在设计消息系统这块,已经有了2种方案,都是比较不错的,但是现在纠结在消息持久化这块。

第一种比较传统:收到消息后直接存入数据库,标记未读,当用户刷新页面,或者跳转的时候,会拉取下未读消息,并显示在导航栏那里。

第二种比较前卫:用nodejs+socket.io+redis(sub/pub)做的,优点是不用刷新页面用户就可以直接收到提醒,而且引入scoket.io以后还能做很多事情,个人打算采用第二种。

但是现在纠结在消息的持久化方面,如果在web层做消息入库,那么我发现socket.io除了在用户不刷新页面的时候“弹一下”,真没什么用了。反正消息已经存进去了。

要么就是web只管往redis里面publish一个消息,由node这里统一入库,并通知。我第一次用nodejs,对性能没什么信心,主要是nodejs对mysql的操作方面。

想听听大家的意见和看法,谢谢!!


补充:刚才被一位qq好友一顿批评:

这你还用问吗?如果你采用第二种方案,当然是在nodejs做持久化了,你引入redis的pub/sub干什么用的啊!?不就是为了分布式吗?能从不同的地方发消息吗?难道我发一条消息还要先在你的数据库保存一下,再做redis的publish吗?如果是那样,你的nodejs可以扔了。。。。

还是想听听大家的意见。。。。。谢谢。。。。。

7 个回答
  • socket.io 其实就是每秒访问一次服务器,看有没有消息,效率不是很高

    2022-11-10 05:25 回答
  • @土豆2015
    其实不太同意以下的看法,不分布式也是需要持久化的
    nodejs 的持久化就是依赖 redis 或者 mongodb 引入就是为了,如果只是单纯存在nodejs中的话,node进程停了消息丢了,去哪找回来

    还用问吗?如果你采用第二种方案,当然是在nodejs做持久化了,你引入redis的pub/sub干什么用的啊!?不就是为了分布式吗?

    2022-11-10 05:29 回答
  • @土豆2015 你目前采用的是轮询还是?我目前也在做类似你这种需求, 不过我的需求是 客户端app ,另外咨询下你, 你设置的那个消息, 如果用户读消息之后怎么办? (因为消息要持久化)

    2022-11-10 05:30 回答
  • 使用第一种,客户端轮询的方式进行接收下发的消息;
    使用redis做计数器和消息队列,mc用来存储消息内容;

    2022-11-10 05:30 回答
  • comet + redis
    redis的数据落地

    性能高,速度快

    2022-11-10 05:37 回答
  • 第一种明显更具有挑战性 高性能 高可用等特点了 何不尝试?

    2022-11-10 05:48 回答
  • 对于第二种方案经过了2天的实践,最终放弃了,还是选择了第一种简单的。首先说明:第二种方案是可行的


    开始分享代码吧

    我的系统对nodejs定位只是用到websocket这块,不提供http服务(ngxin提供),所以没用到任何框架express,sails什么的,再说我喜欢从最基本的代码来了解一个新东西,好了,先看下package.json:

    "dependencies": {
            "log4js": "0.6.21",
            "cookie": "0.1.2",
            "redis": "0.12.1",
            "mysql": "2.5.3",
            "generic-pool": "2.1.1",
            "async": "0.9.0",
            "string-template": "0.2.0",
            "socket.io": "0.9.17"
        }
    

    string-template是用来输出消息的,别的大家应该很熟悉了。先看下app.js

    global.CHANNEL = 'test.socket' //redis的订阅频道
    var Test = require('./test'); //自定义的一个module
    var test = new Test();
    var sub = redis.createClient(6379, '127.0.0.1', {}); //负责订阅的一个redis链接
    sub.subscribe(CHANNEL) //订阅频道
    sub.on('message', function(channel, message) { //收到频道消息的事件通知,消息格式json
            var json = null
            try {
                json = JSON.parse(message)
                test.emit(io, json) //处理消息
            } catch (e) {
                LOG.error('exception in emit message: ' + e)
            }
        })
        //mysql连接池
    global.MYSQL_POOL = poolModule.Pool({
        name: 'mysql',
        create: function(callback) {
            var connection = mysql.createConnection({
                host: 'localhost',
                user: 'root',
                password: '',
                charset: 'utf8_general_ci',
                database: 'test'
            });
            callback(null, connection);
        },
        destroy: function(connection) {
            connection.end();
        },
        max: 5,
        min: 2,
        idleTimeoutMillis: 30000,
        log: false
    });
    var io = require('socket.io').listen(3000, {
        'log level': 1
    });
    //设置socket验证机制
    io.configure(function() {
        io.set('authorization', function(handshake, accept) {
            test.authorize(handshake, accept)
        });
    });
    //链接进来的事件
    io.sockets.on('connection', function(socket) {
        test.connect(socket)
    })
    
    /* shutdown event for pm2 */
    process.on('SIGTERM', function() {
        sub.quit();
        LOG.info('destroied subscribe redis')
        MYSQL_POOL.destroyAllNow();
        LOG.info('destroied mysql pool')
        process.exit(0)
    });
    
    /* shutdown event for Ctrl+C */
    process.on('SIGINT', function() {
        sub.quit();
        LOG.info('destroied subscribe redis')
        MYSQL_POOL.destroyAllNow();
        LOG.info('destroied mysql pool')
        process.exit(0)
    });
    

    test.js 的authorize方法,判断链接是否可信,我用的cookie做的验证

    this.authorize = function(handshake, accept) {
        var cookies = cookie.parse(handshake.headers.cookie)
        //检查cookie
        if(ok) {
            handshake.userId = userId //把userId放到握手信息里面,后面的connect才可以拿到
        }
        ....
        //允许accept(null, true)-200,拒绝accept(null, false)-403,服务端错误accept(err, false)--500
    };
    

    test.js 的connect方法,用户通过验证后的链接逻辑:

    this.connect = function(socket) { //用户通过验证后链接进来的事件
        socket.join('user'); //默认加入所有用户频道
        socket.join('user:' + socket.handshake.userId); //默认加入该用户的专用频道
        async.waterfall([ //用到了async库,配合连接池,避免callback地狱
            function(callback) {
                MYSQL_POOL.acquire(function(err, client) {
                    callback(err, client)
                });
            },
            function(client, callback) {
                //取出该用户数据库中所有的订阅频道
                client.query('select room from room where userid = ?', [socket.handshake.userId], function(err, rooms) {
                    MYSQL_POOL.release(client)//回收链接
                    callback(err, rooms)
                });
            },
            function(rooms, callback) {
                rooms.forEach(function(room) {
                    //join用户频道
                    LOG.debug('user ' + socket.handshake.userId + ' joined room ' + room.room)
                    socket.join(room.room)
                });
                callback(null, null)
            }
        ], function(err, result) {
            if (err) {
                LOG.error(err)
            }
        })
    };
    

    到这里用户可以正常拦截,链接,并join自己订阅的room了。下面就是怎么从redis的channel里面取到消息通知客户端了,我这里简答的做了一个新用户注册后的消息通知:

    this.emit = function(io, json) {
        LOG.debug(json)
        var event = json.event
        if (event == 'user.register') {//用户注册消息
            sayWelcome(io, json)
        } else {
            throw new Error('undifined event: ' + event)
        }
    };
    

    sayWelcome私有方法:

    var WELCOME = '欢迎你的加入 SF 社区!<a href="#">{0}</a>'
    
    //一个新用户在http服务端注册成功后,直接往redis的test.socket频道里面publish一个
    //{"userId":"4ab1h89hkk58", "nickname": "土豆2015", "event": "user.register"}
    //userId是在http端生成的
    sayWelcome = function(io, json) {
        var userId = json.userId
        if (userId == null) {
            throw new Error('user id is null')
        }
        var nickname = json.nickname
        if (nickname == null) {
            throw new Error('user nickname is null')
        }
        async.waterfall([
            function(callback) {
                MYSQL_POOL.acquire(function(err, client) {
                    callback(err, client)
                });
            },
            function(client, callback) {
                var msg = format(WELCOME, [nickname]) //格式化欢迎消息
                    //往消息库里面插入一个用户消息
                client.query('insert into message(userid, msg) values(?, ?)', [userId, msg], function(err, result) {
                    MYSQL_POOL.release(client)
                    callback(err)
                });
            },
            function(callback) {
            //向该用户room发送一个消息
                io.sockets.in('user:' + userId).emit('message', '');
                callback(null, null)
            }
        ], function(err, result) {
            if (err) {
                LOG.error(err)
            }
        })
    }
    

    客户端,实时通知:

    socket.on('message', function (msg) {
        var count = parseInt($('#message').text());
        count = count + 1//在原有未读消息数目上+1
        $('#message').text(count)
    });
    

    用户在导航栏点消息的时候,进入消息页面。


    还有好多用法,向所有用户发实时一条消息:
    伪代码:

    [REDIS] publish test.channel {"event":"ALL", "msg":"全体通知"}
    
    sayToAll : function(io, json) {
        //消息入库json.msg
        io.sockets.in('user').emit('message', '');
    }
    

    当帖子有了新回复时,向关注帖子的用户发一条消息:

    [REDIS] publish test.channel {"event":"topic.notification", "topicId":"hy6123kd"}
    var TOPIC_REPLY = '您关注的帖子<a href={0}>{1}</a>有了新回复'
    sayToTopic : function(io, json) {
        var topicId = json.topicId
        var topicTitle = //根据id取到title
        var msg = format(TOPIC_REPLY,['topic/'+ topicId, 'topicTitle'])
        //消息入库msg
        io.sockets.in('topic:' + topicId).emit('message', '');
    }
    

    如果不打算做nodejs分布式的可以不用看下面,上面已经可行了。

    好了,到了这里,好像都很ok,消息系统从原来的http移植到了nodejs,但是我发现了一个很严重的问题:

    引入redis的pub/sub就是为了分布式设计的,现在我在另一个端口启动一个同样的nodejs,2个nodejs同时监听 test.socket频道。那么,同样的一个消息会被写入数据库2次,N个nodejs,那么同样的消息会被写入N次。因为大家都在监听同样的频道。

    这个问题也能解决,在redis端先生成一个新消息的id,然后用redis的setnx命令去锁这个id,只可能被一个nodejs线程锁到,抢到的进行消息入库,没锁到的只负责通知。

    但是这么一来,如此简单的一个消息系统被设计的这么复杂。。。。。。。。
    我个人对设计一个理解:

    需求本身有一定复杂程度,为了实现这个需求我们要引入另外一些复杂度,当被引入的复杂程度大于需求本身的复杂程度时,那么这个设计是失败的。

    于是打算回归第一种简单的设计,也许我对nodejs的这个使用场景根本就是错误的,欢迎大家一起聊聊。

    2022-11-10 06:01 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有