热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于UDP的P2P聊天工具0.3——消息队列和重传

基于UDP的P2P聊天工具0.3——消息队列和重传简介:1)这是一个Windows的P2P聊天工具;2)相比0.2,它多了定时重传的机制;3)对局域网来说有些鸡肋,就当是

基于UDP的P2P聊天工具 0.3——消息队列和重传


简介:
1)这是一个Windows的P2P聊天工具;
2)相比0.2,它多了定时重传的机制;
3)对局域网来说有些鸡肋,就当是为跨局域网做准备吧;


相关内容:
1)多线程环境下的队列(简);
2)定时消息队列;
3)重传情况下的发送和接收;


一、示意图和说明
Hailer示意图


这个Hailer是对0.2版中的Talker的继承和发展。从示意图,我们可以看到它大体包括发送和接收这两个功能。这与Talker是一致的。区别在于Hailer的Send/Recv方法不再与socket直接关联,而是与各自的消息队列进行交互。当主程序调用Hailer::Send,数据被打包进一个消息结构,加入消息队列,然后重传线程负责从消息队列中取出消息,调用Talker::Send发送给对端。另一方面,Hailer::Recv的处理过程大体类似。


二、多线程环境下的队列
我将STL的容器做一个封装。不过,尽管我叫它队列,但因为我需要“按序插入”的功能,所以实际上使用的是list。比如说,根据超时时间,将消息结构插入消息队列的情况。

这里考虑两种边界情况:
1)元素数量太大,占用内存超出可用空间上限;
2)队列元素数量为空。


针对第一种情况,我们用“假定法”进行处理:假定可用空间无上限。这样一来,确实可以省一点麻烦~。这么处理,除了可以省事之外,从需求角度来说,正常人打字聊天不会快到来不及处理,而且UDP协议也会将来不及处理的东西丢掉。


针对第二种情况,采用生产者-消费者模式。对于消费者,当队列为空时,陷入等待;对于生产者,在给空队列添加元素时,唤醒等待中的消费者,可以像这样:

void MsgList::Pop()
{
    unique_lock lck(m_mutex);
    m_cv.wait(lck, [this](){return !m_list.empty(); });
    m_list.pop_front(); 
    return pHead;
}
void MsgList::Push_back(T pMsg)
{
    m_mutex.lock();
    m_list.push_back(pMsg);
    if (m_list.size() == 1)
    {
        m_cv.notify_one();
    }
    m_mutex.unlock();
}


从这两段代码可以看出,这里的做法就是直接给整个list使用互斥锁,然后对于list为空的情况,使用条件锁进行处理。


三、定时消息队列
关于自定义应用层消息结构,我在0.2版的附加内容里面写了点。

稍稍跑一下题。正如当时在0.2版末尾提的:照理应该再考虑字节序方面的问题。这是因为不同的操作系统,可能会有不同的字节序模式,大端或者小端的。网络传输一般都考虑用大端模式。不过因为这还只是试做版,所以暂时不考虑这个。

判断大小端的方法,可以像这样:

    int x = 1;
    char* py = (char*)&x;
    printf("%d\n", (int)*py);
    // 如果是1,那么说明是小端;


现在回到正题。我的消息结构像下面这样:

#pragma pack(push,1)
struct Msg
{
    unsigned int seq;
    unsigned int time;  // 发送时的时间戳
    unsigned int rto;   // 重传超时
    unsigned int size;
    bool ACK;
    char data[0];
};
#pragma pack(pop)


大体的用法,大体跟0.2版的附加内容说的差不多。

关于定时队列,其实我考虑过两个处理方法。1)用定时器列表,给每个消息结构整一个定时器。定时器的一种实现方法是,开一个线程,然后睡一定时间。2)用一个循环运行的线程,每隔一段时间检查一下队列中最有可能超时的消息。

我用了第二种,因为这样更简洁。不过需要强调的是,我这里对重传的定时精度并没有需求。具体代码可以像这样:

while (true)
{       
    this_thread::sleep_for(chrono::milliseconds(200));
    while (m_sendList.CheckTime(time(NULL)))
    {               
        shared_ptr<Msg> pMsg = m_sendList.Pop();            
        Talker::Send((char*)pMsg.get(), sizeof(Msg)+pMsg->size);
        pMsg->time = time(NULL);
        pMsg->rto = getRTO();               // 暂时定为2秒
        m_sendList.InsertByOrder(pMsg);                 
    }
}


四、重传情况下的发送和接收
这里为了降低错误的可能性,使用shared_ptr作为消息队列的元素。
可以看到发送时,将字符串打包到消息结构,放到消息队列。

void Hailer::Send(const char* buf, int len)
{
    Msg * pMsg = (Msg*)new char[sizeof(Msg)+len];
    memset(pMsg, 0, sizeof(Msg)+len);   
    pMsg->time = time(NULL);    
    pMsg->size = len;
    memmove(pMsg->data, buf, len);

    shared_ptr shared_pMsg(pMsg);
    m_sendList.Push_front(shared_pMsg); 
}


接收时,从消息队列取出消息结构。

int Hailer::Recv(char* buf, int len)
{
    shared_ptr pMsg = m_recvList.Pop();
    if (pMsg->size <len)
        len = pMsg->size;
    memmove(buf, pMsg->data, len);
    return len;
}

重传线程则在上面的定时队列的内容中可以看到。
至于接收线程,则可以像下面:

void Hailer::thread_recv()
{
    while (true)
    {       
        char buf[1024] = { 0 };
        int size = Talker::Recv(buf, sizeof(buf)); 
        if (size == -10054)
        {
            printf("%s\n", "peer is not online.");
            continue;
        }
        else if (size <= 0)
        {
            continue;
        }

        Msg* pMsg = (Msg*)new char[size];
        memmove(pMsg, buf, size);

        shared_ptr shared_pMsg(pMsg);      
        if (shared_pMsg->time + shared_pMsg->rto // 若已超时,则忽略 
        {
            continue;
        }
        else if (shared_pMsg->ACK)
        {
            //printf("收到回包,发送成功\n");
            m_sendList.Erase(shared_pMsg->seq);         
        }
        else
        {   
            m_recvList.Push_back(shared_pMsg);

            // 发送ACK 
            shared_ptr ackMsg(new Msg);
            ackMsg->seq = shared_pMsg->seq;
            ackMsg->time = shared_pMsg->time;
            ackMsg->rto = shared_pMsg->rto;
            ackMsg->size = 0;
            ackMsg->ACK = true;
            Talker::Send((const char*)(ackMsg.get()), sizeof(Msg));
        }
    }
}


五、说明
这篇估计会看得有点不通畅。这大概是因为在写完第一段后,它在草稿箱躺了蛮久。不过,其实开头的那幅图大概也够表达意思了。
而对于我来说:
1)好消息是,0.3的代码是可以跑的,也确实搞定了之前偶然出现的丢包情况;
2)坏消息是,我关闭线程的方式并不优雅,只是粗暴的关闭进程。这会产生不少负面效果,比如资源的和平释放等问题。这个问题就留待0.4版考虑吧。


推荐阅读
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • 006_Redis的List数据类型
    1.List类型是一个链表结构的集合,主要功能有push,pop,获取元素等。List类型是一个双端链表的结构,我们可以通过相关操作进行集合的头部或者尾部添加删除元素,List的设 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了使用哈夫曼树实现文件压缩和解压的方法。首先对数据结构课程设计中的代码进行了分析,包括使用时间调用、常量定义和统计文件中各个字符时相关的结构体。然后讨论了哈夫曼树的实现原理和算法。最后介绍了文件压缩和解压的具体步骤,包括字符统计、构建哈夫曼树、生成编码表、编码和解码过程。通过实例演示了文件压缩和解压的效果。本文的内容对于理解哈夫曼树的实现原理和应用具有一定的参考价值。 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • Java编程实现邻接矩阵表示稠密图的方法及实现类介绍
    本文介绍了Java编程如何实现邻接矩阵表示稠密图的方法,通过一个名为AMWGraph.java的类来构造邻接矩阵表示的图,并提供了插入结点、插入边、获取邻接结点等功能。通过使用二维数组来表示结点之间的关系,并通过元素的值来表示权值的大小,实现了稠密图的表示和操作。对于对稠密图的表示和操作感兴趣的读者可以参考本文。 ... [详细]
  • 本文介绍了在go语言中利用(*interface{})(nil)传递参数类型的原理及应用。通过分析Martini框架中的injector类型的声明,解释了values映射表的作用以及parent Injector的含义。同时,讨论了该技术在实际开发中的应用场景。 ... [详细]
author-avatar
xao
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有