IOCP中已完成的I/O信息将注册到完成端口对象(Completion Port,简称CP对象)。但这个过程并非简单的注册,需要如下两个工作:
1 创建完成端口对象
2 建立完成端口对象和套接字之间的联系
此时的套接字必须被赋予重叠属性。(异步I/O)
前三个参数是无意义的,只有最后一个参数有意义:表示分配给CP对象的处理I/O的线程数,如果为0,表示分配CPU核数个线程数。
成功返回CP对象句柄,失败返回NULL
参数:套接字句柄、CP对象句柄、传递已完成I/O相关信息、CP对象非null时自动忽略此参数
参数:CP对象句柄、传输数据大小变量地址、CreateIoCompletoinPort()第三个参数、WSASend/Recv()的倒数第二个参数、超时信息。
通过GetQueuedCompletionStatus()函数的第三个参数得到的是以连接套接字和CP对象为目的而调用的CreateIoCompletoinPort()函数的第三个参数值
通过GetQueuedCompletionStatus()函数的第四个参数得到的是调用WSASend/Recv()函数时传入的WSAOVERLAPPED结构体变量地址值
**并发模型的优点:**一个线程等待一个客户请求,并创建一个新的线程来处理请求。当新线程正在处理客户请求时,原来的线程会进入下一次循环并等待另一个客户请求。处理客户请求的线程完成处理过程后,会终止。
**并发模型的问题:**等待请求的线程只有很少的工作需要做,大多数事件它处于睡眠状态,当客户请求到来时,能对客户的请求进行快捷的处理。每个客户请求都有自己的线程,服务器具备非常好的伸缩性,能发挥多处理器的优势。CPU硬件升级时服务器性能会相应的提高。
线程过多时,所有的线程都处于可运行状态,内核在各个线程之间进行切换花费的时间太多。为了解决这个问题,IOCP的做法是并发运行的线程的数量必须有一个上限。
可运行线程的数量是多少合适?
答案是处理器核数个线程是最理想的。
需要为每个客户的请求创建一个新的线程。线程创建销毁开销过大。
IOCP使用线程池解决这个问题。
IOCP模型的内部结构:
保存与该CP端口关联的设备,此处为套接字
先入先出,当设备的一个异步I/O请求完成时,系统检查是否与一个CP端口关联,如果设备与CP端口关联,系统将该项已完成的I/O请求追加到CP端口的I/O完成队列的末尾。
先进后出
已释放/忙碌线程列表:
已暂停线程列表:
被释放的线程调用任何函数使得该线程切换到了等待状态,CP端口会检测到这一情况,并将该线程标识符从已释放线程队列移除,添加到已暂停线程列表中。
等待线程队列为什么要先进后出?
如果I/O请求完成的足够慢,使得一个线程能把它们全部处理完,那么系统会不断唤醒同一个线程,使其他线程继续睡眠。通过使用后入先出的算法,系统可以将那些未被调度的线程的内存资源换出到磁盘,并将它们从处理器的高速缓存中清除。
线程个数?
当创建CP端口时,需要指定运行多少个线程并发。通常这个值设为主机CPU核的个数。
线程池中应该有多少线程?
经验法则是:CPU核的个数乘以2
如果CPU有两个核,线程池应该有4个线程。则,如果有4个I/O请求已完成,有4个线程在等待,CP端口只会唤醒两个线程,而让其他两个线程继续睡眠。
当每个线程处理完一个已完成的I/O项时,会再次进入等待线程队列,系统发现I/O完成队列还有其他的项,于是会唤醒同一个线程对剩余的项进行处理。
为什么要让更多的线程在线程池中等待呢?
当CP端口唤醒一个线程时,会将该线程标识放入已释放线程列表中。如果一个已释放线程调用的任何函数将该线程切换到了等待状态,那么CP端口会检测到这一情况,回将该线程的标识从已释放线程列表移除,并添加到已暂停线程列表中。
CP端口的目标是将尽可能多的线程保持在已释放线程列表中(但必须小于指定的并发线程数),如果一个线程进入等待状态,则已释放线程列表会减少,CP端口将释放另一个正在等待的线程。
如果暂停的线程被唤醒,那么它会离开已暂停线程列表,重新进入已释放线程列表。此时已释放线程列表会在短时间内大于最大允许的并发线程数。一旦线程进入下一次循环,并进入等待线程列表,可运行线程的数量就会迅速下降。
以上解释了为什么线程池中的线程数应该大于在完成端口中设置的并发线程数量。
IOCP模型回声服务器:
#include "stdafx.h"
#include
#include
#include
#include
#include
#include
#pragma comment(lib,"pthreadVC2.lib")
using namespace::std;
#define BUF_SIZE 102
#define READ 3
#define WRITE 5
typedef struct
{
SOCKET hClntSock;
SOCKADDR_IN clntAdr;
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA; // 套接字信息
typedef struct
{
OVERLAPPED overlapped; //重叠对象
WSABUF wsaBuf;
char buffer[BUF_SIZE];
int rwMode; //读写标志
}PER_IO_DATA, *LPPER_IO_DATA; //IO信息
void* EchoThreadMain(void*); //线程函数
void ErrorHanding(char * message);
void main()
{
WSADATA wsaData;
HANDLE hComPort;
SYSTEM_INFO sysInfo;
LPPER_IO_DATA ioInfo;
LPPER_HANDLE_DATA handleInfo;// 套接字信息
SOCKET hServSock;
SOCKADDR_IN servAdr;
DWORD recvBytes;
int i;
DWORD flags = 0;
vector pthread_t_vec;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHanding("socket start error!");
//创建CP对象
hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//最大线程数
//创建线程,传入CP对象
GetSystemInfo(&sysInfo);
for (auto i = 0; i 2
; ++i)客户端代码不变:
#include "stdafx.h"
#include
#include
#include
using namespace::std;
#define BUF_SIZE 1024
void ErrorHanding(char * message);
void main()
{
//加载套接字库
WSADATA wsaData;//保存套接字信息
SOCKET hSocket;//套接字句柄
char message[BUF_SIZE];
int strLen;
int readLen;
SOCKADDR_IN servAdr;//保存服务器的地址信息
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)//加载库并获取库信息填至wsaData
ErrorHanding("socket start error!");
//创建套接字
hSocket = socket(PF_INET, SOCK_STREAM, 0);//流式套接字,IPV4:TCP
if (hSocket == INVALID_SOCKET)
ErrorHanding("socket create error!");
//设置地址端口
memset(&servAdr, 0, sizeof(servAdr));
servAdr.sin_family = AF_INET;//地址族
servAdr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");//本地回路IP地址 字节顺序转换
servAdr.sin_port = htons(6000);//与服务器端口一致 字节顺序转换
//向服务器发送连接请求
if (connect(hSocket, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
ErrorHanding("socket connect error!");
else
puts("connected...........");
while (1)
{
fputs("Input message(Q to quit):", stdout);
fgets(message, BUF_SIZE, stdin);
if (!strcmp(message, "q\n") || !strcmp(message, "Q\n"))
break;
strLen = strlen(message);
send(hSocket, message, strlen(message), 0);
readLen= 0;
//考虑到TCP传输特性,重复调用recv函数
while (readLen {
readLen += recv(hSocket, message, BUF_SIZE - 1, 0);
}
message[strLen] = 0;
printf("Message from server: %s ", message);
}
closesocket(hSocket);
WSACleanup();
return;
}
void ErrorHanding(char * message)
{
cout < exit(1);
}