Nginx---父子进程通信

一. 参考《Nginx核心讲解》后加上参考源码,小结下Nginx中父子进程、子进程间如何通信。

实现原理网上都可以查出来,主要是通过socketpair()函数实现的,下面捋一下内部流程:
1. 话说要从ngx_start_worker_processes函数讲起,由于代码不多,贴出来:

static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
    ngx_int_t      i;
    ngx_channel_t  ch;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

    ch.command = NGX_CMD_OPEN_CHANNEL;
	/*n是由配置文件中获取,子进程个数*/
    for (i = 0; i < n; i++) {
		/*子进程创建函数,下面具体讲*/
        ngx_spawn_process(cycle, ngx_worker_process_cycle,
                          (void *) (intptr_t) i, "worker process", type);

        ch.pid = ngx_processes[ngx_process_slot].pid;
        ch.slot = ngx_process_slot;
        ch.fd = ngx_processes[ngx_process_slot].channel[0];
		
		/*父进程中执行该函数,主要像前面各个子进程的channel[0]发送消息*/
        ngx_pass_open_channel(cycle, &ch);
    }
}

ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
    char *name, ngx_int_t respawn)
{
	//...
	//...
	/*创建一对套接字讲用于父子进程间通信*/
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
	{
		ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
					  "socketpair() failed while spawning \"%s\"", name);
		return NGX_INVALID_PID;
	}
	
	//...
	/*各种套接字属性设置*/
	ngx_channel = ngx_processes[s].channel[1];//ngx_channel干啥用的?留个悬念
	//...
	
	/*重点来了,创建子进程*/
	pid = fork();
    switch (pid) {
    case -1:
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "fork() failed while spawning \"%s\"", name);
        ngx_close_channel(ngx_processes[s].channel, cycle->log);
        return NGX_INVALID_PID;

    case 0:
        ngx_pid = ngx_getpid();
		/*调用ngx_worker_process_cycle函数,子进程死循环处理流程,下面再讲*/
        proc(cycle, data);
        break;

    default:
        break;
    }

	//...
	/*对子进程的相关信息进行保存*/
	ngx_processes[s].pid
	//...
	/*数组下标加1*/
	if (s == ngx_last_process) {
        ngx_last_process++;
    }
    return pid;
}

ngx_spawn_process()函数是在一个for循环中调用,假如有4个子进程,也就是说会进行四次
socketpair()的创建执行,每一次的创建,ngx_processes[s].channel都会有两个fd生成,
假设进行第一次循环时,ngx_processes[0].channel里面已经有ngx_processes[0].channel[0]和
ngx_processes[0].channel[1],继续往下执行,执行子进程创建执行proc(cycle, data);时,调用
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
	//...
	/*子进程初始化相关工作*/
	ngx_worker_process_init(cycle, worker);
	//...
	for( ;; )
	{
		/*子进程死循环,虽然里面内容很重要,但与本节无关*/
	}
}
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
	//...
	//...
	/*for循环,进行了下异常处理*/
    for (n = 0; n < ngx_last_process; n++) {
	}
	
	/*在子进程中关闭掉channel[0],只需要用channel[1]*/
	if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
	ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
				  "close() channel failed");
	}
	/*这个比较重要,但还没理解透*/
    if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
                              ngx_channel_handler)
        == NGX_ERROR)
    {
        /* fatal */
        exit(2);
    }
}
ngx_int_t
ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event,
    ngx_event_handler_pt handler)
{
	//...
	//...
    ev = (event == NGX_READ_EVENT) ? rev : wev;
	/*将ngx_channel_handler函数挂在ev->handler上*/
    ev->handler = handler;

	/*如果是epoll,则执行ngx_epoll_add_connection函数,在里面可以看到
	    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
        return NGX_ERROR;
    }
	将描述符第二个参数fd,而fd就是ngx_channel加入到了事件中
	ngx_channel在哪里?
	ngx_channel来自于ngx_spawn_process()中
	ngx_channel = ngx_processes[s].channel[1];
*/
    if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
        if (ngx_add_conn(c) == NGX_ERROR) {
            ngx_free_connection(c);
            return NGX_ERROR;
        }

    } else {
		/*添加事件,一旦有可读事件到来时,执行ev->handler*/
        if (ngx_add_event(ev, event, 0) == NGX_ERROR) {
            ngx_free_connection(c);
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}

/*当有可读事件来的时候,触发该函数*/
static void
ngx_channel_handler(ngx_event_t *ev)
{
	//...
	ngx_connection_t  *c;
	c = ev->data; //下面的c->fd来自哪里?
	/*ev->data,*/
	//...
	for ( ;; ) {
		
		/*里面就是调用recvmsg(s, &msg, 0);读取消息*/
        n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);
		switch()
		{
			//...
			case NGX_CMD_OPEN_CHANNEL:
			//...
			/*保存各个槽位的子进程信息*/
			//...
		}
	}
	//...
}
从上面可以小结一下,感觉一下,子进程拥有套接字ngx_processes[s].channel[1],并加入
了可读事件中,一直等待着读,即等待着调用recvmsg(),那么由谁来sendmsg呢?通过哪个
套接字呢?
继续:
视线回到ngx_spawn_process()函数中,该函数带领我们一步步走进子进程的处理过程,回到
该函数调用的地方即函数ngx_start_worker_processes()中,它下面接着执行ch.pid、ch.slot
、ch.fd的赋值,用处在下面:接着调用
/*注意调用该函数是在父进程中执行*/
static void
ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch)
{
	/*for循环,细细体味下,假如多个子进程时,通过该循环向不同的channel[0]发送msg*/
    for (i = 0; i < ngx_last_process; i++) {
	/*一些异常处理*/
	//...
	/*该函数实际上就是调用sendmsg(s, &msg, 0);进行消息的发送*/
	/*注意参数:第一个参数ngx_processes[i].channel[0]就是要发送的fd,其实也在ch里面包含着*/
	ngx_write_channel(ngx_processes[i].channel[0],
					  ch, sizeof(ngx_channel_t), cycle->log);
	
	}
}
综上: 创建一个子进程时,父进程就会向各个channel[0]中sendmsg,子进程从channel[1]recvmsg。

相关文章
相关标签/搜索