Table of Contents
2 涉及的源码
- sapi/fpm/fpm/fpm_events.c
- sapi/fpm/fpm/fpm_events.h
3 基本概念
我们先来看看.h文件
3.1 fpm_event_s
定义了一个event事件,例如 socket事件 或者 定时任务事件
struct fpm_event_s {
int fd; // not set with FPM_EV_TIMEOUT,当为定时器的时候,被设置为-1
struct timeval timeout; // next time to trigger,下次触发时间
struct timeval frequency; //定时器的周期
void (* callback)(struct fpm_event_s * , short, void * ); // event时间的回调函数
void * arg; // 回调函数的参数
int flags; // FPM_EV_READ(socket事件) or FPM_EV_PERSIST(定时器)
int index; // index of the fd in the ufds array
short which; // type of event,当flags拥有FPM_EV_READ标记的时候,取值为FPM_EV_READ,否则为FPM_EV_TIMEOUT,表示定时器
};
3.2 fpm_event_queue_s
用链表表示event事件集合,这里有两个事件集合,两者没有交集
- fpm_event_queue_fd : 表示socket事件的链表集合
- fpm_event_queue_timer : 表示定时器任务事件的链表集合
typedef struct fpm_event_queue_s {
struct fpm_event_queue_s * prev; //指向前一个节点
struct fpm_event_queue_s * next; //指向后一个节点
struct fpm_event_s * ev; //指向一个event事件
} fpm_event_queue;
3.3 fpm_event_module_s
主要是基于各种io复用模型封装了统一的接口,底层使用select or epoll等等模型,对上层是透明的 用户可以根据具体的环境选择合适的io复用模型,主要是给socket事件使用的
struct fpm_event_module_s {
const char * name;
int support_edge_trigger;
int (* init)(int max_fd);
int (* clean)(void);
int (* wait)(struct fpm_event_queue_s * queue, unsigned long int timeout);
int (* add)(struct fpm_event_s * ev);
int (* remove)(struct fpm_event_s * ev);
};
fpm提供了例如 devpoll epoll kqueue poll port select 这几种io复用的模型,代码在 sapi/fpm/fpm/events/ 下
3.4 io复用模型的选择
每个io复用的模型(例如以下的epoll)会提供基于 fpm_event_module_s 结构体的赋值
static struct fpm_event_module_s epoll_module = {
.name = "epoll",
.support_edge_trigger = 1,
.init = fpm_event_epoll_init,
.clean = fpm_event_epoll_clean,
.wait = fpm_event_epoll_wait,
.add = fpm_event_epoll_add,
.remove = fpm_event_epoll_remove,
};
struct fpm_event_module_s * fpm_event_epoll_module()
{
#if HAVE_EPOLL
return &epoll_module;
#else
return NULL;
#endif // HAVE_EPOLL
}
然后在 fpm_event_pre_init 函数里会根据 配置文件里的machanism参数 选择对应的模型
int fpm_event_pre_init(char * machanism)
{
//...........
module = fpm_event_epoll_module();
if (module) {
if (!machanism || strcasecmp(module->name, machanism) == 0) {
return 0;
}
}
//.............
}
fpm_event_pre_init 的调用是在读取配置文件的时候,调用的路径如下:
main() -> fpm_init() -> fpm_conf_init_main() -> fpm_conf_post_process() -> fpm_event_pre_init()
3.5 io复用模型的wait操作
正常io复用的wait操作会返回一系列的fd集合,然后根据这些fd集合进行处理
不过fpm把这两个操作都放在wait函数里了,这个写法感觉有点耦合了
以 epoll 的 wait 函数为例子
static int fpm_event_epoll_wait(struct fpm_event_queue_s * queue, unsigned long int timeout)
{
int ret, i;
// ensure we have a clean epoolfds before calling epoll_wait()
memset(epollfds, 0, sizeof(struct epoll_event) * nepollfds);
// wait for inconming event or timeout
ret = epoll_wait(epollfd, epollfds, nepollfds, timeout);
if (ret == -1) {
// trigger error unless signal interrupt
if (errno != EINTR) {
zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);
return -1;
}
}
// events have been triggered, let's fire them
for (i = 0; i < ret; i++) {
// do we have a valid ev ptr ?
if (!epollfds[i].data.ptr) {
continue;
}
// fire the event
//static int fpm_event_epoll_add(struct fpm_event_s * ev)
//{
// struct epoll_event e;
//
// e.events = EPOLLIN;
// e.data.fd = ev->fd;
// e.data.ptr = (void * )ev;
// //............................
//}
fpm_event_fire((struct fpm_event_s * )epollfds[i].data.ptr);//执行event的设置的回调函数
//void fpm_event_fire(struct fpm_event_s * ev)
//{
// (* ev->callback)( (struct fpm_event_s * ) ev, ev->which, ev->arg);
//}
}
return ret;
}
4 fpm_event相关操作
fpm event最主要的操作也就是fpm主进程一直在做的循环监听事件,在之前的php-fpm-framework里已经介绍过了
循环监控以下三件事情
- 由信号触发的socket事件,信号事件来自外部的信号或者子进程的信号
- 监控慢请求以及子进程执行超时
- 当fpm的pm为PM_STYLE_DYNAMIC,是否需要kill掉不需要的子进程
其中分成两个部分,第一个部分是设置event事件
void fpm_event_loop(int err)
{
static struct fpm_event_s signal_fd_event;
//设置socket事件的回调函数,fpm_signals_get_fd() 就是 sp[0]
//socketpair 创建出sp[0] 以及 sp[1]这对socket
//当主进程收到信号事件后,在回调函数sig_handler里会向sp[1]里写一个标记
//从sp[0]里读出这个标记,由回调函数fpm_got_signal进行处理
//这样就把信号事件转换为socket事件了
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);//fpm_got_signal是信号回调函数,在接下来信号那篇介绍
fpm_event_add(&signal_fd_event, 0);//在fpm_event_add里发现是FPM_EV_READ类型的话,加到fpm_event_queue_fd集合里
if (fpm_globals.heartbeat > 0) {
fpm_pctl_heartbeat(NULL, 0, NULL);//设置定时器对slow log以及子进程执行超时的监控
}
if (!err) {
fpm_pctl_perform_idle_server_maintenance_heartbeat(NULL, 0, NULL);//当fpm的pm为PM_STYLE_DYNAMIC,设置定时器监控是否需要kill掉不需要的子进程
#ifdef HAVE_SYSTEMD
fpm_systemd_heartbeat(NULL, 0, NULL);
#endif
}
//........................
}
第二部分是执行由信号触发的socket事件以及定时任务
void fpm_event_loop(int err)
{
//....................设置各种event事件
while (1) {
// search in the timeout queue for the next timer to trigger
//遍历fpm_event_queue_timer 这个定时器列表,找到最近的超时时间
//.................
//在wait里,对处理由信号触发的socket事件,主要是调用 fpm_got_signal 这个回调函数
ret = module->wait(fpm_event_queue_fd, timeout);
//..............
//遍历定时器链表,寻找需要处理的事件
q = fpm_event_queue_timer;
while (q) {
fpm_clock_get(&now);
if (q->ev) {
if (timercmp(&now, &q->ev->timeout, >) || timercmp(&now, &q->ev->timeout, ==)) {
fpm_event_fire(q->ev);//处理设置好的回调函数,比如 fpm_pctl_check_request_timeout
if (fpm_globals.parent_pid != getpid()) {
return;
}
if (q->ev->flags & FPM_EV_PERSIST) {
fpm_event_set_timeout(q->ev, now);//重置下次的超时时间
} else { // delete the event,非永久性的定时器,则删除
//....................
continue;
}
}
}
q = q->next;
}
}
}