单进程单线程使用select同时监听多个socket文件描述符
大致的处理过程就是将监听
和其他客户已连接socket
都放入到select的set
中,然后阻塞等待文件描述符就绪;
一旦返回,便返回就绪的个数,就绪后就要依次判断是否是监听文件描述符
就绪,是的话就从中取出新建立的socket,设置进set中以便监听,同时设置进本地保存的客户数组;
如果是连接的socket
,则依次遍历客户数组,找出在set中返回的那个,对其进行读写操作;
只有对当前所有的文件描述符都处理完了之后才会再一次调用select监听,而且是一个接一个的进行处理,处理的过程中当前就绪的文件描述符依次等待,新进的请求也要等待;如果是十分耗时的处
理,那么就要等待很久;
迭代服务器
没有并发控制,没有多路复用;来一个处理一个;最原始的做法
并发服务器-进程
for ( ; ; ) {
clilen = addrlen;
if ( (connfd = accept(listenfd, cliaddr, &clilen)) < 0) {
if (errno == EINTR)
continue; /* back to for() */
else
err_sys("accept error");
}
if ( (childpid = Fork()) == 0) { /* child process */
Close(listenfd); /* close listening socket */
web_child(connfd); /* process request */
exit(0);
}
Close(connfd); /* parent closes connected socket */
}
accept阻塞等待新连接,来一个fork一个进程去处理;
预先分配进程池,由子进程accept()阻塞获取新连接
预先分配进程池,有新的连接到时分配进程去处理,当进程池不够时新连接会被忽略,直到有一个子进程可用;(然而在内核中,这些新连接并没有被忽略,而是会在llisten已完成队列中等待被accept(),直到backlog数为止); 先看父进程
...
for (i = 0; i < nchildren; i++)//创建子进程,返回pid到数组中
pids[i] = child_make(i, listenfd, addrlen); /* parent returns */
Signal(SIGINT, sig_int); //设置信号
...
pid_t
child_make(int i, int listenfd, int addrlen)
{
pid_t pid;
void child_main(int, int, int);
if ( (pid = Fork()) > 0)
return(pid); /* parent */
child_main(i, listenfd, addrlen); /* never returns */
}
child_main(int i, int listenfd, int addrlen)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr;
cliaddr = Malloc(addrlen);
printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);//每个子进程都在等待可用的连接
//一旦返回便进行处理
web_child(connfd); /* process the request */
Close(connfd);
}
}
- 以上具体分配给哪个进程去处理,主要看哪个进程最先抢到accept()的返回;在有一个新连接到达时,所有进程都会被唤醒,称为“惊醒”,然后最先运行的进程获得,其他进程继续睡眠;
- 这个惊醒在进程很多的时候,会导致性能受损,有时会诱发严重的问题;
- 同时多个进程accept()一个监听套接字,在一些操作系统上可能诱发错误;
- 所以这里也可用使用select()来先监控lisnten是否就绪,然后再调用accept()来获取新连接,但是除了两次系统调用会使性能下降外,select()也会发送冲突,因为socket套接字中只保存一个进程ID来进行唤醒阻塞进程,所以在多进程同时监控同一个套接字的情况下,select()调用就必须唤醒所有进程,这又是额外的开销;
预先分配进程池,accept()上锁
解决上述存在的问题,可以在每个进程要accept()前上锁,这样就只有一个进程阻塞在accept()调用上,从而规避了“惊醒”和可能的错误;不过会比原来的更耗时;
预先分配进程池,传递描述符
父进程调用accept()来获取新连接,然后将描述符传递给空闲进程处理;这里父子进程是通过标准错误输出来进行描述符传递的;而且父进程需要使用select()来同时监控监听套接字和标准错误(子进程描述符)输出是否就绪;
for (i = 0; i < nchildren; i++) {//初始化子进程,设置子进程描述符进set中
child_make(i, listenfd, addrlen); /* parent returns */
FD_SET(cptr[i].child_pipefd, &masterset);
maxfd = max(maxfd, cptr[i].child_pipefd);
}
Signal(SIGINT, sig_int);
for ( ; ; ) {
rset = masterset;
if (navail <= 0)
FD_CLR(listenfd, &rset); /* turn off if no available children */
nsel = Select(maxfd + 1, &rset, NULL, NULL, NULL);//监控
/* 4check for new connections */
if (FD_ISSET(listenfd, &rset)) { //新链接
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
for (i = 0; i < nchildren; i++)//寻找空闲进程
if (cptr[i].child_status == 0)
break; /* available */
if (i == nchildren)
err_quit("no available children");
cptr[i].child_status = 1; /* mark child as busy */
cptr[i].child_count++;
navail--;
n = Write_fd(cptr[i].child_pipefd, "", 1, connfd);//传递描述符
Close(connfd);
if (--nsel == 0)
continue; /* all done with select() results */
}
/* 4find any newly-available children */
for (i = 0; i < nchildren; i++) {
if (FD_ISSET(cptr[i].child_pipefd, &rset)) {//如果收到子进程的通知,说明子进程处理完了
if ( (n = Read(cptr[i].child_pipefd, &rc, 1)) == 0)
err_quit("child %d terminated unexpectedly", i);
cptr[i].child_status = 0;
navail++;
if (--nsel == 0)
break; /* all done with select() results */
}
}
}
child_main(int i, int listenfd, int addrlen)
{
char c;
int connfd;
ssize_t n;
void web_child(int);
printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
if ( (n = Read_fd(STDERR_FILENO, &c, 1, &connfd)) == 0)//子进程等待文件描述符进来
err_quit("read_fd returned 0");
if (connfd < 0)
err_quit("no descriptor from read_fd");
web_child(connfd); /* process request */
Close(connfd);
Write(STDERR_FILENO, "", 1); /* tell parent we're ready again */
}
}
并发服务器-线程
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_create(&tid, NULL, &doit, (void *) connfd);
}
基本是来一个新连接,就创建一个新线程去处理;
doit(void *arg)
{
void web_child(int);
Pthread_detach(pthread_self());
web_child((int) arg);
Close((int) arg);
return(NULL);
}
不过每次都新建线程也是挺耗时的;
预先分配线程池,每个线程各自accept()
预先创建好多个线程,然后每个线程自己accpet()来获取新连接并处理,由于在线程中,所以对 accept()上锁会比在进程方便很多;然后让线程去争夺锁,进而等待新链接到来;
for (i = 0; i < nthreads; i++)
thread_make(i); /* only main thread returns */
Signal(SIGINT, sig_int);
for ( ; ; )
pause(); /* everything done by threads */
for ( ; ; ) {
clilen = addrlen;
Pthread_mutex_lock(&mlock);
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_mutex_unlock(&mlock);
tptr[(int) arg].thread_count++;
web_child(connfd); /* process request */
Close(connfd);
}
这是所有服务器设计范式中最快的!
预先分配线程池,主线程accept()
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_mutex_lock(&clifd_mutex);
clifd[iput] = connfd;
if (++iput == MAXNCLI)
iput = 0;
if (iput == iget)
err_quit("iput = iget = %d", iput);
Pthread_cond_signal(&clifd_cond);
Pthread_mutex_unlock(&clifd_mutex);
直接看代码,这里主线程获取新连接,然后放入到clifd数组中,通过iput和iget两个指针指向来代表当前可用连接位置;然后使用互斥锁+条件变量来唤醒线程;让被唤醒的线程再从clifd数组中 clifd[iget]获取新链接进行处理;
for ( ; ; ) {
Pthread_mutex_lock(&clifd_mutex);
while (iget == iput) //若相等,则表示没有新连接
Pthread_cond_wait(&clifd_cond, &clifd_mutex);
connfd = clifd[iget]; /* connected socket to service */
if (++iget == MAXNCLI)
iget = 0;
Pthread_mutex_unlock(&clifd_mutex);
tptr[(int) arg].thread_count++;
web_child(connfd); /* process request */
Close(connfd);
}
该模式比上一个慢,除了都是用到的互斥锁外,还使用了条件变量;
总结: 1、使用池可以显著提高速度;不过真正应用场景需要动态的监控池的数量,过多过少都要进行处理 2、让所有进程或线程自行监控accept()要比主进程监控并传递描述符要简单快速; 3、由于select()存在冲突问题,所以通常调用accept()就足够了;所以一般在允许多线程或者多进程的情况下,新连接的到来就直接开辟新线程(现在很少开进程吧)去处理,而不需要使用到select(),除非是在单线程的情况,或者有多种可能的文件描述符;