Table of Contents
2 整体框架
fpm的代码入口在 sapi/fpm/fpm/fpm_main.c
int main(int argc, char * argv[])
{
sapi_startup(&cgi_sapi_module);
//获取参数,例如是否指定了php.ini之类的....
while ((c = php_getopt(argc, argv, OPTIONS, &php_optarg, &php_optind, 0, 2)) != -1) {
}
if (cgi_sapi_module.startup(&cgi_sapi_module) == FAILURE) {
return FPM_EXIT_SOFTWARE;
}
//初始化fpm
if (0 > fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix,
fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon)) {
//...............
}
//fpm主进程向它的父进程发消息
if (fpm_globals.send_config_pipe[1]) {
int writeval = 1;
zlog(ZLOG_DEBUG, "Sending \"1\" (OK) to parent via fd=%d", fpm_globals.send_config_pipe[1]);
write(fpm_globals.send_config_pipe[1], &writeval, sizeof(writeval));
close(fpm_globals.send_config_pipe[1]);
}
//fpm主进程将在fpm_run中死循环接受子进程的信号,然后处理
fcgi_fd = fpm_run(&max_requests);
//下面的是子进程的处理逻辑
zend_first_try {
while (fcgi_accept_request(&request) >= 0) { //子进程accept client的请求,加锁竞争fd
php_execute_script(&file_handle TSRMLS_CC); //由zend引擎来执行php代码
fpm_log_write(NULL TSRMLS_CC);//打印fpm access log
php_request_shutdown((void * ) 0);//做了十几项收尾工作
//如果一个子进程达到运行的最大次数,则需要退出,由主进程重新fork进程接受请求
if (max_requests && (requests == max_requests)) {
break;
}
}
fcgi_shutdown();
} zend_catch {
exit_status = FPM_EXIT_SOFTWARE;
} zend_end_try();
return exit_status;
}
3 fpm主进程
fpm的主进程调用fpm_init进行初始化,然后在fpm_run里fork子进程后,不断 监听信号事件产生的socket事件 以及 执行定时任务
3.1 主进程初始化
int fpm_init(int argc, char** argv, char* config, char* prefix, char* pid, int test_conf, int run_as_root, int force_daemon)
{
fpm_globals.argc = argc;
fpm_globals.argv = argv;
if (config && * config) {
fpm_globals.config = strdup(config);
}
fpm_globals.prefix = prefix;
fpm_globals.pid = pid;
fpm_globals.run_as_root = run_as_root;
if (0 > fpm_php_init_main() ||
0 > fpm_stdio_init_main() ||
0 > fpm_conf_init_main(test_conf, force_daemon) ||
0 > fpm_unix_init_main() ||
0 > fpm_scoreboard_init_main() ||
0 > fpm_pctl_init_main() ||
0 > fpm_env_init_main() ||
0 > fpm_signals_init_main() ||
0 > fpm_children_init_main() ||
0 > fpm_sockets_init_main() ||
0 > fpm_worker_pool_init_main() ||
0 > fpm_event_init_main()) {
if (fpm_globals.test_successful) {
exit(FPM_EXIT_OK);
} else {
zlog(ZLOG_ERROR, "FPM initialization failed");
return -1;
}
}
if (0 > fpm_conf_write_pid()) {
zlog(ZLOG_ERROR, "FPM initialization failed");
return -1;
}
fpm_stdio_init_final();
zlog(ZLOG_NOTICE, "fpm is running, pid %d", (int) fpm_globals.parent_pid);
return 0;
}
主要有以下几个工作
- fpm_php_init_main : 主要是 注册了一个回调函数 ,用于清理工作
- fpm_stdio_init_main : 主要是 标准输入输出重定向到空设备 ,为 fpm的主进程成为守护进程做准备
- fpm_conf_init_main : 主要是加载fpm的配置文件, 解析文件,检查配置的合法性, event模型也是在这里面初始化的
- fpm_unix_init_main : 主要是让fpm的主进程成为守护进程
- fpm_scoreboard_init_main : 主要是为每个pool里的每个子进程建立状态榜,使用共享内存方式进行状态的获取
- fpm_pctl_init_main : 要是保存启动的参数,用于后续的进程控制,比如reload的时候需要这些参数
- fpm_env_init_main : 要是通过调整argv,来达到 修改fpm主进程的名字
- fpm_signals_init_main : 设置fpm 主进程信号相关的回调函数
- fpm_children_init_main : 主要是为了实现紧急重启功能而申请一块内存,记录子进程非正常(SIGSEGV or SIGBUS)退出的时间队列
- fpm_sockets_init_main : 为每个pool 建立监听的socket , 复用之前fpm主进程的fd ,这种情况发生在例如 reload 的情况下,即 SIGUSR2 信号
- fpm_worker_pool_init_main : 设置了一个回调函数,释放例如在fpm_scoreboard_init_main函数里申请的内存
- fpm_event_init_main : fpm主进程event事件监控初始化
- fpm_conf_write_pid : 写fpm主进程的pid
其中最重要的就是 fpm_signals_init_main , 把信号事件转换为socket事件来通知fpm主进程
3.2 创建子进程
fpm的主进程将变量每个pool,fork出需要的子进程数量 fork出来的子进程将返回main函数,加锁accept client的请求 而主进程将在fpm_event_loop里环处理由信号产生的socket事件以及定时任务
int fpm_run(int* max_requests)
{
struct fpm_worker_pool_s* wp;
// create initial children in all pools
for (wp = fpm_worker_all_pools; wp; wp = wp->next) {
int is_parent;
is_parent = fpm_children_create_initial(wp);//fork子进程
if (!is_parent) {
goto run_child;
}
// handle error
if (is_parent == 2) {
fpm_pctl(FPM_PCTL_STATE_TERMINATING, FPM_PCTL_ACTION_SET);
fpm_event_loop(1);
}
}
// run event loop forever
fpm_event_loop(0);//fpm主进程将在这个函数里循环处理由信号产生的socket事件以及定时任务
run_child: // only workers reach this point
fpm_cleanups_run(FPM_CLEANUP_CHILD);
* max_requests = fpm_globals.max_requests;
return fpm_globals.listening_socket;//子进程返回main函数,加锁accept client的请求
}
3.3 主进程主逻辑
循环监控以下三件事情
- 由信号触发的socket事件,信号事件来自外部的信号或者子进程的信号
- 监控慢请求以及子进程执行超时
- 当fpm的pm为PM_STYLE_DYNAMIC,是否需要kill掉不需要的子进程
void fpm_event_loop(int err)
{
static struct fpm_event_s signal_fd_event;
if (fpm_globals.parent_pid != getpid()) {
return;
}
//设置socket事件的回调函数,fpm_signals_get_fd() 就是 sp[0]
//socketpair 创建出sp[0] 以及 sp[1]这对socket
//当主进程收到信号事件后,在回调函数sig_handler里会向sp[1]里写一个标记
//从sp[0]里读出这个标记,由回调函数fpm_got_signal进行处理
//这样就把信号事件转换为socket事件了
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
fpm_event_add(&signal_fd_event, 0);
if (fpm_globals.heartbeat > 0) {
fpm_pctl_heartbeat(NULL, 0, NULL);//设置定时器对slow log以及子进程执行超时的监控
}
if (!err) {
fpm_pctl_perform_idle_server_maintenance_heartbeat(NULL, 0, NULL);//当fpm的pm为PM_STYLE_DYNAMIC,设置定时器监控是否需要kill掉不需要的子进程
zlog(ZLOG_DEBUG, "%zu bytes have been reserved in SHM", fpm_shm_get_size_allocated());
zlog(ZLOG_NOTICE, "ready to handle connections");
#ifdef HAVE_SYSTEMD
fpm_systemd_heartbeat(NULL, 0, NULL);
#endif
}
while (1) {
struct fpm_event_queue_s * q, * q2;
struct timeval ms;
struct timeval tmp;
struct timeval now;
unsigned long int timeout;
int ret;
if (fpm_globals.parent_pid != getpid()) {
return;
}
fpm_clock_get(&now);
timerclear(&ms);
//遍历fpm_event_queue_timer 这个定时器列表,找到最近的超时时间
// search in the timeout queue for the next timer to trigger
q = fpm_event_queue_timer;
while (q) {
if (!timerisset(&ms)) {
ms = q->ev->timeout;
} else {
if (timercmp(&q->ev->timeout, &ms, <)) {
ms = q->ev->timeout;
}
}
q = q->next;
}
//如果没有找到,或者最近的超时时间已经过了,则设置为1000ms
// 1s timeout if none has been set
if (!timerisset(&ms) || timercmp(&ms, &now, <) || timercmp(&ms, &now, ==)) {
timeout = 1000;
} else {
timersub(&ms, &now, &tmp);
timeout = (tmp.tv_sec * 1000) + (tmp.tv_usec / 1000) + 1;
}
//在wait里,对处理由信号触发的socket事件,主要是调用 fpm_got_signal 这个回调函数
ret = module->wait(fpm_event_queue_fd, timeout);
// is a child, nothing to do here
if (ret == -2) {
return;
}
if (ret > 0) {
zlog(ZLOG_DEBUG, "event module triggered %d events", ret);
}
//遍历定时器链表,寻找需要处理的事件
// trigger timers
q = fpm_event_queue_timer;
while (q) {
fpm_clock_get(&now);
if (q->ev) {
if (timercmp(&now, &q->ev->timeout, >) || timercmp(&now, &q->ev->timeout, ==)) {
fpm_event_fire(q->ev);//处理设置好的回调函数,比如 fpm_pctl_check_request_timeout
if (fpm_globals.parent_pid != getpid()) {
return;
}
if (q->ev->flags & FPM_EV_PERSIST) {
fpm_event_set_timeout(q->ev, now);//重置下次的超时时间
} else { // delete the event,非永久性的定时器,则删除
q2 = q;
if (q->prev) {
q->prev->next = q->next;
}
if (q->next) {
q->next->prev = q->prev;
}
if (q == fpm_event_queue_timer) {
fpm_event_queue_timer = q->next;
if (fpm_event_queue_timer) {
fpm_event_queue_timer->prev = NULL;
}
}
q = q->next;
free(q2);
continue;
}
}
}
q = q->next;
}
}
}
4 子进程
4.1 子进程的创建
在fpm_run里会调用fpm_children_create_initial进行fork子进程,接下来就看看子进程的创建过程
int fpm_children_create_initial(struct fpm_worker_pool_s * wp)
{
if (wp->config->pm == PM_STYLE_ONDEMAND) {
//.............
return 1;
}
return fpm_children_make(wp, 0 , 0, 1);
}
int fpm_children_make(struct fpm_worker_pool_s* wp, int in_event_loop, int nb_to_spawn, int is_debug)
{
pid_t pid;
struct fpm_child_s* child;
int max;
static int warned = 0;
if (wp->config->pm == PM_STYLE_DYNAMIC) {
if (!in_event_loop) { // starting
max = wp->config->pm_start_servers;
} else {
max = wp->running_children + nb_to_spawn;
}
} else if (wp->config->pm == PM_STYLE_ONDEMAND) {
if (!in_event_loop) { // starting
max = 0; // do not create any child at startup
} else {
max = wp->running_children + nb_to_spawn;
}
} else { // PM_STYLE_STATIC
max = wp->config->pm_max_children;
}
// fork children while:
// - fpm_pctl_can_spawn_children : FPM is running in a NORMAL state (aka not restart, stop or reload)
// - wp->running_children < max : there is less than the max process for the current pool
// - (fpm_global_config.process_max < 1 || fpm_globals.running_children < fpm_global_config.process_max):
// if fpm_global_config.process_max is set, FPM has not fork this number of processes (globaly)
while (fpm_pctl_can_spawn_children() && wp->running_children < max &&
(fpm_global_config.process_max < 1 || fpm_globals.running_children < fpm_global_config.process_max)) {
warned = 0;
child = fpm_resources_prepare(wp);
if (!child) {
return 2;
}
pid = fork();
switch (pid) {
case 0 :
fpm_child_resources_use(child);
fpm_globals.is_child = 1;
fpm_child_init(wp);//做一些初始化的工作
return 0;
case -1 :
zlog(ZLOG_SYSERROR, "fork() failed");
fpm_resources_discard(child);
return 2;
default :
child->pid = pid;
fpm_clock_get(&child->started);
fpm_parent_resources_use(child);
zlog(is_debug ? ZLOG_DEBUG : ZLOG_NOTICE, "[pool %s] child %d started", wp->config->name, (int) pid);
}
}
//这条warning日志打印的有问题,在pm为static同时process.max等于pm_max_children的时候是不应该打印的,可以参考另外一篇文章useless-warning-log-of-fpm
if (!warned && fpm_global_config.process_max > 0 && fpm_globals.running_children >= fpm_global_config.process_max) {
warned = 1;
zlog(ZLOG_WARNING, "The maximum number of processes has been reached. Please review your configuration and consider raising 'process.max'");
}
return 1;
}
4.2 子进程的初始化
static void fpm_child_init(struct fpm_worker_pool_s * wp)
{
fpm_globals.max_requests = wp->config->pm_max_requests;
if (0 > fpm_stdio_init_child(wp) ||
0 > fpm_log_init_child(wp) ||
0 > fpm_status_init_child(wp) ||
0 > fpm_unix_init_child(wp) ||
0 > fpm_signals_init_child() ||
0 > fpm_env_init_child(wp) ||
0 > fpm_php_init_child(wp)) {
zlog(ZLOG_ERROR, "[pool %s] child failed to initialize", wp->config->name);
exit(FPM_EXIT_SOFTWARE);
}
}
4.3 子进程主逻辑
在fpm_run里fork出的子进程,回到main主函数后,就开始accept client的请求进行处理
int main(int argc, char * argv[])
{
//.............
//fork子进程,对子进程进行初始化
fcgi_fd = fpm_run(&max_requests);
//下面的是子进程的处理逻辑
zend_first_try {
while (fcgi_accept_request(&request) >= 0) { //子进程accept client的请求,加锁竞争fd
php_execute_script(&file_handle TSRMLS_CC); //由zend引擎来执行php代码
fpm_log_write(NULL TSRMLS_CC);//打印fpm access log
php_request_shutdown((void * ) 0);//做了十几项收尾工作
//如果一个子进程达到运行的最大次数,则需要退出,由主进程重新fork进程接受请求
if (max_requests && (requests == max_requests)) {
break;
}
}
fcgi_shutdown();
} zend_catch {
exit_status = FPM_EXIT_SOFTWARE;
} zend_end_try();
return exit_status;
}
int fcgi_accept_request(fcgi_request * req)
{
int listen_socket = req->listen_socket;
sa_t sa;
socklen_t len = sizeof(sa);
fpm_request_accepting();
//加锁 accept client的请求
FCGI_LOCK(req->listen_socket);
req->fd = accept(listen_socket, (struct sockaddr * )&sa, &len);
FCGI_UNLOCK(req->listen_socket);
fcgi_is_allowed();//是否允许该client访问
//监听socket是否可读
fds.fd = req->fd;
fds.events = POLLIN;
fds.revents = 0;
do {
errno = 0;
ret = poll(&fds, 1, 5000);
} while (ret < 0 && errno == EINTR);
fcgi_read_request(req);//按fastcgi协议来解析请求
}