对ServiceStack.Redis的连接池进行故障转移改造
对ServiceStack.Redis的连接池进行故障转移改造
使用ServiceStack.Redis的连接池在操作多台Redis的时候并不会对出现故障的Redis进行排除切换,这样就会导致应用会还是会分配到故障的Redis服务中导致应用处理错误.这次对ServiceStack.Redis连接池的改造主要实现两个功能:1)对 故障的Redis服务在轮循的时候排除,2)定期检测 故障的Redis服务,如果服务正常则恢复到轮盾环节中.( ServiceStack.Redis的代码结构还是很不错修改起来也很方便 )
增加基于Host的连接池功能
ServiceStack.Redis连接池的连接存储结构相对简单,只是用一些简单的数组进行处理也没有明确按Host划分,所以修改起来比较麻烦.通过查看代码决定在RedisEndPoint添加一系列的功能.包括:连接获取,回收,有效性验测等功能.详细代码如下:
public class RedisEndPoint : EndPoint
{
public RedisEndPoint( string host, int port) : base (host, port)
{
}
public RedisEndPoint( string host, int port, string password) : this (host,port)
{
this .Password = password;
}
public string Password { get ; set ; }
public bool RequiresAuth { get { return ! string .IsNullOrEmpty(Password); } }
private System.Collections.Generic.Stack<RedisClient> mStack = new System.Collections.Generic.Stack<RedisClient> ();
private bool mAvailable = true ;
private int mLastDetectTime = 0 ;
private void PingHost( object state)
{
try
{
RedisClient client = Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port);
client.Password = Password;
client.EndPoint = this ;
client.Ping();
Push(client);
mAvailable = true ;
}
catch
{
}
mIsDetecting = false ;
}
private bool mIsDetecting = false ;
public bool Detect()
{
if (! mAvailable)
{
if (System.Math.Abs(System.Environment.TickCount - mLastDetectTime) >= 10000 )
{
mLastDetectTime = System.Environment.TickCount;
if (! mIsDetecting)
{
mIsDetecting = true ;
System.Threading.ThreadPool.QueueUserWorkItem(PingHost);
}
}
}
return mAvailable;
}
public RedisClient Pop()
{
lock (mStack)
{
if (mStack.Count > 0 )
return mStack.Pop();
}
RedisClient client = Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port);
client.EndPoint = this ;
client.Password = Password;
return client;
}
public void Push(RedisClient client)
{
lock (mStack)
{
if (! client.HadExceptions)
{
mStack.Push(client);
}
else
{
client.ClientManager = null ;
client.Dispose();
while (mStack.Count > 0 )
{
client = mStack.Pop();
client.ClientManager = null ;
client.Dispose();
}
mAvailable = false ;
}
}
}
}
比较重要的功能主要是回收和检测,在连接回收的时候判断连接是否存在异常(从代码来看 HadExceptions的设置是由SocketError引发的,因此可以判断当这个值为True的时候存在网络异常 ),如果是则把当前节点标识为不可用,并把池中的所有连接进行清除关闭.检测方法主要是每隔10秒对redis服务进行一个连接和ping操作,如果成功该节点恢复到有效状态.
修改PooledRedisClientManager
为了让新连接池的代码生效,必须修改PooledRedisClientManager几个地方,主要是连接获了和连接回收几个方法的代码.
GetInActiveWriteClient
/// <summary>
/// Called within a lock
/// </summary>
/// <returns></returns>
private RedisClient GetInActiveWriteClient()
{
var desiredIndex = WritePoolIndex % writeClients.Length;
// this will loop through all hosts in readClients once even though there are 2 for loops
// both loops are used to try to get the prefered host according to the round robin algorithm
for ( int x = 0 ; x < ReadWriteHosts.Count; x++ )
{
var nextHostIndex = (desiredIndex + x) % ReadWriteHosts.Count;
var nextHost = ReadWriteHosts[nextHostIndex];
if (nextHost.Detect())
{
RedisClient client = nextHost.Pop();
if (client != null )
{
if (nextHost.RequiresAuth)
client.Password = nextHost.Password;
client.Id = RedisClientCounter++ ;
client.ClientManager = this ;
client.NamespacePrefix = NamespacePrefix;
client.ConnectionFilter = ConnectionFilter;
return client;
}
}
// for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count)
// {
// if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
// return writeClients[i];
// else if (writeClients[i] == null || writeClients[i].HadExceptions)
// {
// if (writeClients[i] != null)
// writeClients[i].DisposeConnection();
// var client = RedisClientFactory.CreateRedisClient(nextHost.Host, nextHost.Port);
// if (nextHost.RequiresAuth)
// client.Password = nextHost.Password;
// client.Id = RedisClientCounter++;
// client.ClientManager = this;
// client.NamespacePrefix = NamespacePrefix;
// client.ConnectionFilter = ConnectionFilter;
// writeClients[i] = client;
// return client;
// }
// }
}
return null ;
}
把代码改成直接检测当明的Host是否有效,如果是则获取连接并返回,这里只修改的writerclient,类里面还有readclient的方法也相对应用进行修改.
GetClient方法代码
/// <summary>
/// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
/// </summary>
/// <returns></returns>
public IRedisClient GetClient()
{
lock (writeClients)
{
AssertValidReadWritePool();
RedisClient inActiveClient;
inActiveClient = GetInActiveWriteClient();
if (inActiveClient == null )
throw new TimeoutException(PoolTimeoutError);
// while ((inActiveClient = GetInActiveWriteClient()) == null)
// {
// if (PoolTimeOut.HasValue)
// {
// // wait for a connection, cry out if made to wait too long
// if (!Monitor.Wait(writeClients, PoolTimeOut.Value))
// throw new TimeoutException(PoolTimeoutError);
// }
// else
// Monitor.Wait(writeClients);
// }
WritePoolIndex ++ ;
inActiveClient.Active = true ;
if ( this .ConnectTimeout != null )
{
inActiveClient.ConnectTimeout = this .ConnectTimeout.Value;
}
if ( this .SocketSendTimeout.HasValue )
{
inActiveClient.SendTimeout = this .SocketSendTimeout.Value;
}
if ( this .SocketReceiveTimeout.HasValue )
{
inActiveClient.ReceiveTimeout = this .SocketReceiveTimeout.Value;
}
inActiveClient.NamespacePrefix = NamespacePrefix;
// Reset database to default if changed
if (inActiveClient.Db != Db)
{
inActiveClient.ChangeDb(Db);
}
return inActiveClient;
}
}
DisposeClient方法代码
public void DisposeClient(RedisNativeClient client)
{
if (client.EndPoint != null )
{
client.EndPoint.Push((RedisClient)client);
return ;
}
// lock (readClients)
// {
// for (var i = 0; i < readClients.Length; i++)
// {
// var readClient = readClients[i];
// if (client != readClient) continue;
// client.Active = false;
// Monitor.PulseAll(readClients);
// return;
// }
// }
通过以上简单的代码修改后ServiceStack.Redis的连接池就具备了故意迁移和恢复的功能:)
可靠、高性能的Socket TCP通讯组件
开源数据库访问组件
作者: Leo_wl
出处: http://HdhCmsTestcnblogs测试数据/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息查看更多关于对ServiceStack.Redis的连接池进行故障转移改造的详细内容...