Socket连接池
Socket连接池
“池”这个概念好像最早是在操作系统的课上听过的,到后来出来工作的第二天组长也跟我提起“池”这个东东。它给我的感觉是某种对象的集合,如果要用的话就取出,不用的话就放回。在学多线程的时候有接触过线程池,在写《 Socket 一对多通信 》的时候想到了Socket连接池这回事,不过在网上谷歌了一下,发现这类的文章貌似不多,看了一下园友的博文《 高性能Socket设计实现 》,获益良多,下了份源码来看,虽然有一部分看不明白,而且由于个人水平跑不了那份代码,但是从中我学到了不少,至少我写的“池”有一部分是用了这位田大哥的思想。
先来分析各个类之间的结构,整个连接池里面实际上是有两个池,一个是在异步通信中可以重复利用的SocketAsyncEventArgs池(当然处于别的方面考虑,池里面并不是单纯放SocketAsyncEventArgs的实例集合),另一个是接收数据时用到的byte[]缓冲池。而这两个池是外部不能访问的,外部通过一个控制(或者叫管理)的类进行操作。继承了前两篇博文中的异步通信的思想,最底的一个类存放了一次连接的信息,包括两个SocketAsyncEventArgs实例和通信另一端的Socket实例。下面将逐部分介绍。
1 /// <summary> 2 /// 连接单元 3 /// </summary> 4 class ConnectionUnit:IDisposable 5 { 6 private string _uid; // 单元的编号,默认为-1 7 private bool _state; // 单元的状态,true表示使用,false表示空闲 8 private SocketAsyncEventArgs _sendArg; // 专用于发送 9 private SocketAsyncEventArgs _recArg; // 专用于接收 10 internal Socket client { get ; set ; } // 客户端的socket实例 11 internal ArrayList tempArray { get ; set ; } // 暂存已接收的数据,避免粘包用的 12 13 14 public ConnectionUnit( string UID) 15 { 16 _uid = UID; 17 tempArray = new ArrayList(); 18 } 19 20 public ConnectionUnit() : this ( " -1 " ) { } 21 22 public void Dispose() 23 { 24 if (_sendArg != null ) 25 _sendArg.Dispose(); 26 if (_recArg != null ) 27 _recArg.Dispose(); 28 29 _sendArg = null ; 30 _recArg = null ; 31 } 32 }
这个与之前一篇讲异步通信的博文一样,一个Socket两个SocketAsyncEventArgs。为了取客户端的Socket方便一点,两个SocketAsyncEventArgs一个用于收,一个用于发,田大哥说这样可以实现双工。的确是这样,当初把类定成这样也是为了在收的同时也能发。以上代码删掉了把字段装成属性的那部分。
SocketAsyncEventArgsPool这个类就是上面链接单元的一个池,这个池才是整个连接池里最核心的池,也是真正意义上的池。池里面用了两个集合来存放这一堆连接单元。分别是空闲栈(freeCollecton)和在线字典集(busyCollection)。对于这样的一个池,我就认为,只要在我需要用的时候把对象取出来,顶多在去的时候给一些参数,让池帮忙把这个对象配置好,我就拿来使用,等到我用完了,把他放回池里,池就把对象还原,等待下次再使用。正体感觉这个池对外有点像个栈,取的时候就Pop,放的时候就Push,此外还提供了两个与栈不相干的的方法,根据编号获取在线的连接单元和获取所有在线连接单元的编号。
1 class SocketAsyncEventArgsPool:IDisposable 2 { 3 private Dictionary< string , ConnectionUnit> busyCollection; 4 private Stack<ConnectionUnit> freeCollecton; 5 6 internal SocketAsyncEventArgsPool() 7 { 8 busyCollection = new Dictionary< string , ConnectionUnit> (); 9 freeCollecton = new Stack<ConnectionUnit> (); 10 } 11 12 /// <summary> 13 /// 取出 14 /// </summary> 15 internal ConnectionUnit Pop( string uid) 16 { 17 ConnectionUnit unit = freeCollecton.Pop(); 18 unit.State = true ; 19 unit.Uid = uid; 20 busyCollection.Add(uid, unit); 21 return unit; 22 } 23 24 /// <summary> 25 /// 放回 26 /// </summary> 27 internal void Push(ConnectionUnit unit) 28 { 29 if (! string .IsNullOrEmpty(unit.Uid) && unit.Uid != " -1 " ) 30 busyCollection.Remove(unit.Uid); 31 unit.Uid = " -1 " ; 32 unit.State = false ; 33 freeCollecton.Push(unit); 34 } 35 36 /// <summary> 37 /// 获取 38 /// </summary> 39 internal ConnectionUnit GetConnectionUnitByUID( string uid) 40 { 41 if (busyCollection.ContainsKey(uid)) 42 return busyCollection[uid]; 43 return null ; 44 } 45 46 /// <summary> 47 /// 48 /// </summary> 49 internal string [] GetOnLineList() 50 { 51 return busyCollection.Keys.ToArray(); 52 } 53 54 public void Dispose() 55 { 56 foreach (KeyValuePair< string ,ConnectionUnit> item in busyCollection) 57 item.Value.Dispose(); 58 59 busyCollection.Clear(); 60 61 while (freeCollecton.Count > 0 ) 62 freeCollecton.Pop().Dispose(); 63 }
BufferManager这个是专给接收的SocketAsyncEventArgs用的缓冲池,整一个连接池里面所有接收用的缓冲区都用这个BufferManager,参照田大哥的思想,现在内存里开辟一大片区域存放byte,然后给每一个接收用的SocketAsyncEventArgs分配一块。
1 class BufferManager:IDisposable 2 { 3 private byte [] buffers; // 缓冲池 4 private int bufferSize; // 每个单元使用的大小 5 private int allSize; // 池的总大小 6 private int currentIndex; // 当前可用的索引 7 private Stack< int > freeIndexs; // 已使用过的空闲索引 8 9 /// <summary> 10 /// 构造缓存池 11 /// </summary> 12 /// <param name="buffersSize"> 池总大小 </param> 13 /// <param name="defaultSize"> 默认单元大小 </param> 14 internal BufferManager( int buffersSize, int defaultSize) 15 { 16 this .bufferSize= defaultSize; 17 this .allSize= buffersSize; 18 currentIndex= 0 ; 19 this .buffers = new byte [allSize]; 20 freeIndexs = new Stack< int > (); 21 } 22 23 /// <summary> 24 /// 给SocketAsyncEventArgs设置缓冲区 25 /// </summary> 26 internal bool SetBuffer(SocketAsyncEventArgs e) 27 { 28 // 首先看看空闲栈里有没有空闲的区域,有就使用 29 if (freeIndexs.Count > 0 ) 30 { 31 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize); 32 } 33 else 34 { 35 // 没有就得从buffers里取,如果buffers用光了当然取不了 36 if ((allSize - currentIndex) < bufferSize) return false ; 37 e.SetBuffer(buffers, currentIndex, bufferSize); 38 currentIndex += bufferSize; 39 } 40 return true ; 41 } 42 43 /// <summary> 44 /// 释放SocketAsyncEventArgs的缓冲区 45 /// </summary> 46 /// <param name="e"></param> 47 internal void FreeBuffer(SocketAsyncEventArgs e) 48 { 49 // 把索引放到空闲索引栈里面,供下次取的时候重复利用 50 freeIndexs.Push(e.Offset); 51 // 同时清空这部分区域的数据免得上次使用时遗留的数据会掺 52 // 和到下次读取的数据中 53 for ( int i = e.Offset; i < e.Offset + bufferSize; i++ ) 54 { 55 if (buffers[i] == 0 ) break ; 56 buffers[i] = 0 ; 57 } 58 e.SetBuffer( null , 0 , 0 ); 59 } 60 61 public void Dispose() 62 { 63 buffers = null ; 64 freeIndexs = null ; 65 } 66 }
其实上面两个池都是很大程度参照了《 高性能Socket设计实现 》中的内容。下面这个SocketPoolController是对外的类,这个类的设计参照的就没那么多了。而对于一个Socket通信(服务端的)来说,无非都是三件事,接受连接,接收数据,发送数据。这三件事我再操作类里面是这样做的
接受连接:接受再运行池的时候就开始了,异步循环地接受,执行一次异步接受就阻塞,等到接受完成才被唤醒。
1 /// <summary> 2 /// 异步Accept客户端的连接 3 /// </summary> 4 void MyAsyncAccept() 5 { 6 // 这里使用Action的方式异步循环接受客户端的连接 7 // 模仿同事的做法没开线程,不知这种方式是好是坏 8 Action callback = new Action( delegate () 9 { 10 while ( true ) 11 { 12 // 每次接受都要新开一个SocketAsyncEventArgs,否则会报错 13 // 其实我也想重复利用的 14 SocketAsyncEventArgs e = new SocketAsyncEventArgs(); 15 e.Completed += new EventHandler<SocketAsyncEventArgs> (Accept_Completed); 16 17 acceptLock.Reset(); 18 server.AcceptAsync(e); 19 // 在异步接受完成之前阻塞当前线程 20 acceptLock.WaitOne(); 21 } 22 }); 23 callback.BeginInvoke( null , null ); 24 } 25 26 void Accept_Completed( object sender, SocketAsyncEventArgs e) 27 { 28 Socket client = e.AcceptSocket; 29 try 30 { 31 if (client.Connected) 32 { 33 IPEndPoint point = client.RemoteEndPoint as IPEndPoint; 34 string uid = point.Address + " : " + point.Port; 35 ConnectionUnit unit = pool.Pop(uid); 36 unit.client = client; 37 unit.State = true ; 38 unit.Uid = uid; 39 unit.RecArg.UserToken = unit; 40 unit.SendArg.UserToken = unit; 41 buffer.SetBuffer(unit.RecArg); 42 43 // 在接受成功之后就开始接收数据了 44 client.ReceiveAsync(unit.RecArg); 45 // 设置并发限制信号和增加当前连接数 46 semaphoreAccept.WaitOne(); 47 Interlocked.Increment( ref currentConnect); 48 49 if (OnAccept != null ) OnAccept(uid); 50 } 51 else if (client != null ) 52 { 53 client.Close(); 54 client.Dispose(); 55 client = null ; 56 } 57 } 58 catch (Exception ex) { Console.WriteLine(ex.ToString()); } 59 // 设置Accept信号,以便下次Accept的执行 60 acceptLock.Set(); 61 e.Dispose(); 62 }
接收消息:在异步接受成功的时候开始接收,每次接收完成之后就进行下一次接收,直到客户端断开连接才终止。
1 void RecArg_Completed( object sender, SocketAsyncEventArgs e) 2 { 3 Socket client = sender as Socket; 4 ConnectionUnit unit = e.UserToken as ConnectionUnit; 5 // 这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异 6 if (e.SocketError == SocketError.Success) 7 { 8 int rec = e.BytesTransferred; 9 if (rec == 0 ) 10 { 11 CloseSocket(unit); 12 return ; 13 } 14 if (client.Available > 0 ) 15 { 16 unit.tempArray.AddRange(e.Buffer); 17 buffer.FreeBuffer(unit.RecArg); 18 buffer.SetBuffer(unit.RecArg); 19 client.SendAsync(unit.RecArg); 20 return ; 21 } 22 byte [] data = e.Buffer; 23 int len = rec; 24 if (unit.tempArray.Count != 0 ) 25 { 26 foreach ( byte item in data) 27 { 28 if (item == 0 ) break ; 29 unit.tempArray.Add(item); 30 } 31 data = unit.tempArray.ToArray( typeof ( byte )) as byte []; 32 rec = data.Length; 33 unit.tempArray.Clear(); 34 } 35 36 string dataStr = Encoding.ASCII.GetString(data, 0 , len); 37 if (OnReceive != null ) 38 OnReceive(unit.Uid, dataStr); 39 40 if (!unit.State) return ; 41 buffer.FreeBuffer(e); 42 buffer.SetBuffer(e); 43 client.ReceiveAsync(e); 44 } 45 // 这里还多个了一个关闭当前连接 46 else 47 { 48 CloseSocket(unit); 49 } 50 } 51 /// <summary> 52 /// 关闭一个连接单元 53 /// </summary> 54 private void CloseSocket( ConnectionUnit unit ) 55 { 56 // 关闭并释放客户端socket的字眼 57 if (unit.client != null ) 58 { 59 unit.client.Shutdown(SocketShutdown.Both); 60 unit.client.Dispose(); 61 unit.client = null ; 62 } 63 // 把连接放回连接池 64 pool.Push(unit); 65 // 释放并发信号 66 semaphoreAccept.Release(); 67 // 减少当前连接数 68 Interlocked.Decrement( ref currentConnect); 69 }
发送消息:外放方法,在需要的时候自行调用方法发送。
1 /// <summary> 2 /// 发送消息 3 /// </summary> 4 /// <param name="uid"></param> 5 /// <param name="message"></param> 6 public void SendMessage( string uid, string message) 7 { 8 sendLock.Reset(); 9 ConnectionUnit unit= pool.GetConnectionUnitByUID(uid); 10 // 如果获取不了连接单元就不发送了, 11 if (unit == null ) 12 { 13 if (OnSend!= null ) OnSend(uid, " 100 " ); 14 sendLock.Set(); 15 return ; 16 } 17 byte [] datas = Encoding.ASCII.GetBytes(message); 18 unit.SendArg.SetBuffer(datas, 0 , datas.Length); 19 unit.client.SendAsync(unit.SendArg); 20 // 阻塞当前线程,等到发送完成才释放 21 sendLock.WaitOne(); 22 } 23 24 void SendArg_Completed( object sender, SocketAsyncEventArgs e) 25 { 26 Socket client = sender as Socket; 27 ConnectionUnit unit = e.UserToken as ConnectionUnit; 28 // 这里的消息码有三个,2字头的是成功的,1字头是不成功的 29 // 101是未知错误,100是客户端不在线 30 if (e.SocketError == SocketError.Success) 31 if (OnSend != null ) OnSend(unit.Uid, " 200 " ); 32 else if (OnSend != null ) OnSend(unit.Uid, " 101 " ); 33 // 释放信号,以便下次发送消息执行 34 sendLock.Set(); 35 }
下面则是类里面的一些字段信息和构造函数
1 /// <summary> 2 /// 初始化池的互斥体 3 /// </summary> 4 private Mutex mutex = new Mutex(); 5 6 /// <summary> 7 /// Accept限制信号 8 /// </summary> 9 private Semaphore semaphoreAccept; 10 11 /// <summary> 12 /// Accept信号 13 /// </summary> 14 private static ManualResetEvent acceptLock = new ManualResetEvent( false ); 15 16 /// <summary> 17 /// Send信号 18 /// </summary> 19 private static ManualResetEvent sendLock = new ManualResetEvent( false ); 20 21 /// <summary> 22 /// 最大并发数(连接数) 23 /// </summary> 24 private int maxConnect; 25 26 /// <summary> 27 /// 当前连接数(并发数) 28 /// </summary> 29 private int currentConnect; 30 31 /// <summary> 32 /// 缓冲池 33 /// </summary> 34 private BufferManager buffer; 35 36 /// <summary> 37 /// SocketasyncEventArgs池 38 /// </summary> 39 private SocketAsyncEventArgsPool pool; 40 41 /// <summary> 42 /// 服务端Socket 43 /// </summary> 44 private Socket server; 45 46 /// <summary> 47 /// 完成接受的委托 48 /// </summary> 49 public delegate void AcceptHandler( string uid); 50 51 /// <summary> 52 /// 完成发送的委托 53 /// </summary> 54 public delegate void SendHandler( string uid, string result); 55 56 /// <summary> 57 /// 完成接收的委托 58 /// </summary> 59 public delegate void RecevieHandler( string uid, string data); 60 61 /// <summary> 62 /// 完成接受事件 63 /// </summary> 64 public event AcceptHandler OnAccept; 65 66 /// <summary> 67 /// 完成发送事件 68 /// </summary> 69 public event SendHandler OnSend; 70 71 /// <summary> 72 /// 完成接收事件 73 /// </summary> 74 public event RecevieHandler OnReceive; 75 76 /// <summary> 77 /// 构造函数 78 /// </summary> 79 /// <param name="buffersize"> 单元缓冲区大小 </param> 80 /// <param name="maxCount"> 并发总数 </param> 81 public SocketPoolController( int buffersize, int maxCount) 82 { 83 buffer = new BufferManager(buffersize * maxCount,buffersize); 84 this .currentConnect = 0 ; 85 this .maxConnect = maxCount; 86 this .currentConnect = 0 ; 87 this .pool = new SocketAsyncEventArgsPool(); 88 // 设置并发数信号,经试验过是并发数-1才对 89 this .semaphoreAccept = new Semaphore(maxCount- 1 , maxCount- 1 ); 90 InitPool(); 91 }
构造函数里用到的方法
1 /// <summary> 2 /// 初始化SocketAsyncEventArgs池 3 /// 这里主要是给空闲栈填充足够的实例 4 /// </summary> 5 private void InitPool() 6 { 7 ConnectionUnit unit = null ; 8 for ( int i = 0 ; i < maxConnect; i++ ) 9 { 10 unit = new ConnectionUnit(); 11 unit.Uid = " -1 " ; 12 unit.RecArg = new SocketAsyncEventArgs(); 13 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs> (RecArg_Completed); 14 unit.SendArg = new SocketAsyncEventArgs(); 15 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs> (SendArg_Completed); 16 this .pool.Push(unit); 17 } 18 }
其他外放专门控制池的方法
1 /// <summary> 2 /// 启动池 3 /// </summary> 4 /// <param name="ipAddress"> 服务端的IP </param> 5 /// <param name="port"> 端口 </param> 6 public void RunPool( string ipAddress, int port) 7 { 8 IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); 9 server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 10 server.Bind(endpoint); 11 server.Listen( 100 ); 12 13 // 调用方法异步Accept客户端的连接 14 MyAsyncAccept(); 15 // 设置信号,防止再池在已经启动的情况下再次启动 16 mutex.WaitOne(); 17 } 18 19 /// <summary> 20 /// 停止池 21 /// </summary> 22 public void StopPool() 23 { 24 // 把服务端的socket关了 25 if (server != null ) 26 server.Close(); 27 // 释放互斥信号,等待下次启动 28 mutex.ReleaseMutex(); 29 // 释放资源 30 Dispose(); 31 }
要把这个操作类与现实的事物类比的话,这个与我们平常跟某个或一堆人聊天时差不多,别人说的东西,不用你自己控制你都会听得到(排除带上耳机,塞住耳朵这种极端的情况),所以接收消息那个方法就不需要了,而你要说什么这个要靠个人控制,而事件那些也好类比,OnAccept就相当于某人加入到这次聊天中你会做些什么(say hello 或者无视),OnRecevie就听到别人说什么自己有什么反应,OnSend就自己说完话之后有什么反应(有时候发现说错了会纠正,有时觉得自己说得好笑的也笑一场诸如此类)。
在使用的时候可以这样子
1 SocketPoolController pool; 2 pool = new SocketPoolController( 32 * 1024 , 1000 ); 3 pool.OnReceive += new SocketPoolController.RecevieHandler(pool_OnReceive); 4 pool.OnSend += new SocketPoolController.SendHandler(pool_OnSend); 5 pool.OnAccept += new SocketPoolController.AcceptHandler(pool_OnAccept); 6 pool.RunPool( " 127.0.0.1 " , 8081 ); 7 Console.WriteLine( " Pool has run\r\npress any key to stop... " ); 8 Console.ReadKey(); 9 pool.StopPool(); 10 Console.WriteLine( " Pool has stop\r\npress any key to exit... " ); 11 Console.ReadLine();
在池里面有一部分地方看似跟用同步的差不多,像接受客户端的连接,发送消息这些地方。可是用这种异步,万一客户端突然断开连接也不会有同步那样马上抛异常。还有一点的是在这份代码里面缺少了对异常的捕捉,有一部分错误我在测试的过程中设了判断避开了。以前跟某只猫提过Socket编程会与多线程一起使用,我也觉得是这样,在之前一篇博文 《Socket一对多通信》里我也用到线程,后来的异步通信也是有线程的,不过不是.net framework自行创建的。看了田大哥的博文给我的另一个收获是信号量的使用,在以前不懂得使用信号量,只会设置一大堆标识状态的布尔值或整形的变量来计数判断。田大哥的博文介绍的时高性能的Socket,而我的这个应该性能不会高到哪里去。上面有什么说错的请各位指出,有什么说漏的,请各位提点,多多指导。谢谢!
整份源码
1 /// <summary> 2 /// 连接单元 3 /// </summary> 4 class ConnectionUnit:IDisposable 5 { 6 private string _uid; // 单元的编号,默认为-1 7 private bool _state; // 单元的状态,true表示使用,false表示空闲 8 private SocketAsyncEventArgs _sendArg; // 专用于发送 9 private SocketAsyncEventArgs _recArg; // 专用于接收 10 internal Socket client { get ; set ; } // 客户端的socket实例 11 internal ArrayList tempArray { get ; set ; } // 暂存已接收的数据,避免粘包用的 12 13 public string Uid 14 { 15 get { return _uid; } 16 set { _uid = value; } 17 } 18 19 public ConnectionUnit( string UID) 20 { 21 _uid = UID; 22 tempArray = new ArrayList(); 23 } 24 25 public ConnectionUnit() : this ( " -1 " ) { } 26 27 public bool State 28 { 29 get { return _state; } 30 set { _state = value; } 31 } 32 33 public SocketAsyncEventArgs SendArg 34 { 35 get { return _sendArg; } 36 set { _sendArg = value; } 37 } 38 39 public SocketAsyncEventArgs RecArg 40 { 41 get { return _recArg; } 42 set { _recArg = value; } 43 } 44 45 public void Dispose() 46 { 47 if (_sendArg != null ) 48 _sendArg.Dispose(); 49 if (_recArg != null ) 50 _recArg.Dispose(); 51 52 _sendArg = null ; 53 _recArg = null ; 54 } 55 } 56 57 class BufferManager:IDisposable 58 { 59 private byte [] buffers; 60 private int bufferSize; 61 private int allSize; 62 private int currentIndex; 63 private Stack< int > freeIndexs; 64 65 /// <summary> 66 /// 构造缓存池 67 /// </summary> 68 /// <param name="buffersSize"> 池总大小 </param> 69 /// <param name="defaultSize"> 默认单元大小 </param> 70 internal BufferManager( int buffersSize, int defaultSize) 71 { 72 this .bufferSize= defaultSize; 73 this .allSize= buffersSize; 74 currentIndex= 0 ; 75 this .buffers = new byte [allSize]; 76 freeIndexs = new Stack< int > (); 77 } 78 79 /// <summary> 80 /// 81 /// </summary> 82 /// <param name="e"></param> 83 /// <param name="offSet"></param> 84 /// <returns></returns> 85 internal bool SetBuffer(SocketAsyncEventArgs e) 86 { 87 if (freeIndexs.Count > 0 ) 88 { 89 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize); 90 } 91 else 92 { 93 if ((allSize - currentIndex) < bufferSize) return false ; 94 e.SetBuffer(buffers, currentIndex, bufferSize); 95 currentIndex += bufferSize; 96 } 97 return true ; 98 } 99 100 /// <summary> 101 /// 102 /// </summary> 103 /// <param name="e"></param> 104 internal void FreeBuffer(SocketAsyncEventArgs e) 105 { 106 freeIndexs.Push(e.Offset); 107 for ( int i = e.Offset; i < e.Offset + bufferSize; i++ ) 108 { 109 if (buffers[i] == 0 ) break ; 110 buffers[i] = 0 ; 111 } 112 e.SetBuffer( null , 0 , 0 ); 113 } 114 115 public void Dispose() 116 { 117 buffers = null ; 118 freeIndexs = null ; 119 } 120 } 121 122 class SocketAsyncEventArgsPool:IDisposable 123 { 124 private Dictionary< string , ConnectionUnit> busyCollection; 125 private Stack<ConnectionUnit> freeCollecton; 126 127 internal SocketAsyncEventArgsPool() 128 { 129 busyCollection = new Dictionary< string , ConnectionUnit> (); 130 freeCollecton = new Stack<ConnectionUnit> (); 131 } 132 133 /// <summary> 134 /// 取出 135 /// </summary> 136 internal ConnectionUnit Pop( string uid) 137 { 138 ConnectionUnit unit = freeCollecton.Pop(); 139 unit.State = true ; 140 unit.Uid = uid; 141 busyCollection.Add(uid, unit); 142 return unit; 143 } 144 145 /// <summary> 146 /// 放回 147 /// </summary> 148 internal void Push(ConnectionUnit unit) 149 { 150 if (! string .IsNullOrEmpty(unit.Uid) && unit.Uid != " -1 " ) 151 busyCollection.Remove(unit.Uid); 152 unit.Uid = " -1 " ; 153 unit.State = false ; 154 freeCollecton.Push(unit); 155 } 156 157 /// <summary> 158 /// 获取 159 /// </summary> 160 internal ConnectionUnit GetConnectionUnitByUID( string uid) 161 { 162 if (busyCollection.ContainsKey(uid)) 163 return busyCollection[uid]; 164 return null ; 165 } 166 167 /// <summary> 168 /// 169 /// </summary> 170 internal string [] GetOnLineList() 171 { 172 return busyCollection.Keys.ToArray(); 173 } 174 175 public void Dispose() 176 { 177 foreach (KeyValuePair< string ,ConnectionUnit> item in busyCollection) 178 item.Value.Dispose(); 179 180 busyCollection.Clear(); 181 182 while (freeCollecton.Count > 0 ) 183 freeCollecton.Pop().Dispose(); 184 } 185 } 186 187 public class SocketPoolController:IDisposable 188 { 189 /// <summary> 190 /// 初始化池的互斥体 191 /// </summary> 192 private Mutex mutex = new Mutex(); 193 194 /// <summary> 195 /// Accept限制信号 196 /// </summary> 197 private Semaphore semaphoreAccept; 198 199 /// <summary> 200 /// Accept信号 201 /// </summary> 202 private static ManualResetEvent acceptLock = new ManualResetEvent( false ); 203 204 /// <summary> 205 /// Send信号 206 /// </summary> 207 private static ManualResetEvent sendLock = new ManualResetEvent( false ); 208 209 /// <summary> 210 /// 最大并发数(连接数) 211 /// </summary> 212 private int maxConnect; 213 214 /// <summary> 215 /// 当前连接数(并发数) 216 /// </summary> 217 private int currentConnect; 218 219 /// <summary> 220 /// 缓冲池 221 /// </summary> 222 private BufferManager buffer; 223 224 /// <summary> 225 /// SocketasyncEventArgs池 226 /// </summary> 227 private SocketAsyncEventArgsPool pool; 228 229 /// <summary> 230 /// 服务端Socket 231 /// </summary> 232 private Socket server; 233 234 /// <summary> 235 /// 完成接受的委托 236 /// </summary> 237 public delegate void AcceptHandler( string uid); 238 239 /// <summary> 240 /// 完成发送的委托 241 /// </summary> 242 public delegate void SendHandler( string uid, string result); 243 244 /// <summary> 245 /// 完成接收的委托 246 /// </summary> 247 public delegate void RecevieHandler( string uid, string data); 248 249 /// <summary> 250 /// 完成接受事件 251 /// </summary> 252 public event AcceptHandler OnAccept; 253 254 /// <summary> 255 /// 完成发送事件 256 /// </summary> 257 public event SendHandler OnSend; 258 259 /// <summary> 260 /// 完成接收事件 261 /// </summary> 262 public event RecevieHandler OnReceive; 263 264 /// <summary> 265 /// 构造函数 266 /// </summary> 267 /// <param name="buffersize"> 单元缓冲区大小 </param> 268 /// <param name="maxCount"> 并发总数 </param> 269 public SocketPoolController( int buffersize, int maxCount) 270 { 271 buffer = new BufferManager(buffersize * maxCount,buffersize); 272 this .currentConnect = 0 ; 273 this .maxConnect = maxCount; 274 this .currentConnect = 0 ; 275 this .pool = new SocketAsyncEventArgsPool(); 276 // 设置并发数信号,经试验过是并发数-1才对 277 this .semaphoreAccept = new Semaphore(maxCount- 1 , maxCount- 1 ); 278 InitPool(); 279 } 280 281 /// <summary> 282 /// 初始化SocketAsyncEventArgs池 283 /// 这里主要是给空闲栈填充足够的实例 284 /// </summary> 285 private void InitPool() 286 { 287 ConnectionUnit unit = null ; 288 for ( int i = 0 ; i < maxConnect; i++ ) 289 { 290 unit = new ConnectionUnit(); 291 unit.Uid = " -1 " ; 292 unit.RecArg = new SocketAsyncEventArgs(); 293 unit.RecArg.Completed += new EventHandler<SocketAsyncEventArgs> (RecArg_Completed); 294 unit.SendArg = new SocketAsyncEventArgs(); 295 unit.SendArg.Completed += new EventHandler<SocketAsyncEventArgs> (SendArg_Completed); 296 this .pool.Push(unit); 297 } 298 } 299 300 /// <summary> 301 /// 启动池 302 /// </summary> 303 /// <param name="ipAddress"> 服务端的IP </param> 304 /// <param name="port"> 端口 </param> 305 public void RunPool( string ipAddress, int port) 306 { 307 IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); 308 server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 309 server.Bind(endpoint); 310 server.Listen( 100 ); 311 312 // 调用方法异步Accept客户端的连接 313 MyAsyncAccept(); 314 // 设置信号,防止再池在已经启动的情况下再次启动 315 mutex.WaitOne(); 316 } 317 318 /// <summary> 319 /// 异步Accept客户端的连接 320 /// </summary> 321 void MyAsyncAccept() 322 { 323 // 这里使用Action的方式异步循环接受客户端的连接 324 // 模仿同事的做法没开线程,不知这种方式是好是坏 325 Action callback = new Action( delegate () 326 { 327 while ( true ) 328 { 329 // 每次接受都要新开一个SocketAsyncEventArgs,否则会报错 330 // 其实我也想重复利用的 331 SocketAsyncEventArgs e = new SocketAsyncEventArgs(); 332 e.Completed += new EventHandler<SocketAsyncEventArgs> (Accept_Completed); 333 334 acceptLock.Reset(); 335 server.AcceptAsync(e); 336 // 在异步接受完成之前阻塞当前线程 337 acceptLock.WaitOne(); 338 } 339 }); 340 callback.BeginInvoke( null , null ); 341 } 342 343 344 /// <summary> 345 /// 停止池 346 /// </summary> 347 public void StopPool() 348 { 349 // 把服务端的socket关了 350 if (server != null ) 351 server.Close(); 352 // 释放互斥信号,等待下次启动 353 mutex.ReleaseMutex(); 354 // 释放资源 355 Dispose(); 356 } 357 358 /// <summary> 359 /// 发送消息 360 /// </summary> 361 /// <param name="uid"></param> 362 /// <param name="message"></param> 363 public void SendMessage( string uid, string message) 364 { 365 sendLock.Reset(); 366 ConnectionUnit unit= pool.GetConnectionUnitByUID(uid); 367 // 如果获取不了连接单元就不发送了, 368 if (unit == null ) 369 { 370 if (OnSend!= null ) OnSend(uid, " 100 " ); 371 sendLock.Set(); 372 return ; 373 } 374 byte [] datas = Encoding.ASCII.GetBytes(message); 375 unit.SendArg.SetBuffer(datas, 0 , datas.Length); 376 unit.client.SendAsync(unit.SendArg); 377 // 阻塞当前线程,等到发送完成才释放 378 sendLock.WaitOne(); 379 } 380 381 void SendArg_Completed( object sender, SocketAsyncEventArgs e) 382 { 383 Socket client = sender as Socket; 384 ConnectionUnit unit = e.UserToken as ConnectionUnit; 385 // 这里的消息码有三个,2字头的是成功的,1字头是不成功的 386 // 101是未知错误,100是客户端不在线 387 if (e.SocketError == SocketError.Success) 388 if (OnSend != null ) OnSend(unit.Uid, " 200 " ); 389 else if (OnSend != null ) OnSend(unit.Uid, " 101 " ); 390 // 释放信号,以便下次发送消息执行 391 sendLock.Set(); 392 } 393 394 void RecArg_Completed( object sender, SocketAsyncEventArgs e) 395 { 396 Socket client = sender as Socket; 397 ConnectionUnit unit = e.UserToken as ConnectionUnit; 398 // 这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异 399 if (e.SocketError == SocketError.Success) 400 { 401 int rec = e.BytesTransferred; 402 if (rec == 0 ) 403 { 404 CloseSocket(unit); 405 return ; 406 } 407 if (client.Available > 0 ) 408 { 409 unit.tempArray.AddRange(e.Buffer); 410 buffer.FreeBuffer(unit.RecArg); 411 buffer.SetBuffer(unit.RecArg); 412 client.SendAsync(unit.RecArg); 413 return ; 414 } 415 byte [] data = e.Buffer; 416 int len = rec; 417 if (unit.tempArray.Count != 0 ) 418 { 419 foreach ( byte item in data) 420 { 421 if (item == 0 ) break ; 422 unit.tempArray.Add(item); 423 } 424 data = unit.tempArray.ToArray( typeof ( byte )) as byte []; 425 rec = data.Length; 426 unit.tempArray.Clear(); 427 } 428 429 string dataStr = Encoding.ASCII.GetString(data, 0 , len); 430 if (OnReceive != null ) 431 OnReceive(unit.Uid, dataStr); 432 433 if (!unit.State) return ; 434 buffer.FreeBuffer(e); 435 buffer.SetBuffer(e); 436 client.ReceiveAsync(e); 437 } 438 // 这里还多个了一个关闭当前连接 439 else 440 { 441 CloseSocket(unit); 442 } 443 } 444 445 void Accept_Completed( object sender, SocketAsyncEventArgs e) 446 { 447 Socket client = e.AcceptSocket; 448 try 449 { 450 if (client.Connected) 451 { 452 IPEndPoint point = client.RemoteEndPoint as IPEndPoint; 453 string uid = point.Address + " : " + point.Port; 454 ConnectionUnit unit = pool.Pop(uid); 455 unit.client = client; 456 unit.State = true ; 457 unit.Uid = uid; 458 unit.RecArg.UserToken = unit; 459 unit.SendArg.UserToken = unit; 460 buffer.SetBuffer(unit.RecArg); 461 462 // 在接受成功之后就开始接收数据了 463 client.ReceiveAsync(unit.RecArg); 464 // 设置并发限制信号和增加当前连接数 465 semaphoreAccept.WaitOne(); 466 Interlocked.Increment( ref currentConnect); 467 468 if (OnAccept != null ) OnAccept(uid); 469 } 470 else if (client != null ) 471 { 472 client.Close(); 473 client.Dispose(); 474 client = null ; 475 } 476 } 477 catch (Exception ex) { Console.WriteLine(ex.ToString()); } 478 // 设置Accept信号,以便下次Accept的执行 479 acceptLock.Set(); 480 e.Dispose(); 481 } 482 483 /// <summary> 484 /// 关闭一个连接单元 485 /// </summary> 486 private void CloseSocket( ConnectionUnit unit ) 487 { 488 // 关闭并释放客户端socket的字眼 489 if (unit.client != null ) 490 { 491 unit.client.Shutdown(SocketShutdown.Both); 492 unit.client.Dispose(); 493 unit.client = null ; 494 } 495 // Console.WriteLine(unit.Uid+" disconnect "); 496 // 把连接放回连接池 497 pool.Push(unit); 498 // 释放并发信号 499 semaphoreAccept.Release(); 500 // 减少当前连接数 501 Interlocked.Decrement( ref currentConnect); 502 } 503 504 public void Dispose() 505 { 506 if (pool != null ) 507 { 508 pool.Dispose(); 509 pool = null ; 510 } 511 if (buffer != null ) 512 { 513 buffer.Dispose(); 514 buffer = null ; 515 } 516 if (server != null ) 517 { 518 server.Dispose(); 519 server = null ; 520 } 521 522 } 523 }
分类: C# , Socket编程
作者: Leo_wl
出处: http://www.cnblogs.com/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息