1.epoll的事件模型

EPOLL事件有两种模型:

Edge Triggered (ET) 边缘触发:==只有数据到来才触发,不管缓存区中是否还有数据==。ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

Level Triggered (LT) 水平触发:==只要缓存区有数据都会触发==。LT是缺省的工作方式,并且同时支持blockno-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

1.1 ET(边沿模式)的设置

边沿模式不是默认的 epoll 模式,需要额外进行设置。epoll 设置边沿模式是非常简单的,epoll 管理的红黑树示例中每个节点都是 struct epoll_event 类型,只需要将 EPOLLET 添加到结构体的 events 成员中即可:

1
2
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 设置边沿模式

1.2 基于管道epoll ET触发模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 23 Oct 2022 11:19:43 AM CST
************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>

#define MAXLINE 10

int main(int argc, char *argv[])
{
int efd, i;
int pfd[2];
pid_t pid;
char buf[MAXLINE], ch = 'a';

pipe(pfd);
pid = fork();
if (pid == 0) {
close(pfd[0]);
while (1) {
for (i = 0; i < MAXLINE/2; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

write(pfd[1], buf, sizeof(buf));
sleep(2);
}
close(pfd[1]);
} else if (pid > 0) {
struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
close(pfd[1]);

efd = epoll_create(10);
/* event.events = EPOLLIN; */
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
event.data.fd = pfd[0];
epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);

while (1) {
res = epoll_wait(efd, resevent, 10, -1);
printf("res %d\n", res);
if (resevent[0].data.fd == pfd[0]) {
len = read(pfd[0], buf, MAXLINE/2);
write(STDOUT_FILENO, buf, len);
}
}
close(pfd[0]);
close(efd);
} else {
perror("fork");
exit(-1);
}
return 0;
}

1.3 基于网络C/S模型的epoll ET触发模式

server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/*************************************************************************
# > File Name:epoll_et.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 23 Oct 2022 11:21:24 AM CST
************************************************************************/

/* server.c */
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(void)
{
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, efd;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

listen(listenfd, 20);

struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
efd = epoll_create(10);
// event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
event.events = EPOLLIN;
printf("Accepting connections ...\n");
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));

event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

while (1) {
res = epoll_wait(efd, resevent, 10, -1);
printf("res %d\n", res);
if (resevent[0].data.fd == connfd) {
len = read(connfd, buf, MAXLINE/2);
write(STDOUT_FILENO, buf, len);
}
}
return 0;
}

client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*************************************************************************
# > File Name:client.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 23 Oct 2022 11:22:14 AM CST
************************************************************************/

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "wrap.h"

#define MAXLINE 10
#define SERV_PORT 8080

int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, i;
char ch = 'a';

sockfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (1) {
for (i = 0; i < MAXLINE/2; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

write(sockfd, buf, sizeof(buf));
sleep(10);
}
Close(sockfd);
return 0;
}

server边沿触发,编译运行,如下所示

02.png

运行后,每过5秒钟服务器才输出一组字符,这是就是边沿触发的效果。


更改服务器为水平触发模式,运行程序,如下:

01.png

运行后,每5秒输出两组字符串,这是因为只写入了两组,这个模式的服务器,缓冲区有多少读多少。

1.4 基于网络C/S非阻塞模型的epoll ET触发模式

1.4.1 设置非阻塞

在使用epoll ET触发模式进行读事件的检测时,有新数据达到只会通知一次,那么必须要保证得到通知后将数据全部从读缓冲区中读出。那么,应该如何读这些数据呢?

我们可以循环读取数据,如下所示:

1
2
3
4
5
int len = 0;
while((len = recv(curfd, buf, sizeof(buf), 0)) > 0)
{
// 数据处理...
}

但这样做还有一个问题,因为套接字操作默认是阻塞的,当读缓冲区数据被读完之后,读操作就阻塞了也就是调用的 read()/recv() 函数被阻塞了,如果是单线程/进程程序的话,程序就不能往下执行了。

要解决阻塞问题,就需要将套接字默认的阻塞行为修改为非阻塞,需要使用fcntl()函数进行处理:

1
2
3
4
// 设置完成之后, 读写都变成了非阻塞模式
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);

通过上述分析就可以得出一个结论:epoll 在边沿模式下,必须要将套接字设置为非阻塞模式,但是,这样就会引发另外的一个 bug,在非阻塞模式下,循环地将读缓冲区数据读到本地内存中,当缓冲区数据被读完了,调用的 read()/recv() 函数还会继续从缓冲区中读数据,此时函数调用就失败了,返回 - 1,对应的全局变量 errno 值为 EAGAIN 或者 EWOULDBLOCK 如果打印错误信息会得到如下的信息:Resource temporarily unavailable

演示代码:

server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/*************************************************************************
# > File Name:epoll_et_npblock.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 23 Oct 2022 03:10:42 PM CST
************************************************************************/

/* server.c */
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(void)
{
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, efd, flag;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

listen(listenfd, 20);

struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
efd = epoll_create(10);
/* event.events = EPOLLIN; */
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */

printf("Accepting connections ...\n");
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));

flag = fcntl(connfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

while (1) {
printf("epoll_wait begin\n");
res = epoll_wait(efd, resevent, 10, -1);
printf("epoll_wait end res %d\n", res);

if (resevent[0].data.fd == connfd) {
while ((len = read(connfd, buf, MAXLINE/2)) > 0)
write(STDOUT_FILENO, buf, len);
}
}
return 0;
}

client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*************************************************************************
# > File Name:client_noblock.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 23 Oct 2022 03:13:29 PM CST
************************************************************************/

/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "wrap.h"

#define MAXLINE 10
#define SERV_PORT 8080

int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, i;
char ch = 'a';

sockfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (1) {
for (i = 0; i < MAXLINE/2; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

write(sockfd, buf, sizeof(buf));
sleep(10);
}
Close(sockfd);
return 0;
}

1.5 基于多线程的边沿非阻塞处理

直接上代码吧:

server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Tue 08 Nov 2022 02:07:34 PM CST
************************************************************************/

#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>

typedef struct socketinfo
{
int fd; //要操作的文件描述符
int epfd; //红黑树实例
} SocketInfo;

void *acceptConn(void *arg)
{
printf("acception tid: %ld\n", pthread_self());
SocketInfo *info = (SocketInfo *)arg;
// 建立新的连接
int cfd = accept(info->fd, NULL, NULL);
// 将文件描述符设置为非阻塞
// 得到文件描述符的属性
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
// 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
// 通信的文件描述符检测读缓冲区数据的时候设置为边沿模式
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 读缓冲区是否有数据
ev.data.fd = cfd;
int ret = epoll_ctl(info->epfd, EPOLL_CTL_ADD, cfd, &ev);
if (ret == -1)
{
perror("epoll_ctl-accept");
exit(0);
}
free(info);
return NULL;
}

void *communication(void *arg)
{
printf("acception tid: %ld\n", pthread_self());
SocketInfo *info = (SocketInfo *)arg;
int curfd = info->fd;
int epfd = info->epfd;
// 处理通信的文件描述符
// 接收数据
char buf[5];
char temp[1024];
memset(buf, 0, sizeof(buf));
bzero(temp, sizeof(temp));
// 循环读数据
while (1)
{
int len = recv(curfd, buf, sizeof(buf), 0);
if (len == 0)
{
// 非阻塞模式下和阻塞模式是一样的 => 判断对方是否断开连接
printf("客户端断开了连接...\n");
// 将这个文件描述符从epoll模型中删除
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
close(curfd);
break;
}
else if (len > 0)
{
// 通信
// 接收的数据打印到终端
for (int i = 0; i < len; i++)
{
buf[i] = toupper(buf[i]);
}
strncat(temp + strlen(temp), buf, len);
write(STDOUT_FILENO, buf, len);
// 发送数据
// send(curfd, buf, len, 0);
}
else
{
// len == -1
if (errno == EAGAIN)
{
printf("数据读完了...\n");
//发送数据
send(curfd, temp, strlen(temp) + 1, 0);
break;
}
else
{
perror("recv error");
break;
// exit(0); //不能exit因为会结束整个程序
}
}
}
free(info);
return NULL;
}

// server
int main(int argc, const char *argv[])
{
// 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if (lfd == -1)
{
perror("socket error");
exit(1);
}

// 绑定
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(9527);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP
// 127.0.0.1
// inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr);

// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 绑定端口
int ret = bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
if (ret == -1)
{
perror("bind error");
exit(1);
}

// 监听
ret = listen(lfd, 64);
if (ret == -1)
{
perror("listen error");
exit(1);
}
printf("已完成初始化\n");
// 现在只有监听的文件描述符
// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
// 创建一个epoll模型
int epfd = epoll_create(100);
if (epfd == -1)
{
perror("epoll_create");
exit(0);
}

// 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
struct epoll_event ev;
ev.events = EPOLLIN; // 检测lfd读读缓冲区是否有数据
ev.data.fd = lfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
if (ret == -1)
{
perror("epoll_ctl");
exit(0);
}

struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(struct epoll_event);
// 持续检测
while (1)
{
// 调用一次, 检测一次
int num = epoll_wait(epfd, evs, size, -1);
printf("==== num: %d\n", num);
pthread_t tid;
for (int i = 0; i < num; ++i)
{
// 取出当前的文件描述符
int curfd = evs[i].data.fd;
SocketInfo *info = (SocketInfo *)malloc(sizeof(SocketInfo));
info->fd = curfd;
info->epfd = epfd;
// 判断这个文件描述符是不是用于监听的
if (curfd == lfd)
{
pthread_create(&tid, NULL, acceptConn, (void *)info);
pthread_detach(tid);
}
else
{
pthread_create(&tid, NULL, communication, (void *)info);
pthread_detach(tid);
}

}

}
return 0;
}

client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*************************************************************************
# > File Name:client.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Tue 08 Nov 2022 03:10:51 PM CST
************************************************************************/

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MAXLINE 80
#define SERV_PORT 9527
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;

sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0)
{
perror("create failed");
exit(1);
}

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "124.221.165.184", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

int i = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
if (i < 0)
{
perror("connect failed");
exit(1);
}
int num = 0;
printf("服务器连接成功\n");
while (1)
{
sprintf(buf, "hello, world, %d\n...", num++);
printf("%s\n", buf);
write(sockfd, buf, strlen(buf) + 1);
recv(sockfd, buf, sizeof(buf), 0);
printf("recv msg:%s\n", buf);
usleep(10000);
}
recv(sockfd, buf, sizeof(buf), 0);
printf("recv msg:%s\n", buf);
printf("over-----------\n");
close(sockfd);
return 0;
}

编译运行,结果如下:

服务器端:

03.png

线程1:

04.png

线程2:

05.png