1.epoll 反应堆模型:

==实际上就是使用epoll的ET模式 + 非阻塞、轮询 + void *ptr(设置回调函数)。==

接下来我们回顾一下之前用epoll实现的并发服务器

先使用 socket()、bind()、listen()创建套接字并初始化 –> epoll_create() 创建监听红黑树 –> 返回 epfd –> epoll_ctl() 向树上添加1个监听fd –> while(1)循环–> epoll_wait() 监听 –> 对应监听fd有事件产生 –> 返回监听满足数组。 –> 判断返回数组元素 –> 若lfd满足 –> 则调用Accept() –>若 cfd 满足 –>则调用 read() —> 小写转大写 –> write回去。

接下来再看看用epoll反应堆实现的并发服务器工作流程,实际上不同的是,在监听cfd过程中,不仅仅要监听 cfd 的读事件、还要监听cfd的写事件,并且分别对它们读写事件设置各自的回调函数。当各自对应的事件满足时,直接调用满足该事件的回调函数。

先使用socket()、bind()、listen()创建套接字并初始化 –> epoll_create() 创建监听红黑树 –> 返回 epfd –> epoll_ctl() 向树上添加一个监听fd –> while(1)循环 –> epoll_wait() 监听 –> 对应监听fd有事件产生 –> 返回监听满足数组。 –> 判断返回数组元素 –>若 lfd满足 –> Accept() –>若 cfd 满足 –> read() —> 小写转大写 –> cfd从监听红黑树上摘下 –> 设置EPOLLOUT监听写事件,设置回调函数 –> 调用epoll_ctl() 设置EPOLL_CTL_ADD 重新放到红黑上监听写事件 –> 等待 epoll_wait 返回 –> 说明 cfd 可写 –> write回去–> cfd从监听红黑树上摘下 –>设置 EPOLLIN监听读事件,设置回调函数 –> epoll_ctl()设置EPOLL_CTL_ADD 重新放到红黑上监听读事件 –> epoll_wait() 监听

反应堆的理解:加入IO转接之后,有了事件,server才去处理,这里反应堆也是这样,由于网络环境复杂,服务器处理数据之后,可能并不能直接写回去,比如遇到网络繁忙或者对方缓冲区已经满了这种情况,就不能直接写回给客户端。反应堆就是在处理数据之后,监听写事件,能写回给客户端了,才去做写回操作。写回之后,再改为监听读事件。如此循环。

==大家可以结合这个视频p284集进行理解==

点我查看

2.epoll反应堆源代码实现

接下来直接看到代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Sun 23 Oct 2022 05:36:01 PM CST
************************************************************************/
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>

#define MAX_EVENTS 1024 //监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080

void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);

/* 描述就绪文件描述符相关信息 */

struct myevent_s {
int fd; //要监听的文件描述符
int events; //对应的监听事件
void *arg; //泛型参数
void (*call_back)(int fd, int events, void *arg); //回调函数
int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听)
char buf[BUFLEN];
int len;
long last_active; //记录每次加入红黑树 g_efd 的时间值
};

int g_efd; //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组. +1-->listen fd


/*将结构体 myevent_s 成员变量 初始化*/

void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
memset(ev->buf, 0, sizeof(ev->buf));
ev->len = 0;
ev->last_active = time(NULL); //调用eventset函数的时间

return;
}

/* 向 epoll监听的红黑树 添加一个 文件描述符 */

//eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
void eventadd(int efd, int events, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT

if (ev->status == 0) { //已经在红黑树 g_efd 里
op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1
ev->status = 1;
}

if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
else
printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);

return ;
}

/* 从epoll 监听的 红黑树中删除一个 文件描述符*/

void eventdel(int efd, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};

if (ev->status != 1) //不在红黑树上
return ;

//epv.data.ptr = ev;
epv.data.ptr = NULL;
ev->status = 0; //修改状态
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除

return ;
}

/* 当有文件描述符就绪, epoll返回, 调用该函数 与客户端建立链接 */

void acceptconn(int lfd, int events, void *arg)
{
struct sockaddr_in cin;
socklen_t len = sizeof(cin);
int cfd, i;

if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {
if (errno != EAGAIN && errno != EINTR) {
/* 暂时不做出错处理 */
}
printf("%s: accept, %s\n", __func__, strerror(errno));
return ;
}

do {
for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素
if (g_events[i].status == 0) //类似于select中找值为-1的元素
break; //跳出 for

if (i == MAX_EVENTS) {
printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
break; //跳出do while(0) 不执行后续代码
}

int flag = 0;
if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为非阻塞
printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));
break;
}

/* 给cfd设置一个 myevent_s 结构体, 回调函数 设置为 recvdata */
eventset(&g_events[i], cfd, recvdata, &g_events[i]);
eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件

} while(0);

printf("new connect [%s:%d][time:%ld], pos[%d]\n",
inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
return ;
}

void recvdata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s *)arg;
int len;

len = recv(fd, ev->buf, sizeof(ev->buf), 0); //读文件描述符, 数据存入myevent_s成员buf中

eventdel(g_efd, ev); //将该节点从红黑树上摘除

if (len > 0) {

ev->len = len;
ev->buf[len] = '\0'; //手动添加字符串结束标记
printf("C[%d]:%s\n", fd, ev->buf);

eventset(ev, fd, senddata, ev); //设置该 fd 对应的回调函数为 senddata
eventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件

} else if (len == 0) {
close(ev->fd);
/* ev-g_events 地址相减得到偏移元素位置 */
printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);
} else {
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}

return;
}

void senddata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s *)arg;
int len;

len = send(fd, ev->buf, ev->len, 0); //直接将数据 回写给客户端。未作处理

eventdel(g_efd, ev); //从红黑树g_efd中移除

if (len > 0) {

printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
eventset(ev, fd, recvdata, ev); //将该fd的 回调函数改为 recvdata
eventadd(g_efd, EPOLLIN, ev); //从新添加到红黑树上, 设为监听读事件

} else {
close(ev->fd); //关闭链接
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}

return ;
}

/*创建 socket, 初始化lfd */

void initlistensocket(int efd, short port)
{
struct sockaddr_in sin;

int lfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞

memset(&sin, 0, sizeof(sin)); //bzero(&sin, sizeof(sin))
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);

bind(lfd, (struct sockaddr *)&sin, sizeof(sin));

listen(lfd, 20);

/* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */
eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);

/* void eventadd(int efd, int events, struct myevent_s *ev) */
eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);

return ;
}

int main(int argc, char *argv[])
{
unsigned short port = SERV_PORT;

if (argc == 2)
port = atoi(argv[1]); //使用用户指定端口.如未指定,用默认端口

g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树,返回给全局 g_efd
if (g_efd <= 0)
printf("create efd in %s err %s\n", __func__, strerror(errno));

initlistensocket(g_efd, port); //初始化监听socket

struct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组
printf("server running:port[%d]\n", port);

int checkpos = 0, i;
while (1) {
/* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */

long now = time(NULL); //当前时间
for (i = 0; i < 100; i++, checkpos++) { //一次循环检测100个。 使用checkpos控制检测对象
if (checkpos == MAX_EVENTS)
checkpos = 0;
if (g_events[checkpos].status != 1) //不在红黑树 g_efd 上
continue;

long duration = now - g_events[checkpos].last_active; //客户端不活跃的世间

if (duration >= 60) {
close(g_events[checkpos].fd); //关闭与该客户端链接
printf("[fd=%d] timeout\n", g_events[checkpos].fd);
eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树 g_efd移除
}
}

/*监听红黑树g_efd, 将满足的事件的文件描述符加至events数组中, 1秒没有事件满足, 返回 0*/
int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
if (nfd < 0) {
printf("epoll_wait error, exit\n");
break;
}

for (i = 0; i < nfd; i++) {
/*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/
struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;

if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //读就绪事件
ev->call_back(ev->fd, events[i].events, ev->arg);
//lfd EPOLLIN
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //写就绪事件
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}

/* 退出前释放所有资源 */
return 0;
}