好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

message queue的设计

message queue的设计

为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,

效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相对复杂

性,我不打算使用。

我的队列设计是使用tls维护一个local list,每个线程执行push时,首先将元素放入属于本线程的local list中,此时是无需加锁的,然后检查队列中元素

的总数,如果发现总数超过一个阀值,则将local list中的所有元素一次性提交到share list中,此时需要加锁,share list中的元素是对全局可见的。

当读者执行pop操作时,首先从检查自己的local list中是否有元素,如果有就返回一个,如果没有则尝试从share list中将所有元素同步到自己的local list

中.

local list和message queue的结构如下:

 struct   per_thread_struct
{
    list_node   next;
      struct   double_link_node block;
      struct  link_list * local_q;
    condition_t cond;
};

  struct   mq
{
    uint32_t           push_size;
    pthread_key_t      t_key;
    mutex_t            mtx;
      struct   double_link blocks;
      struct  link_list  * share_list;
      struct  link_list  * local_lists;

}; 

对于push操作,提供了两个接口:

 void  mq_push(mq_t, struct  list_node* );
  void  mq_push_now(mq_t, struct  list_node*);

mq_push将元素插入local list但只有当local list中的元素到达一定阀值时才会执行提交操作mq_sync_push.

而mq_push_now将元素插入local list之后马上就会执行提交操作.

然后还有一个问题,如果local list中的元素较长时间内都达不到阀值,会导致消息传递的延时,所以提供了mq_force_sync函数,此函数的作用是

强制将执行一次提交操作,将local list中的所有元素提交到share list中去。producer线程可在其主循环内以固定的频率执行mq_force_sync,将一个

时间循环内剩余未被提交的消息提交出去.

下面贴下测试代码:

#include <stdio.h> 
#include  <stdlib.h> 
#include   "  KendyNet.h  "  
#include   "  thread.h  "  
#include   "  SocketWrapper.h  "  
#include   "  atomic.h  "  
#include   "  SysTime.h  "  
#include   "  mq.h  "  

list_node  *node_list1[ 5  ];
list_node  *node_list2[ 5  ];
mq_t mq1;

  void  *Routine1( void  * arg)
{
      int  j =  0  ;
      for  ( ; ; )
    {
          int  i =  0  ;
          for (; i <  10000000 ; ++ i)
        {
            mq_push(mq1, & node_list1[j][i]);
        }
        mq_force_sync(mq1);
        j  = (j +  1 )% 5  ; 
        sleepms(  100  );

    }
}

  void  *Routine3( void  * arg)
{
      int  j =  0  ;
      for  ( ; ; )
    {
          int  i =  0  ;
          for (; i <  10000000 ; ++ i)
        {
            mq_push(mq1, & node_list2[j][i]);
        }
        mq_force_sync(mq1);
        j  = (j +  1 )% 5  ; 
        sleepms(  100  );

    }
}

  void  *Routine2( void  * arg)
{
    uint64_t count  =  0  ;
    uint32_t tick  =  GetCurrentMs();
      for  ( ; ; )
    {
        list_node  *n = mq_pop(mq1, 50  );
          if  (n)
        {
             ++ count;
        }
        uint32_t now  =  GetCurrentMs();
          if (now - tick >  1000  )
        {
            printf(  "  recv:%d\n  " ,(count* 1000 )/(now- tick));
            tick  =  now;
            count  =  0  ;
        }
    }
}


  int   main()
{
      int  i =  0  ;
      for ( ; i <  5 ; ++ i)
    {
        node_list1[i]  = calloc( 10000000 , sizeof  (list_node));
        node_list2[i]  = calloc( 10000000 , sizeof  (list_node));
    }
    mq1  = create_mq( 4096  );
    init_system_time(  10  );
    thread_t t1  = create_thread( 0  );
    start_run(t1,Routine1,NULL);

    thread_t t3  = create_thread( 0  );
    start_run(t3,Routine3,NULL);    

    thread_t t2  = create_thread( 0  );
    start_run(t2,Routine2,NULL);

    getchar();

      return   0  ;
} 

因为主要是测试mq的效率,所以预先生成了1亿个消息,分为两个写者一个读者,两个写者循环不断的发消息,每发送1000W休眠一小会.

读者仅仅是从mq中pop一个消息出来,然后更新统计值.在我的i3 2.93双核台式机上运行rhel 6虚拟机,每秒pop出来的消息数量大概在8000W上下。

这个数据足已满足任何高性能的应用需求了.

 https://github.com/sniperHW/kendylib/blob/master/src/mq.c 

 

分类:  网络程序 ,  游戏服务器

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于message queue的设计的详细内容...

  阅读:58次