好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

浅谈几种服务器端模型——反应堆模式(基于epoll的反应堆)

浅谈几种服务器端模型——反应堆模式(基于epoll的反应堆)

引言:前面一章简单介绍了关于epoll 的使用方式,这一章介绍一下一个简单的反应堆模型,没有实现超时机制的管理。最主要的是要介绍一下关于异步事件反应堆的设计方式。

反应堆的模型图在上一张可以看到,但是那个是盗来的一张图,twisted 的反应堆。今天给不熟悉这个部分的朋友介绍一下基于 epoll 的反应堆,过程类似于libevent.

反应堆可以提供几个操作:

(0)创建一个反应堆:

?

mc_event_base_t * mc_base_new( void ) ;

返回一个操作句柄.  

(1)为某一个需要监听的文件描述符加入回调函数,并注册事件类型。

?

int mc_event_set( mc_event_t *ev , short revent , int fd , mc_ev_callback callback , void *args )  ;

     /*

      * Initialize a event , add callback and event type

      * if the event exists , this function will change the mode of this event

      * and fd

      */

 这里的 revent 由宏定义为几种类型:

  

?

#define MC_EV_READ     0x0001

#define MC_EV_WRITE    0x0002

#define MC_EV_SIGNAL   0x0004

#define MC_EV_TIMEOUT  0x0008

#define MC_EV_LISTEN   0x0010

相应的操作可以使用 | 运算来并几个需要监听的事件类型。

事件类型定义如下:

?

typedef struct mc_event_s

{

      

       struct mc_event_s   *next    ;

       

       

       struct mc_event_s   *prev    ;

      

      unsigned int min_heap_index  ;

       

      int ev_fd      ;   // file des of event

      short revent   ;   // event type

       

      struct timeval  ev_timeval   ; // event timeout time

      mc_ev_callback callback ; // callback of this event

      void   *args                  ;

      int ev_flags                 ;

       

      mc_event_base_t    *base     ;

}mc_event_t ;

事件结构本身后面解释。 

(2)将需要监听的并且已经初始化的事件加入反应堆。

?

int mc_event_post( mc_event_t *ev , mc_event_base_t * base ) ;

     /*

      * Post this event to event_base

      * struct base has two queue , active queue and added queue

      * this function will post event to added queue , but not in active queue

      */

将刚才注册了事件类型和回调函数的事件加入 base, 即将其看做一个反应堆。

(3)最后提供了一个 dispatch 函数,反应堆开始循环,等待事件的发生。如果对应的 fd 上的事件发生,调用相应的回调函数。由第一步注册。

?

int mc_dispatch( mc_event_base_t * base ) ;

     /*

      * start loop

      * and dispatch event by

      * mc_event_loop

      */

反应堆支持在循环过程中,通过相应的回调函数再注册事件,类似于热加入,热移除。

实现方式很简单,就是在第一个事件的回调函数上调用 mc_event_set()然后注册。再加入 base.

base 的结构如下 :

?

typedef struct mc_event_base_s

{

     void          *  added_list      ;

     void          *  active_list     ;

     unsigned int     event_num       ;

     unsigned int     event_active_num;

      

     /*

      *mc_minheap        minheap         ;

      */

     int              epoll_fd        ;  //for epoll only

     int              ev_base_stop    ;

     int              magic           ;

     struct timeval  event_time      ;  

}mc_event_base_t ;

让我们来看一个简单的 demo

?

/*_____________________test bellow ______________________*/

#define mc_sock_fd  int

 

 

#define DEFAULT_NET AF_INET

#define DEFAULT_DATA_GRAM   SOCK_STREAM

#define DEFAULT_PORT        (1115)

#define DEFAULT_BACKLOG     (200)

 

/* simple connection */

struct _connection

{

     int fd            ;

     mc_event_t  read  ;

     mc_event_t  write ;

     char buf[1024]    ;

     mc_event_base_t * base ;

};

void setreuseaddr( mc_sock_fd fd )

{

     int yes = 1 ;

     setsockopt( fd , SOL_SOCKET , SO_REUSEADDR , &yes , sizeof ( int ) );

}

int mc_socket()

{

     int retsock = socket(DEFAULT_NET,DEFAULT_DATA_GRAM,0) ;

     if ( retsock < 0  )

     {

         /* we should add some debug information here

         fprintf(LOGPATH,"socket error\n");

         */

         return -1 ;

     }

     return retsock ;

}      

 

int mc_bind(mc_sock_fd listenfd )

{

     struct sockaddr_in serveraddr ;

     bzero(&serveraddr, sizeof (serveraddr));

 

     serveraddr.sin_family = AF_INET ;

     serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

     serveraddr.sin_port = htons(DEFAULT_PORT);

     return bind(listenfd,( struct sockaddr *)&serveraddr , sizeof (serveraddr ));

}

 

int mc_isten(mc_sock_fd listenfd)

{

     return listen(listenfd,DEFAULT_BACKLOG);

}

 

 

 

void handler_accept( int fd , short revent , void *args )

{

     struct sockaddr_in in_addr ;

     size_t in_len ;

     int s   ;

     int done = 0 ;

     struct _connection * lc = ( struct _connection *)args ;

      

     in_len = sizeof ( in_addr );

     mc_setnonblocking(fd) ;

     while ( !done )

     {

         s = accept( fd , ( struct sockaddr *)&in_addr , &in_len );

         if ( s == -1 )

         {

             if ( ( errno == EAGAIN )|| ( errno == EWOULDBLOCK ) )

             {

                 break ;

             }

             else

             {

                 perror ( "accept" );

                 break ;

             }

         }

         if ( s == 0 )

         {

             fprintf (stderr, "Accept a connection on %d \n" ,fd );

         }

         done = 1 ;

     }

         mc_setnonblocking(s) ;

         lc->fd = s ;

         mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );

          

          

         mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );

         mc_event_post( &(lc->write) , lc->base );

           

          

}

void handler_read( int fd , short revent , void *args )

{

     mc_setnonblocking(fd) ;

     struct _connection * lc ;

     lc  = ( struct _connection *)args ;

     read( fd , lc->buf , 1024 );

     mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );

}

 

void handler_write( int fd , short revent , void *args )

{

     mc_setnonblocking(fd) ;

     struct _connection * lc ;

     lc  = ( struct _connection *)args ;

     write( fd , lc->buf , 1024 );

     mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );

}

 

void cab( int fd , short revent , void *args )

{

     mc_setnonblocking(fd) ;

     char buf[1024] = "xx00xx00xx00xx00\n" ;

     write(fd,buf,1024);

}

int main()

{

     mc_event_t mev ;

     mc_event_base_t  *base = mc_base_new() ;

     struct _connection lc ;

     lc.base = base ;

      

     int sockfd = mc_socket() ;

     mc_bind(sockfd);

     mc_isten(sockfd);

      

     mc_event_set( &(lc.read) , MC_EV_READ , sockfd , handler_accept , &lc );

     mc_event_post( &(lc.read) , base );

     mc_dispatch(base);

     return 0;

}

  

首先:封装的几个套接口操作没有考虑错误处理,作为简单的实例。

定义了一个 connection 结构,用于表示每一个到来的连接,这里的 struct _connection 中包含读写事件和一个缓冲区,还有指向反应堆的指针和对应注册的fd

工作过程如下:(集中看  main函数)

(1)创建一个反应堆。

(2)实例化一个 connection

(3)创建套接口,bind,listen 老生常谈,这里就不多说了

(4)将这个监听套接口注册相应的回调函数,这里我们注册的是 handler_accept() 函数,回调函数类型都是  void *XXX(  int  , short , void *) ;

       当监听套接口发生可读事件时,第一次我们认为是相应的监听套接口得到了新的连接,所以,第一次调用的时候直接调用注册了的回调函数 handler_accept().

在handler_accept() 函数中,我们为这个连接的读写事件添加了相应的回调函数,并把连接描述符(不是监听描述符)注册到这个上。下次这个套接口可读的时候调用handler_read(),可写的时候调用handler_write(). 如果需要改变状态或改变回调函数,只需要一个状态机或者别的方式来确定需要的回调函数是哪一个,在我们的handler_write() 和 handler_read()中可以改变回调函数,代码所示。

PS:注意一点的是我们的事件是一个实例,不管是在connection结构中或是自己定义,都需要不断的向操作系统申请空间,如果采用对象池或者connection池的方式,可以减少服务器的负载。

总结:反应堆模式最基本的操作就是:注册事件(为需要监听的fd加入回调函数)----->将事件加入反应堆------>开始事件循环------>事件发生,调用回调函数。

异步操作的精髓就是在这里,而不是同步的等待每一个事件。下一章讲解这个反应堆的实现,越来越带感咯.

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于浅谈几种服务器端模型——反应堆模式(基于epoll的反应堆)的详细内容...

  阅读:38次