​ 多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,==不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件==。通过这种方式可以同时监测多个文件描述符并且这个过程是阻塞的,一旦检测到有文件描述符就绪( 可以读数据或者可以写数据)程序的阻塞就会被解除,之后就可以基于这些(一个或多个)就绪的文件描述符进行通信了。通过这种方式在单线程 / 进程的场景下也可以在服务器端实现并发。常见的 IO 多路转接方式有:selectpollepoll

​ 与多进程和多线程技术相比,I/O 多路复用技术的最大优势是系统开销小,系统不必创建进程 / 线程,也不必维护这些进程 / 线程,从而大大减小了系统的开销。

1.select

  1. select能监听的文件描述符个数受限于FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数
  2. 解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力

1.1 函数原型

1
2
3
4
5
6
7
8
#include <sys/select.h>
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval * timeout);

函数参数:

  • nfds: 监听的所有文件描述符中,最大文件描述符+1
  • readfds: 读 文件描述符监听集合,传入传出参数
  • writefds: 写 文件描述符监听集合,传入传出参数
  • exceptfds: 异常 文件描述符监听集合,传入传出参数
  • timeout: 定时阻塞监控时间,3种情况
    • NULL,永远等下去,阻塞监听
    • 设置timeval,等待固定时间
    • 设置timeval里时间均为0,检查描述字后立即返回,轮询

函数返回值:

  • 大于 0:成功,返回集合中已就绪的文件描述符的总个数
  • 等于 - 1:函数调用失败
  • 等于 0:超时,没有检测到就绪的文件描述符

另外初始化 fd_set 类型的参数还需要使用相关的一些列操作函数,具体如下:

1
2
3
4
5
6
7
8
// 将文件描述符fd从set集合中删除 == 将fd对应的标志位设置为0        
void FD_CLR(int fd, fd_set *set);
// 判断文件描述符fd是否在set集合中 == 读一下fd对应的标志位到底是0还是1
int FD_ISSET(int fd, fd_set *set);
// 将文件描述符fd添加到set集合中 == 将fd对应的标志位设置为1
void FD_SET(int fd, fd_set *set);
// 将set集合中, 所有文件文件描述符对应的标志位设置为0, 集合中没有添加任何文件描述符
void FD_ZERO(fd_set *set);

演示:

1
2
3
4
5
6
7
8
9
10
void FD_ZERO(fd_set *set);	--- 清空一个文件描述符集合。
fd_set rset;
FD_ZERO(&rset);
void FD_SET(int fd, fd_set *set); --- 将待监听的文件描述符,添加到监听集合中
FD_SET(3, &rset); FD_SET(5, &rset); FD_SET(6, &rset);
void FD_CLR(int fd, fd_set *set); --- 将一个文件描述符从监听集合中移除。
FD_CLR(4, &rset);
int FD_ISSET(int fd, fd_set *set); --- 判断一个文件描述符是否在监听集合中。
返回值: 在:1;不在:0
FD_ISSET(4, &rset);

select优缺点

  • 缺点:
    • 监听上限受文件描述符限制。 最大 1024.
    • 检测满足条件的fd, 自己添加业务逻辑提高小。 提高了编码难度。
  • 优点: 跨平台。win、linux、macOS、Unix、类Unix、mips

1.2 select实现多路IO转接设计思路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int maxfd = 0
lfd = socket() ; 创建套接字
maxfd = lfd;
bind(); 绑定地址结构
listen(); 设置监听上限
fd_set rset, allset; 创建r监听集合
FD_ZERO(&allset); 将r监听集合清空
FD_SET(lfd, &allset); 将 lfd 添加至读集合中。
while1) {
rset = allset; 保存监听集合
ret = select(lfd+1, &rset, NULLNULLNULL); 监听文件描述符集合对应事件。
if(ret > 0) { 有监听的描述符满足对应事件
if (FD_ISSET(lfd, &rset)) { // 1 在。 0不在。
cfd = accept(); 建立连接,返回用于通信的文件描述符
maxfd = cfd;
FD_SET(cfd, &allset); 添加到监听通信描述符集合中。
}
for (i = lfd+1; i <= 最大文件描述符; i++){
FD_ISSET(i, &rset) 有read、write事件
read()
小 -- 大
write();
}
}
}

1.3 server

Socket、Bind、Listen等函数,我们都对其进行了出错处理封装,点我查看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Thu 06 Oct 2022 10:12:32 PM CST
************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ctype.h>
#include "wrap.h"

#define SERV_PORT 9527

int main(int argc, char *argv[])
{
int listenfd, connfd;
char buf[BUFSIZ];

struct sockaddr_in clie_addr, serv_addr;
socklen_t clie_addr_len;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);
Bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
Listen(listenfd, 128);

fd_set rset, allset; //定义读集合,备份集合
int ret, maxfd = 0, n, i, j;
maxfd = listenfd; //最大文件描述符

FD_ZERO(&allset); //清空监听集合
FD_SET(listenfd, &allset); //将待监听fd添加到监听集合中

while (1)
{
rset = allset; //备份
ret = select(maxfd+1, &rset, NULL, NULL, NULL); //使用select监听
if (ret < 0)
{
perr_exit("select error");
}
if (FD_ISSET(listenfd, &rset)) //listenfd满足监听的读事件
{
clie_addr_len = sizeof(clie_addr);
connfd = Accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len); //建立连接 -----不会阻塞
FD_SET(connfd, &allset); //将新产生的fd,添加到监听集合中,监听数据读事件

if(maxfd < connfd) //修改maxfd
{
maxfd = connfd;
}

if(ret == 1) //说明select只返回一个,并且是listenfd,后续执行无需执行
{
continue;
}
}
for(i = listenfd + 1; i <= maxfd; i++){ //处理满足读事件的fd
if(FD_ISSET(i, &allset)) //找到满足读事件的那个fd
{
n = Read(i, buf, sizeof(buf));
if(n == 0) //检测到客户端已经关闭连接
{
Close(i);
FD_CLR(i, &allset); //将关闭的fd,移除出监听集合
}else if(n == -1){
perr_exit("read error");
}

for(j =0; j< n; j++)
{
buf[j] = toupper(buf[j]);
}
write(i, buf, n);
write(STDOUT_FILENO, buf, n);

}
}
}
Close(listenfd);
return 0;
}

1.4 client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/*************************************************************************
# > File Name:client.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 25 Sep 2022 06:40:40 PM CST
************************************************************************/

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"
#define MAXLINE 80
#define SERV_PORT 9527
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "10.0.12.16", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(sockfd, buf, strlen(buf));
n = Read(sockfd, buf, MAXLINE);
if (n == 0)
printf("the other side has been closed.\n");
else
Write(STDOUT_FILENO, buf, n);
}
Close(sockfd);
return 0;
}

编译运行,结果如下:

01.png

02.png

1.5 代码优化

03.png

如果最大fd是1023,每次确定有事件发生的fd时,就要扫描3-1023的所有文件描述符,这看起来很蠢。于是定义一个数组,把要监听的文件描述符存下来,每次扫描这个数组就行了。看起来科学得多,这样就不需要每次扫描一大堆无关文件描述符了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*************************************************************************
# > File Name:server_improve.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Thu 13 Oct 2022 09:20:47 PM CST
************************************************************************/

/* server_improve.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <ctype.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 6666

int main(int argc, char *argv[])
{
int i, maxi, maxfd, listenfd, connfd, sockfd;
int nready, client[FD_SETSIZE]; /* FD_SETSIZE 默认为 1024 */
ssize_t n;
fd_set rset, allset;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN]; /* #define INET_ADDRSTRLEN 16 */
socklen_t cliaddr_len;
struct sockaddr_in cliaddr, servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

Listen(listenfd, 20); /* 默认最大128 */

maxfd = listenfd; /* 初始化 */
maxi = -1; /* client[]的下标 */

for (i = 0; i < FD_SETSIZE; i++)
client[i] = -1; /* 用-1初始化client[] */

FD_ZERO(&allset);
FD_SET(listenfd, &allset); /* 构造select监控文件描述符集 */

for ( ; ; ) {
rset = allset; /* 每次循环时都从新设置select监控信号集 */
nready = select(maxfd+1, &rset, NULL, NULL, NULL);

if (nready < 0)
perr_exit("select error");
if (FD_ISSET(listenfd, &rset)) { /* new client connection */
cliaddr_len = sizeof(cliaddr);
connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
for (i = 0; i < FD_SETSIZE; i++) {
if (client[i] < 0) {
client[i] = connfd; /* 保存accept返回的文件描述符到client[]里 */
break;
}
}
/* 达到select能监控的文件个数上限 1024 */
if (i == FD_SETSIZE) {
fputs("too many clients\n", stderr);
exit(1);
}

FD_SET(connfd, &allset); /* 添加一个新的文件描述符到监控信号集里 */
if (connfd > maxfd)
maxfd = connfd; /* select第一个参数需要 */
if (i > maxi)
maxi = i; /* 更新client[]最大下标值 */

if (--nready == 0)
continue; /* 如果没有更多的就绪文件描述符继续回到上面select阻塞监听,
负责处理未处理完的就绪文件描述符 */
}
for (i = 0; i <= maxi; i++) { /* 检测哪个clients 有数据就绪 */
if ( (sockfd = client[i]) < 0)
continue;
if (FD_ISSET(sockfd, &rset)) {
if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
Close(sockfd); /* 当client关闭链接时,服务器端也关闭对应链接 */
FD_CLR(sockfd, &allset); /* 解除select监控此文件描述符 */
client[i] = -1;
} else {
int j;
for (j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Write(sockfd, buf, n);
}
if (--nready == 0)
break;
}
}
}
close(listenfd);
return 0;
}

2.poll

poll是对select的改进,但是它是个半成品,相对select提升不大。最终版本是epoll,所以poll了解一下就完事儿,重点掌握epoll。

2.1 函数原型

1
2
3
4
5
6
7
8
9
10
#include <poll.h>
// 每个委托poll检测的fd都对应这样一个结构体
struct pollfd {
int fd; /* 委托内核检测的文件描述符 */
short events; /* 委托内核检测文件描述符的什么事件 */
short revents; /* 文件描述符实际发生的事件 -> 传出 */
};

struct pollfd myfd[100];
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

函数参数:

  • fds: 这是一个 struct pollfd 类型的数组,里边存储了待检测的文件描述符的信息,这个数组中有三个成员:

    • fd:委托内核检测的文件描述符
    • events:委托内核检测的 fd 事件(输入、输出、错误),每一个事件有多个取值
      • 取值:POLLINPOLLOUTPOLLERR
    • revents:这是一个传出参数,数据由内核写入,存储内核检测之后的结果
      • 可能的值:POLLINPOLLOUTPOLLERR
  • nfds: 监听数组的,实际有效监听个数。

  • timeout:

    • 大于0: 阻塞指定的毫秒(ms)数之后,解除阻塞
    • -1: 一直阻塞,直到检测的集合中有就绪的文件描述符(有事件产生)解除阻塞
    • 0:不阻塞,不管检测集合中有没有已就绪的文件描述符,函数马上返回
  • 返回值:返回满足对应监听事件的文件描述符 总个数。

    • 失败: 返回 - 1
    • 成功:返回一个大于 0 的整数,表示检测的集合中已就绪的文件描述符的总个数

poll优缺点

  • 优点:

  • 自带数组结构。 可以将监听事件集合和返回事件集合分离。

  • 拓展监听上限。 超出1024限制。

  • 缺点:

  • 不能跨平台。 仅限Linux

  • 无法直接定位满足监听事件的文件描述符, 编码难度较大。

04.png

2.2 server

Socket、Bind、Listen等函数,我们都对其进行了出错处理封装,点我查看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Thu 20 Oct 2022 07:22:11 PM CST
************************************************************************/

/* server.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <errno.h>
#include <ctype.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 9527
#define OPEN_MAX 1024

int main(int argc, char *argv[])
{
int i, j, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
char buf[MAXLINE], str[INET_ADDRSTRLEN];
socklen_t clilen;
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

Listen(listenfd, 20);

client[0].fd = listenfd;
client[0].events = POLLRDNORM; /* listenfd监听普通读事件 */

for (i = 1; i < OPEN_MAX; i++)
client[i].fd = -1; /* 用-1初始化client[]里剩下元素 */
maxi = 0; /* client[]数组有效元素中最大元素下标 */

for ( ; ; ) {
nready = poll(client, maxi+1, -1); /* 阻塞 */
if (client[0].revents & POLLRDNORM) { /* 有客户端链接请求 */
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
for (i = 1; i < OPEN_MAX; i++) {
if (client[i].fd < 0) {
client[i].fd = connfd; /* 找到client[]中空闲的位置,存放accept返回的connfd */
break;
}
}

if (i == OPEN_MAX)
perr_exit("too many clients");

client[i].events = POLLRDNORM; /* 设置刚刚返回的connfd,监控读事件 */
if (i > maxi)
maxi = i; /* 更新client[]中最大元素下标 */
if (--nready <= 0)
continue; /* 没有更多就绪事件时,继续回到poll阻塞 */
}
for (i = 1; i <= maxi; i++) { /* 检测client[] */
if ((sockfd = client[i].fd) < 0)
continue;
if (client[i].revents & (POLLRDNORM | POLLERR)) {
if ((n = Read(sockfd, buf, MAXLINE)) < 0) {
if (errno == ECONNRESET) { /* 当收到 RST标志时 */
/* connection reset by client */
printf("client[%d] aborted connection\n", i);
Close(sockfd);
client[i].fd = -1;
} else {
perr_exit("read error");
}
} else if (n == 0) {
/* connection closed by client */
printf("client[%d] closed connection\n", i);
Close(sockfd);
client[i].fd = -1;
} else {
for (j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Writen(sockfd, buf, n);
}
if (--nready <= 0)
break; /* no more readable descriptors */
}
}
}
return 0;
}

2.3 client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/*************************************************************************
# > File Name:client.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 25 Sep 2022 06:40:40 PM CST
************************************************************************/

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"
#define MAXLINE 80
#define SERV_PORT 9527
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "10.0.12.16", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(sockfd, buf, strlen(buf));
n = Read(sockfd, buf, MAXLINE);
if (n == 0)
printf("the other side has been closed.\n");
else
Write(STDOUT_FILENO, buf, n);
}
Close(sockfd);
return 0;
}

编译运行,结果如下:

05.png

06.png

3.epoll

​ epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

​ 目前epell是linux大规模并发网络程序中的热门首选模型。

​ epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

可以使用cat命令查看一个进程可以打开的socket描述符上限。

1
cat /proc/sys/fs/file-max

如有需要,可以通过修改配置文件的方式修改该上限值。

1
2
3
4
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。
* soft nofile 65536
* hard nofile 100000

07.png

3.1 操作函数

在 epoll 中一共提供是三个 API 函数,分别处理不同的操作,函数原型如下:

1
2
3
4
5
6
7
#include <sys/epoll.h>
// 创建epoll实例,通过一棵红黑树管理待检测集合
int epoll_create(int size);
// 管理红黑树上的文件描述符(添加、修改、删除)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 检测epoll树中是否有就绪的文件描述符
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

3.1.1 epoll_create()

epoll_create() 函数的作用是创建一个红黑树模型的实例,用于管理待检测的文件描述符的集合。

1
2
#include <sys/epoll.h>
int epoll_create(int size) size:监听数目
  • 函数参数size:用来告诉内核监听的文件描述符的个数,跟内存大小有关。

  • 函数返回值:

    • 失败:返回 - 1
    • 成功:返回一个有效的文件描述符,通过这个文件描述符就可以访问创建的 epoll 实例了

3.1.2 epoll_ctl()

epoll_ctl() 函数的作用是管理红黑树实例上的节点,可以进行添加、删除、修改操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <sys/epoll.h>
// 联合体, 多个变量共用同一块内存
typedef union epoll_data {
void *ptr;
int fd; // 通常情况下使用这个成员, 和epoll_ctl的第三个参数相同即可
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

函数参数:

  • epfdepoll_create () 函数的返回值,通过这个参数找到 epoll 实例
  • op:这是一个枚举值,控制通过该函数执行什么操作
    • EPOLL_CTL_ADD:往 epoll 模型中添加新的节点
    • EPOLL_CTL_MOD:修改 epoll 模型中已经存在的节点
    • EPOLL_CTL_DEL:删除 epoll 模型中的指定的节点
  • fd:文件描述符,即要添加 / 修改 / 删除的文件描述符
  • event:epoll 事件,用来修饰第三个参数对应的文件描述符的,指定检测这个文件描述符的什么事件
  • events:委托 epoll 检测的事件
    • EPOLLIN:读事件,接收数据,检测读缓冲区,如果有数据该文件描述符就绪
    • EPOLLOUT:写事件,发送数据,检测写缓冲区,如果可写该文件描述符就绪
    • EPOLLERR:异常事件
  • data:用户数据变量,这是一个联合体类型,通常情况下使用里边的 fd 成员,用于存储待检测的文件描述符的值,在调用 epoll_wait() 函数的时候这个值会被传出。

函数返回值:

  • 失败:返回 - 1
  • 成功:返回 0

3.1.3 epoll_wait()

epoll_wait() 函数的作用是检测创建的epoll实例中有没有就绪的文件描述符。

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

函数参数:

  • epfdepoll_create () 函数的返回值,通过这个参数找到 epoll 实例

  • events: 传出参数,这是一个结构体数组的地址,里边存储了已就绪的文件描述符的信息

  • maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,

  • timeout: 是超时时间

    • -1:函数一直阻塞,直到 epoll 实例中有已就绪的文件描述符之后才解除阻塞
    • 0: 函数不阻塞,不管 epoll 实例中有没有就绪的文件描述符,函数被调用后都直接返回
    • >0:如果 epoll 实例中没有已就绪的文件描述符,函数阻塞对应的毫秒数再返回

函数返回值:

  • 成功:
    • 等于 0:函数是阻塞被强制解除了,没有检测到满足条件的文件描述符
    • 大于 0:检测到的已就绪的文件描述符的总个数
  • 失败:返回 - 1

3.2 epoll实现多路IO转接思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
lfd = socket();			监听连接事件lfd
bind();
listen();
int epfd = epoll_create(1024); epfd, 监听红黑树的树根。
struct epoll_event tep, ep[1024]; tep, 用来设置单个fd属性, ep 是 epoll_wait() 传出的满足监听事件的数组。
tep.events = EPOLLIN; 初始化 lfd的监听属性。
tep.data.fd = lfd
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &tep); 将 lfd 添加到监听红黑树上。
while (1) {
ret = epoll_wait(epfd, ep,1024-1); 实施监听
for (i = 0; i < ret; i++) {
if (ep[i].data.fd == lfd) { // lfd 满足读事件,有新的客户端发起连接请求
cfd = Accept();
tep.events = EPOLLIN; 初始化 cfd的监听属性。
tep.data.fd = cfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &tep);
} else { cfd 们 满足读事件, 有客户端写数据来。
n = read(ep[i].data.fd, buf, sizeof(buf));
if ( n == 0) {
close(ep[i].data.fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, ep[i].data.fd , NULL); // 将关闭的cfd,从监听树上摘下。
} else if (n > 0) {
小--大
write(ep[i].data.fd, buf, n);
}
}
}

3.3 server

Socket、Bind、Listen等函数,我们都对其进行了出错处理封装,点我查看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Thu 20 Oct 2022 05:28:24 PM CST
************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <ctype.h>
#include <unistd.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 9527
#define OPEN_MAX 1024

int main(int argc, char *argv[])
{
int i, j, maxi, listenfd, connfd, sockfd;
int nready, efd, res;
ssize_t n;
char buf[MAXLINE], str[INET_ADDRSTRLEN];
socklen_t clilen;
int client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;
struct epoll_event tep, ep[OPEN_MAX];

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));

Listen(listenfd, 20);

for (i = 0; i < OPEN_MAX; i++)
client[i] = -1;
maxi = -1;

efd = epoll_create(OPEN_MAX);
if (efd == -1)
perr_exit("epoll_create");

tep.events = EPOLLIN; tep.data.fd = listenfd;

res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep);
if (res == -1)
perr_exit("epoll_ctl");

while (1) {
nready = epoll_wait(efd, ep, OPEN_MAX, -1); /* 阻塞监听 */
if (nready == -1)
perr_exit("epoll_wait");

for (i = 0; i < nready; i++) {
if (!(ep[i].events & EPOLLIN))
continue;
if (ep[i].data.fd == listenfd) {
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
for (j = 0; j < OPEN_MAX; j++) {
if (client[j] < 0) {
client[j] = connfd; /* save descriptor */
break;
}
}

if (j == OPEN_MAX)
perr_exit("too many clients");
if (j > maxi)
maxi = j; /* max index in client[] array */

tep.events = EPOLLIN;
tep.data.fd = connfd;
res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
if (res == -1)
perr_exit("epoll_ctl");
} else {
sockfd = ep[i].data.fd;
n = Read(sockfd, buf, MAXLINE);
if (n == 0) {
for (j = 0; j <= maxi; j++) {
if (client[j] == sockfd) {
client[j] = -1;
break;
}
}
res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
if (res == -1)
perr_exit("epoll_ctl");

Close(sockfd);
printf("client[%d] closed connection\n", j);
} else {
for (j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Writen(sockfd, buf, n);
}
}
}
}
close(listenfd);
close(efd);
return 0;
}

3.4 client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/*************************************************************************
# > File Name:client.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 25 Sep 2022 06:40:40 PM CST
************************************************************************/

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "wrap.h"
#define MAXLINE 80
#define SERV_PORT 9527
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "10.0.12.16", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(sockfd, buf, strlen(buf));
n = Read(sockfd, buf, MAXLINE);
if (n == 0)
printf("the other side has been closed.\n");
else
Write(STDOUT_FILENO, buf, n);
}
Close(sockfd);
return 0;
}

编译运行,结果如下:

08.png

09.png