基于UDP的P2P聊天工具 0.3——消息队列和重传
简介:
1)这是一个Windows的P2P聊天工具;
2)相比0.2,它多了定时重传的机制;
3)对局域网来说有些鸡肋,就当是为跨局域网做准备吧;
相关内容:
1)多线程环境下的队列(简);
2)定时消息队列;
3)重传情况下的发送和接收;
一、示意图和说明
这个Hailer是对0.2版中的Talker的继承和发展。从示意图,我们可以看到它大体包括发送和接收这两个功能。这与Talker是一致的。区别在于Hailer的Send/Recv方法不再与socket直接关联,而是与各自的消息队列进行交互。当主程序调用Hailer::Send,数据被打包进一个消息结构,加入消息队列,然后重传线程负责从消息队列中取出消息,调用Talker::Send发送给对端。另一方面,Hailer::Recv的处理过程大体类似。
二、多线程环境下的队列
我将STL的容器做一个封装。不过,尽管我叫它队列,但因为我需要“按序插入”的功能,所以实际上使用的是list。比如说,根据超时时间,将消息结构插入消息队列的情况。
这里考虑两种边界情况:
1)元素数量太大,占用内存超出可用空间上限;
2)队列元素数量为空。
针对第一种情况,我们用“假定法”进行处理:假定可用空间无上限。这样一来,确实可以省一点麻烦~。这么处理,除了可以省事之外,从需求角度来说,正常人打字聊天不会快到来不及处理,而且UDP协议也会将来不及处理的东西丢掉。
针对第二种情况,采用生产者-消费者模式。对于消费者,当队列为空时,陷入等待;对于生产者,在给空队列添加元素时,唤醒等待中的消费者,可以像这样:
void MsgList::Pop()
{
unique_lock lck(m_mutex);
m_cv.wait(lck, [this](){return !m_list.empty(); });
m_list.pop_front();
return pHead;
}
void MsgList::Push_back(T pMsg)
{
m_mutex.lock()
m_list.push_back(pMsg)
if (m_list.size() == 1)
{
m_cv.notify_one()
}
m_mutex.unlock()
}
从这两段代码可以看出,这里的做法就是直接给整个list使用互斥锁,然后对于list为空的情况,使用条件锁进行处理。
三、定时消息队列
关于自定义应用层消息结构,我在0.2版的附加内容里面写了点。
稍稍跑一下题。正如当时在0.2版末尾提的:照理应该再考虑字节序方面的问题。这是因为不同的操作系统,可能会有不同的字节序模式,大端或者小端的。网络传输一般都考虑用大端模式。不过因为这还只是试做版,所以暂时不考虑这个。
判断大小端的方法,可以像这样:
int x = 1;
char* py = (char*)&x;
printf("%d\n", (int)*py);
现在回到正题。我的消息结构像下面这样:
#pragma pack(push,1)
struct Msg
{
unsigned int seq;
unsigned int time;
unsigned int rto;
unsigned int size;
bool ACK;
char data[0];
};
#pragma pack(pop)
大体的用法,大体跟0.2版的附加内容说的差不多。
关于定时队列,其实我考虑过两个处理方法。1)用定时器列表,给每个消息结构整一个定时器。定时器的一种实现方法是,开一个线程,然后睡一定时间。2)用一个循环运行的线程,每隔一段时间检查一下队列中最有可能超时的消息。
我用了第二种,因为这样更简洁。不过需要强调的是,我这里对重传的定时精度并没有需求。具体代码可以像这样:
while (true)
{
this_thread::sleep_for(chrono::milliseconds(200));
while (m_sendList.CheckTime(time(NULL)))
{
shared_ptr<Msg> pMsg = m_sendList.Pop();
Talker::Send((char*)pMsg.get(), sizeof(Msg)+pMsg->size);
pMsg->time = time(NULL);
pMsg->rto = getRTO();
m_sendList.InsertByOrder(pMsg);
}
}
四、重传情况下的发送和接收
这里为了降低错误的可能性,使用shared_ptr作为消息队列的元素。
可以看到发送时,将字符串打包到消息结构,放到消息队列。
void Hailer::Send(const char* buf, int len)
{
Msg * pMsg = (Msg*)new char[sizeof(Msg)+len];
memset(pMsg, 0, sizeof(Msg)+len);
pMsg->time = time(NULL);
pMsg->size = len;
memmove(pMsg->data, buf, len);
shared_ptr shared_pMsg(pMsg);
m_sendList.Push_front(shared_pMsg);
}
接收时,从消息队列取出消息结构。
int Hailer::Recv(char* buf, int len)
{
shared_ptr pMsg = m_recvList.Pop();
if (pMsg->size <len)
len = pMsg->size;
memmove(buf, pMsg->data, len);
return len;
}
重传线程则在上面的定时队列的内容中可以看到。
至于接收线程,则可以像下面:
void Hailer::thread_recv()
{
while (true)
{
char buf[1024] = { 0 };
int size = Talker::Recv(buf, sizeof(buf));
if (size == -10054)
{
printf("%s\n", "peer is not online.");
continue;
}
else if (size <= 0)
{
continue;
}
Msg* pMsg = (Msg*)new char[size];
memmove(pMsg, buf, size);
shared_ptr shared_pMsg(pMsg);
if (shared_pMsg->time + shared_pMsg->rto
五、说明
这篇估计会看得有点不通畅。这大概是因为在写完第一段后,它在草稿箱躺了蛮久。不过,其实开头的那幅图大概也够表达意思了。
而对于我来说:
1)好消息是,0.3的代码是可以跑的,也确实搞定了之前偶然出现的丢包情况;
2)坏消息是,我关闭线程的方式并不优雅,只是粗暴的关闭进程。这会产生不少负面效果,比如资源的和平释放等问题。这个问题就留待0.4版考虑吧。