message queue的设计
为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,
效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相对复杂
性,我不打算使用。
我的队列设计是使用tls维护一个local list,每个线程执行push时,首先将元素放入属于本线程的local list中,此时是无需加锁的,然后检查队列中元素
的总数,如果发现总数超过一个阀值,则将local list中的所有元素一次性提交到share list中,此时需要加锁,share list中的元素是对全局可见的。
当读者执行pop操作时,首先从检查自己的local list中是否有元素,如果有就返回一个,如果没有则尝试从share list中将所有元素同步到自己的local list
中.
local list和message queue的结构如下:
struct per_thread_struct { list_node next; struct double_link_node block; struct link_list * local_q; condition_t cond; }; struct mq { uint32_t push_size; pthread_key_t t_key; mutex_t mtx; struct double_link blocks; struct link_list * share_list; struct link_list * local_lists; };
对于push操作,提供了两个接口:
void mq_push(mq_t, struct list_node* ); void mq_push_now(mq_t, struct list_node*);
mq_push将元素插入local list但只有当local list中的元素到达一定阀值时才会执行提交操作mq_sync_push.
而mq_push_now将元素插入local list之后马上就会执行提交操作.
然后还有一个问题,如果local list中的元素较长时间内都达不到阀值,会导致消息传递的延时,所以提供了mq_force_sync函数,此函数的作用是
强制将执行一次提交操作,将local list中的所有元素提交到share list中去。producer线程可在其主循环内以固定的频率执行mq_force_sync,将一个
时间循环内剩余未被提交的消息提交出去.
下面贴下测试代码:
#include <stdio.h> #include <stdlib.h> #include " KendyNet.h " #include " thread.h " #include " SocketWrapper.h " #include " atomic.h " #include " SysTime.h " #include " mq.h " list_node *node_list1[ 5 ]; list_node *node_list2[ 5 ]; mq_t mq1; void *Routine1( void * arg) { int j = 0 ; for ( ; ; ) { int i = 0 ; for (; i < 10000000 ; ++ i) { mq_push(mq1, & node_list1[j][i]); } mq_force_sync(mq1); j = (j + 1 )% 5 ; sleepms( 100 ); } } void *Routine3( void * arg) { int j = 0 ; for ( ; ; ) { int i = 0 ; for (; i < 10000000 ; ++ i) { mq_push(mq1, & node_list2[j][i]); } mq_force_sync(mq1); j = (j + 1 )% 5 ; sleepms( 100 ); } } void *Routine2( void * arg) { uint64_t count = 0 ; uint32_t tick = GetCurrentMs(); for ( ; ; ) { list_node *n = mq_pop(mq1, 50 ); if (n) { ++ count; } uint32_t now = GetCurrentMs(); if (now - tick > 1000 ) { printf( " recv:%d\n " ,(count* 1000 )/(now- tick)); tick = now; count = 0 ; } } } int main() { int i = 0 ; for ( ; i < 5 ; ++ i) { node_list1[i] = calloc( 10000000 , sizeof (list_node)); node_list2[i] = calloc( 10000000 , sizeof (list_node)); } mq1 = create_mq( 4096 ); init_system_time( 10 ); thread_t t1 = create_thread( 0 ); start_run(t1,Routine1,NULL); thread_t t3 = create_thread( 0 ); start_run(t3,Routine3,NULL); thread_t t2 = create_thread( 0 ); start_run(t2,Routine2,NULL); getchar(); return 0 ; }
因为主要是测试mq的效率,所以预先生成了1亿个消息,分为两个写者一个读者,两个写者循环不断的发消息,每发送1000W休眠一小会.
读者仅仅是从mq中pop一个消息出来,然后更新统计值.在我的i3 2.93双核台式机上运行rhel 6虚拟机,每秒pop出来的消息数量大概在8000W上下。
这个数据足已满足任何高性能的应用需求了.
https://github.com/sniperHW/kendylib/blob/master/src/mq.c
分类: 网络程序 , 游戏服务器
作者: Leo_wl
出处: http://www.cnblogs.com/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息查看更多关于message queue的设计的详细内容...