message queue的设计
为了在各线程之间高效的传递消息,必须设计一种高效率的消息队列,传统的做法是mutex加queue,这种做法在每次执行push和pop时都要加锁,
效率相对较低。其次还有使用循环队列,可以做到完全无锁,但只能实现1:1的消息传递。还有一些lock-free队列的实现,但基于其实现的相对复杂
性,我不打算使用。
我的队列设计是使用tls维护一个local list,每个线程执行push时,首先将元素放入属于本线程的local list中,此时是无需加锁的,然后检查队列中元素
的总数,如果发现总数超过一个阀值,则将local list中的所有元素一次性提交到share list中,此时需要加锁,share list中的元素是对全局可见的。
当读者执行pop操作时,首先从检查自己的local list中是否有元素,如果有就返回一个,如果没有则尝试从share list中将所有元素同步到自己的local list
中.
local list和message queue的结构如下:
struct per_thread_struct
{
list_node next;
struct double_link_node block;
struct link_list * local_q;
condition_t cond;
};
struct mq
{
uint32_t push_size;
pthread_key_t t_key;
mutex_t mtx;
struct double_link blocks;
struct link_list * share_list;
struct link_list * local_lists;
};
对于push操作,提供了两个接口:
void mq_push(mq_t, struct list_node* ); void mq_push_now(mq_t, struct list_node*);
mq_push将元素插入local list但只有当local list中的元素到达一定阀值时才会执行提交操作mq_sync_push.
而mq_push_now将元素插入local list之后马上就会执行提交操作.
然后还有一个问题,如果local list中的元素较长时间内都达不到阀值,会导致消息传递的延时,所以提供了mq_force_sync函数,此函数的作用是
强制将执行一次提交操作,将local list中的所有元素提交到share list中去。producer线程可在其主循环内以固定的频率执行mq_force_sync,将一个
时间循环内剩余未被提交的消息提交出去.
下面贴下测试代码:
#include <stdio.h>
#include <stdlib.h>
#include " KendyNet.h "
#include " thread.h "
#include " SocketWrapper.h "
#include " atomic.h "
#include " SysTime.h "
#include " mq.h "
list_node *node_list1[ 5 ];
list_node *node_list2[ 5 ];
mq_t mq1;
void *Routine1( void * arg)
{
int j = 0 ;
for ( ; ; )
{
int i = 0 ;
for (; i < 10000000 ; ++ i)
{
mq_push(mq1, & node_list1[j][i]);
}
mq_force_sync(mq1);
j = (j + 1 )% 5 ;
sleepms( 100 );
}
}
void *Routine3( void * arg)
{
int j = 0 ;
for ( ; ; )
{
int i = 0 ;
for (; i < 10000000 ; ++ i)
{
mq_push(mq1, & node_list2[j][i]);
}
mq_force_sync(mq1);
j = (j + 1 )% 5 ;
sleepms( 100 );
}
}
void *Routine2( void * arg)
{
uint64_t count = 0 ;
uint32_t tick = GetCurrentMs();
for ( ; ; )
{
list_node *n = mq_pop(mq1, 50 );
if (n)
{
++ count;
}
uint32_t now = GetCurrentMs();
if (now - tick > 1000 )
{
printf( " recv:%d\n " ,(count* 1000 )/(now- tick));
tick = now;
count = 0 ;
}
}
}
int main()
{
int i = 0 ;
for ( ; i < 5 ; ++ i)
{
node_list1[i] = calloc( 10000000 , sizeof (list_node));
node_list2[i] = calloc( 10000000 , sizeof (list_node));
}
mq1 = create_mq( 4096 );
init_system_time( 10 );
thread_t t1 = create_thread( 0 );
start_run(t1,Routine1,NULL);
thread_t t3 = create_thread( 0 );
start_run(t3,Routine3,NULL);
thread_t t2 = create_thread( 0 );
start_run(t2,Routine2,NULL);
getchar();
return 0 ;
}
因为主要是测试mq的效率,所以预先生成了1亿个消息,分为两个写者一个读者,两个写者循环不断的发消息,每发送1000W休眠一小会.
读者仅仅是从mq中pop一个消息出来,然后更新统计值.在我的i3 2.93双核台式机上运行rhel 6虚拟机,每秒pop出来的消息数量大概在8000W上下。
这个数据足已满足任何高性能的应用需求了.
https://github测试数据/sniperHW/kendylib/blob/master/src/mq.c
分类: 网络程序 , 游戏服务器
作者: Leo_wl
出处: http://HdhCmsTestcnblogs测试数据/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息查看更多关于message queue的设计的详细内容...