好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Socket连接池

Socket连接池

Socket连接池

 

  “池”这个概念好像最早是在操作系统的课上听过的,到后来出来工作的第二天组长也跟我提起“池”这个东东。它给我的感觉是某种对象的集合,如果要用的话就取出,不用的话就放回。在学多线程的时候有接触过线程池,在写《 Socket 一对多通信 》的时候想到了Socket连接池这回事,不过在网上谷歌了一下,发现这类的文章貌似不多,看了一下园友的博文《 高性能Socket设计实现 》,获益良多,下了份源码来看,虽然有一部分看不明白,而且由于个人水平跑不了那份代码,但是从中我学到了不少,至少我写的“池”有一部分是用了这位田大哥的思想。

  先来分析各个类之间的结构,整个连接池里面实际上是有两个池,一个是在异步通信中可以重复利用的SocketAsyncEventArgs池(当然处于别的方面考虑,池里面并不是单纯放SocketAsyncEventArgs的实例集合),另一个是接收数据时用到的byte[]缓冲池。而这两个池是外部不能访问的,外部通过一个控制(或者叫管理)的类进行操作。继承了前两篇博文中的异步通信的思想,最底的一个类存放了一次连接的信息,包括两个SocketAsyncEventArgs实例和通信另一端的Socket实例。下面将逐部分介绍。

  1       ///   <summary> 
  2       ///   连接单元
   3       ///   </summary> 
  4        class   ConnectionUnit:IDisposable
   5       {
   6           private   string  _uid; //  单元的编号,默认为-1 
  7           private   bool  _state; //  单元的状态,true表示使用,false表示空闲 
  8           private  SocketAsyncEventArgs _sendArg; //  专用于发送 
  9           private  SocketAsyncEventArgs _recArg; //  专用于接收 
 10           internal  Socket client {  get ;  set ; } //  客户端的socket实例 
 11           internal  ArrayList tempArray {  get ;  set ; } //  暂存已接收的数据,避免粘包用的 
 12  
 13  
 14           public  ConnectionUnit( string   UID)
  15           {
  16              _uid =  UID;
  17              tempArray =  new   ArrayList();
  18           }
  19  
 20           public  ConnectionUnit() :  this ( "  -1  "  ) { }
  21  
 22           public   void   Dispose()
  23           {
  24               if  (_sendArg !=  null  )
  25                   _sendArg.Dispose();
  26               if  (_recArg !=  null  )
  27                   _recArg.Dispose();
  28  
 29              _sendArg =  null  ;
  30              _recArg =  null  ;
  31           }
  32      }

  这个与之前一篇讲异步通信的博文一样,一个Socket两个SocketAsyncEventArgs。为了取客户端的Socket方便一点,两个SocketAsyncEventArgs一个用于收,一个用于发,田大哥说这样可以实现双工。的确是这样,当初把类定成这样也是为了在收的同时也能发。以上代码删掉了把字段装成属性的那部分。

  SocketAsyncEventArgsPool这个类就是上面链接单元的一个池,这个池才是整个连接池里最核心的池,也是真正意义上的池。池里面用了两个集合来存放这一堆连接单元。分别是空闲栈(freeCollecton)和在线字典集(busyCollection)。对于这样的一个池,我就认为,只要在我需要用的时候把对象取出来,顶多在去的时候给一些参数,让池帮忙把这个对象配置好,我就拿来使用,等到我用完了,把他放回池里,池就把对象还原,等待下次再使用。正体感觉这个池对外有点像个栈,取的时候就Pop,放的时候就Push,此外还提供了两个与栈不相干的的方法,根据编号获取在线的连接单元和获取所有在线连接单元的编号。

  1       class   SocketAsyncEventArgsPool:IDisposable
   2       {
   3           private  Dictionary< string , ConnectionUnit>  busyCollection;
   4           private  Stack<ConnectionUnit>  freeCollecton;
   5  
  6           internal   SocketAsyncEventArgsPool()
   7           {
   8              busyCollection =  new  Dictionary< string , ConnectionUnit> ();
   9              freeCollecton =  new  Stack<ConnectionUnit> ();
  10           }
  11  
 12           ///   <summary> 
 13           ///   取出
  14           ///   </summary> 
 15           internal  ConnectionUnit Pop( string   uid)
  16           {
  17              ConnectionUnit unit =  freeCollecton.Pop();
  18              unit.State =  true  ;
  19              unit.Uid =  uid;
  20               busyCollection.Add(uid, unit);
  21               return   unit;
  22           }
  23  
 24           ///   <summary> 
 25           ///   放回
  26           ///   </summary> 
 27           internal   void   Push(ConnectionUnit unit)
  28           {
  29               if  (! string .IsNullOrEmpty(unit.Uid) && unit.Uid !=  "  -1  "  )
  30                   busyCollection.Remove(unit.Uid);
  31              unit.Uid =  "  -1  "  ;
  32              unit.State =  false  ;
  33               freeCollecton.Push(unit);
  34           }
  35  
 36           ///   <summary> 
 37           ///   获取
  38           ///   </summary> 
 39           internal  ConnectionUnit GetConnectionUnitByUID( string   uid)
  40           {
  41               if   (busyCollection.ContainsKey(uid))
  42                   return   busyCollection[uid];
  43               return   null  ;
  44           }
  45  
 46           ///   <summary> 
 47           ///  
 48           ///   </summary> 
 49           internal   string  [] GetOnLineList()
  50           {
  51               return   busyCollection.Keys.ToArray();
  52           }
  53  
 54           public   void   Dispose()
  55           {
  56               foreach  (KeyValuePair< string ,ConnectionUnit> item  in   busyCollection)
  57                   item.Value.Dispose();
  58  
 59               busyCollection.Clear();
  60  
 61               while  (freeCollecton.Count >  0  )
  62                   freeCollecton.Pop().Dispose();
  63          }

  BufferManager这个是专给接收的SocketAsyncEventArgs用的缓冲池,整一个连接池里面所有接收用的缓冲区都用这个BufferManager,参照田大哥的思想,现在内存里开辟一大片区域存放byte,然后给每一个接收用的SocketAsyncEventArgs分配一块。

  1       class   BufferManager:IDisposable
   2       {
   3           private   byte [] buffers; //  缓冲池 
  4           private   int  bufferSize; //  每个单元使用的大小 
  5           private   int  allSize; //  池的总大小 
  6           private   int  currentIndex; //  当前可用的索引 
  7           private  Stack< int > freeIndexs; //  已使用过的空闲索引 
  8  
  9           ///   <summary> 
 10           ///   构造缓存池
  11           ///   </summary> 
 12           ///   <param name="buffersSize">  池总大小  </param> 
 13           ///   <param name="defaultSize">  默认单元大小  </param> 
 14           internal  BufferManager( int  buffersSize,  int   defaultSize) 
  15           {
  16               this .bufferSize= defaultSize;
  17               this .allSize= buffersSize;
  18              currentIndex= 0  ;
  19               this .buffers =  new   byte  [allSize];
  20              freeIndexs =  new  Stack< int > ();
  21           }
  22  
 23           ///   <summary> 
 24           ///   给SocketAsyncEventArgs设置缓冲区
  25           ///   </summary> 
 26           internal   bool   SetBuffer(SocketAsyncEventArgs e)
  27           {
  28               //  首先看看空闲栈里有没有空闲的区域,有就使用 
 29               if  (freeIndexs.Count >  0  )
  30               {
  31                   e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);
  32               }
  33               else 
 34               {
  35                   //  没有就得从buffers里取,如果buffers用光了当然取不了 
 36                   if  ((allSize - currentIndex) < bufferSize)  return   false  ;
  37                   e.SetBuffer(buffers, currentIndex, bufferSize);
  38                  currentIndex +=  bufferSize;
  39               }
  40               return   true  ;
  41           }
  42  
 43           ///   <summary> 
 44           ///   释放SocketAsyncEventArgs的缓冲区
  45           ///   </summary> 
 46           ///   <param name="e"></param> 
 47           internal   void   FreeBuffer(SocketAsyncEventArgs e)
  48           {
  49               //  把索引放到空闲索引栈里面,供下次取的时候重复利用 
 50               freeIndexs.Push(e.Offset);
  51               //  同时清空这部分区域的数据免得上次使用时遗留的数据会掺
  52               //  和到下次读取的数据中 
 53               for  ( int  i = e.Offset; i < e.Offset + bufferSize; i++ )
  54               {
  55                   if  (buffers[i] ==  0 )  break  ;
  56                  buffers[i] =  0  ;
  57               }
  58              e.SetBuffer( null ,  0 ,  0  );
  59           }
  60  
 61           public   void   Dispose()
  62           {
  63              buffers =  null  ;
  64              freeIndexs =  null  ;
  65           }
  66      }

  其实上面两个池都是很大程度参照了《 高性能Socket设计实现 》中的内容。下面这个SocketPoolController是对外的类,这个类的设计参照的就没那么多了。而对于一个Socket通信(服务端的)来说,无非都是三件事,接受连接,接收数据,发送数据。这三件事我再操作类里面是这样做的

  接受连接:接受再运行池的时候就开始了,异步循环地接受,执行一次异步接受就阻塞,等到接受完成才被唤醒。

  1           ///   <summary> 
  2           ///   异步Accept客户端的连接
   3           ///   </summary> 
  4           void   MyAsyncAccept()
   5           {
   6               //  这里使用Action的方式异步循环接受客户端的连接
   7               //  模仿同事的做法没开线程,不知这种方式是好是坏 
  8              Action callback =  new  Action( delegate  ()
   9               {
  10                   while  ( true  )
  11                   {
  12                       //  每次接受都要新开一个SocketAsyncEventArgs,否则会报错
  13                       //  其实我也想重复利用的 
 14                      SocketAsyncEventArgs e =  new   SocketAsyncEventArgs();
  15                      e.Completed +=  new  EventHandler<SocketAsyncEventArgs> (Accept_Completed);
  16                      
 17                       acceptLock.Reset();
  18                       server.AcceptAsync(e);
  19                       //  在异步接受完成之前阻塞当前线程 
 20                       acceptLock.WaitOne();
  21                   }
  22               });
  23              callback.BeginInvoke( null ,  null  );
  24           }
  25  
 26           void  Accept_Completed( object   sender, SocketAsyncEventArgs e)
  27           {
  28              Socket client =  e.AcceptSocket;
  29               try 
 30               {
  31                   if   (client.Connected)
  32                   {
  33                      IPEndPoint point = client.RemoteEndPoint  as   IPEndPoint;
  34                       string  uid = point.Address +  "  :  "  +  point.Port;
  35                      ConnectionUnit unit =  pool.Pop(uid);
  36                      unit.client =  client;
  37                      unit.State =  true  ;
  38                      unit.Uid =  uid;
  39                      unit.RecArg.UserToken =  unit;
  40                      unit.SendArg.UserToken =  unit;
  41                       buffer.SetBuffer(unit.RecArg);
  42  
 43                       //  在接受成功之后就开始接收数据了 
 44                       client.ReceiveAsync(unit.RecArg);
  45                       //  设置并发限制信号和增加当前连接数 
 46                       semaphoreAccept.WaitOne();
  47                      Interlocked.Increment( ref   currentConnect);
  48  
 49                       if  (OnAccept !=  null  ) OnAccept(uid);
  50                   }
  51                   else   if  (client !=  null  )
  52                   {
  53                       client.Close();
  54                       client.Dispose();
  55                      client =  null  ;
  56                   }
  57               }
  58               catch   (Exception ex) { Console.WriteLine(ex.ToString()); }
  59               //  设置Accept信号,以便下次Accept的执行 
 60               acceptLock.Set();
  61               e.Dispose();
  62          }

  接收消息:在异步接受成功的时候开始接收,每次接收完成之后就进行下一次接收,直到客户端断开连接才终止。

  1           void  RecArg_Completed( object   sender, SocketAsyncEventArgs e)
   2           {
   3              Socket client = sender  as   Socket;
   4              ConnectionUnit unit = e.UserToken  as   ConnectionUnit;
   5               //  这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异 
  6               if  (e.SocketError ==  SocketError.Success)
   7               {
   8                   int  rec =  e.BytesTransferred;
   9                   if  (rec ==  0  )
  10                   {
  11                       CloseSocket(unit);
  12                       return  ;
  13                   }
  14                   if  (client.Available >  0  )
  15                   {
  16                       unit.tempArray.AddRange(e.Buffer);
  17                       buffer.FreeBuffer(unit.RecArg);
  18                       buffer.SetBuffer(unit.RecArg);
  19                       client.SendAsync(unit.RecArg);
  20                       return  ;
  21                   }
  22                   byte [] data =  e.Buffer;
  23                   int  len =  rec;
  24                   if  (unit.tempArray.Count !=  0  )
  25                   {
  26                       foreach  ( byte  item  in   data)
  27                       {
  28                           if  (item ==  0 )  break  ;
  29                           unit.tempArray.Add(item);
  30                       }
  31                      data = unit.tempArray.ToArray( typeof ( byte ))  as   byte  [];
  32                      rec =  data.Length;
  33                       unit.tempArray.Clear();
  34                   }
  35  
 36                   string  dataStr = Encoding.ASCII.GetString(data,  0  , len);
  37                   if  (OnReceive !=  null  )
  38                       OnReceive(unit.Uid, dataStr);
  39  
 40                   if  (!unit.State)  return  ;
  41                   buffer.FreeBuffer(e);
  42                   buffer.SetBuffer(e);
  43                   client.ReceiveAsync(e);
  44               }
  45               //  这里还多个了一个关闭当前连接 
 46               else 
 47               {
  48                   CloseSocket(unit);
  49               }
  50           }
  51           ///   <summary> 
 52           ///   关闭一个连接单元
  53           ///   </summary> 
 54           private   void   CloseSocket( ConnectionUnit unit )
  55           {
  56               //  关闭并释放客户端socket的字眼 
 57               if  (unit.client !=  null  )
  58               {
  59                   unit.client.Shutdown(SocketShutdown.Both);
  60                   unit.client.Dispose();
  61                  unit.client =  null  ;
  62               }
  63               //  把连接放回连接池 
 64               pool.Push(unit);
  65               //  释放并发信号 
 66               semaphoreAccept.Release();
  67               //  减少当前连接数 
 68              Interlocked.Decrement( ref   currentConnect);
  69          }

  发送消息:外放方法,在需要的时候自行调用方法发送。

  1           ///   <summary> 
  2           ///   发送消息
   3           ///   </summary> 
  4           ///   <param name="uid"></param> 
  5           ///   <param name="message"></param> 
  6           public   void  SendMessage( string  uid,  string   message)
   7           {
   8               sendLock.Reset();
   9              ConnectionUnit unit= pool.GetConnectionUnitByUID(uid);
  10               //  如果获取不了连接单元就不发送了, 
 11               if  (unit ==  null  )
  12               { 
  13                   if (OnSend!= null ) OnSend(uid, "  100  "  );
  14                   sendLock.Set();
  15                   return  ;
  16               }
  17               byte [] datas =  Encoding.ASCII.GetBytes(message);
  18              unit.SendArg.SetBuffer(datas,  0  , datas.Length);
  19               unit.client.SendAsync(unit.SendArg);
  20               //  阻塞当前线程,等到发送完成才释放 
 21               sendLock.WaitOne();
  22           }
  23  
 24           void  SendArg_Completed( object   sender, SocketAsyncEventArgs e)
  25           {
  26              Socket client = sender  as   Socket;
  27              ConnectionUnit unit = e.UserToken  as   ConnectionUnit;
  28               //  这里的消息码有三个,2字头的是成功的,1字头是不成功的
  29               //  101是未知错误,100是客户端不在线 
 30               if  (e.SocketError ==  SocketError.Success)
  31                   if  (OnSend !=  null ) OnSend(unit.Uid,  "  200  "  );
  32               else   if  (OnSend !=  null ) OnSend(unit.Uid,  "  101  "  );
  33               //  释放信号,以便下次发送消息执行 
 34               sendLock.Set();
  35          }

  下面则是类里面的一些字段信息和构造函数

  1           ///   <summary> 
  2           ///   初始化池的互斥体
   3           ///   </summary> 
  4           private  Mutex mutex =  new   Mutex();
   5  
  6           ///   <summary> 
  7           ///   Accept限制信号
   8           ///   </summary> 
  9           private   Semaphore semaphoreAccept;
  10  
 11           ///   <summary> 
 12           ///   Accept信号
  13           ///   </summary> 
 14           private   static  ManualResetEvent acceptLock =  new  ManualResetEvent( false  );
  15  
 16           ///   <summary> 
 17           ///   Send信号
  18           ///   </summary> 
 19           private   static  ManualResetEvent sendLock =  new  ManualResetEvent( false  );
  20  
 21           ///   <summary> 
 22           ///   最大并发数(连接数)
  23           ///   </summary> 
 24           private   int   maxConnect;
  25  
 26           ///   <summary> 
 27           ///   当前连接数(并发数)
  28           ///   </summary> 
 29           private   int   currentConnect;
  30  
 31           ///   <summary> 
 32           ///   缓冲池
  33           ///   </summary> 
 34           private   BufferManager buffer;
  35  
 36           ///   <summary> 
 37           ///   SocketasyncEventArgs池
  38           ///   </summary> 
 39           private   SocketAsyncEventArgsPool pool;
  40  
 41           ///   <summary> 
 42           ///   服务端Socket
  43           ///   </summary> 
 44           private   Socket server;
  45  
 46           ///   <summary> 
 47           ///   完成接受的委托
  48           ///   </summary> 
 49           public   delegate   void  AcceptHandler( string   uid);
  50  
 51           ///   <summary> 
 52           ///   完成发送的委托
  53           ///   </summary> 
 54           public   delegate   void  SendHandler( string  uid,  string   result);
  55  
 56           ///   <summary> 
 57           ///   完成接收的委托
  58           ///   </summary> 
 59           public   delegate   void  RecevieHandler( string  uid,  string   data);
  60  
 61           ///   <summary> 
 62           ///   完成接受事件
  63           ///   </summary> 
 64           public   event   AcceptHandler OnAccept;
  65  
 66           ///   <summary> 
 67           ///   完成发送事件
  68           ///   </summary> 
 69           public   event   SendHandler OnSend;
  70  
 71           ///   <summary> 
 72           ///   完成接收事件
  73           ///   </summary> 
 74           public   event   RecevieHandler OnReceive;
  75  
 76           ///   <summary> 
 77           ///   构造函数
  78           ///   </summary> 
 79           ///   <param name="buffersize">  单元缓冲区大小  </param> 
 80           ///   <param name="maxCount">  并发总数  </param> 
 81           public  SocketPoolController( int  buffersize,  int   maxCount)
  82           {
  83              buffer =  new  BufferManager(buffersize *  maxCount,buffersize);
  84               this .currentConnect =  0  ;
  85               this .maxConnect =  maxCount;
  86               this .currentConnect =  0  ;
  87               this .pool =  new   SocketAsyncEventArgsPool();
  88               //  设置并发数信号,经试验过是并发数-1才对 
 89               this .semaphoreAccept =  new  Semaphore(maxCount- 1 , maxCount- 1  );
  90               InitPool();
  91          }

构造函数里用到的方法

  1           ///   <summary> 
  2           ///   初始化SocketAsyncEventArgs池
   3           ///   这里主要是给空闲栈填充足够的实例
   4           ///   </summary> 
  5           private   void   InitPool()
   6           {
   7              ConnectionUnit unit =  null  ;
   8               for  ( int  i =  0 ; i < maxConnect; i++ )
   9               {
  10                  unit =  new   ConnectionUnit();
  11                  unit.Uid =  "  -1  "  ;
  12                  unit.RecArg =  new   SocketAsyncEventArgs();
  13                  unit.RecArg.Completed +=  new  EventHandler<SocketAsyncEventArgs> (RecArg_Completed);
  14                  unit.SendArg =  new   SocketAsyncEventArgs();
  15                  unit.SendArg.Completed +=  new  EventHandler<SocketAsyncEventArgs> (SendArg_Completed);
  16                   this  .pool.Push(unit);
  17               }
  18          }

  其他外放专门控制池的方法

  1           ///   <summary> 
  2           ///   启动池
   3           ///   </summary> 
  4           ///   <param name="ipAddress">  服务端的IP  </param> 
  5           ///   <param name="port">  端口  </param> 
  6           public   void  RunPool( string  ipAddress,  int   port)
   7           {
   8              IPEndPoint endpoint =  new   IPEndPoint(IPAddress.Parse(ipAddress), port);
   9              server =  new   Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  10               server.Bind(endpoint);
  11              server.Listen( 100  );
  12  
 13               //  调用方法异步Accept客户端的连接 
 14               MyAsyncAccept();
  15               //  设置信号,防止再池在已经启动的情况下再次启动 
 16               mutex.WaitOne();
  17           }
  18  
 19           ///   <summary> 
 20           ///   停止池
  21           ///   </summary> 
 22           public   void   StopPool()
  23           {
  24               //  把服务端的socket关了 
 25               if  (server !=  null  )
  26                   server.Close();
  27               //  释放互斥信号,等待下次启动 
 28               mutex.ReleaseMutex();
  29               //  释放资源 
 30               Dispose();
  31          }

  要把这个操作类与现实的事物类比的话,这个与我们平常跟某个或一堆人聊天时差不多,别人说的东西,不用你自己控制你都会听得到(排除带上耳机,塞住耳朵这种极端的情况),所以接收消息那个方法就不需要了,而你要说什么这个要靠个人控制,而事件那些也好类比,OnAccept就相当于某人加入到这次聊天中你会做些什么(say hello 或者无视),OnRecevie就听到别人说什么自己有什么反应,OnSend就自己说完话之后有什么反应(有时候发现说错了会纠正,有时觉得自己说得好笑的也笑一场诸如此类)。

  在使用的时候可以这样子

  1               SocketPoolController pool;
   2              pool =  new  SocketPoolController( 32  *  1024 ,  1000  );
   3              pool.OnReceive +=  new   SocketPoolController.RecevieHandler(pool_OnReceive);
   4              pool.OnSend +=  new   SocketPoolController.SendHandler(pool_OnSend);
   5              pool.OnAccept +=  new   SocketPoolController.AcceptHandler(pool_OnAccept);
   6              pool.RunPool( "  127.0.0.1  " ,  8081  );
   7              Console.WriteLine( "  Pool has run\r\npress any key to stop...  "  );
   8               Console.ReadKey();
   9               pool.StopPool();
  10              Console.WriteLine( "  Pool has stop\r\npress any key to exit...  "  );
  11              Console.ReadLine();

  在池里面有一部分地方看似跟用同步的差不多,像接受客户端的连接,发送消息这些地方。可是用这种异步,万一客户端突然断开连接也不会有同步那样马上抛异常。还有一点的是在这份代码里面缺少了对异常的捕捉,有一部分错误我在测试的过程中设了判断避开了。以前跟某只猫提过Socket编程会与多线程一起使用,我也觉得是这样,在之前一篇博文 《Socket一对多通信》里我也用到线程,后来的异步通信也是有线程的,不过不是.net framework自行创建的。看了田大哥的博文给我的另一个收获是信号量的使用,在以前不懂得使用信号量,只会设置一大堆标识状态的布尔值或整形的变量来计数判断。田大哥的博文介绍的时高性能的Socket,而我的这个应该性能不会高到哪里去。上面有什么说错的请各位指出,有什么说漏的,请各位提点,多多指导。谢谢!

整份源码

 1       ///   <summary> 
   2       ///   连接单元
    3       ///   </summary> 
   4       class   ConnectionUnit:IDisposable
    5       {
    6           private   string  _uid; //  单元的编号,默认为-1 
   7           private   bool  _state; //  单元的状态,true表示使用,false表示空闲 
   8           private  SocketAsyncEventArgs _sendArg; //  专用于发送 
   9           private  SocketAsyncEventArgs _recArg; //  专用于接收 
  10           internal  Socket client {  get ;  set ; } //  客户端的socket实例 
  11           internal  ArrayList tempArray {  get ;  set ; } //  暂存已接收的数据,避免粘包用的 
  12  
  13           public   string   Uid
   14           {
   15               get  {  return   _uid; }
   16               set  { _uid =  value; }
   17           }
   18  
  19           public  ConnectionUnit( string   UID)
   20           {
   21              _uid =  UID;
   22              tempArray =  new   ArrayList();
   23           }
   24  
  25           public  ConnectionUnit() :  this ( "  -1  "  ) { }
   26  
  27           public   bool   State
   28           {
   29               get  {  return   _state; }
   30               set  { _state =  value; }
   31           }
   32  
  33           public   SocketAsyncEventArgs SendArg
   34           {
   35               get  {  return   _sendArg; }
   36               set  { _sendArg =  value; }
   37           }
   38  
  39           public   SocketAsyncEventArgs RecArg
   40           {
   41               get  {  return   _recArg; }
   42               set  { _recArg =  value; }
   43           }
   44  
  45           public   void   Dispose()
   46           {
   47               if  (_sendArg !=  null  )
   48                   _sendArg.Dispose();
   49               if  (_recArg !=  null  )
   50                   _recArg.Dispose();
   51  
  52              _sendArg =  null  ;
   53              _recArg =  null  ;
   54           }
   55       }
   56  
  57       class   BufferManager:IDisposable
   58       {
   59           private   byte  [] buffers;
   60           private   int   bufferSize;
   61           private   int   allSize;
   62           private   int   currentIndex;
   63           private  Stack< int >  freeIndexs;
   64  
  65           ///   <summary> 
  66           ///   构造缓存池
   67           ///   </summary> 
  68           ///   <param name="buffersSize">  池总大小  </param> 
  69           ///   <param name="defaultSize">  默认单元大小  </param> 
  70           internal  BufferManager( int  buffersSize,  int   defaultSize) 
   71           {
   72               this .bufferSize= defaultSize;
   73               this .allSize= buffersSize;
   74              currentIndex= 0  ;
   75               this .buffers =  new   byte  [allSize];
   76              freeIndexs =  new  Stack< int > ();
   77           }
   78  
  79           ///   <summary> 
  80           ///  
  81           ///   </summary> 
  82           ///   <param name="e"></param> 
  83           ///   <param name="offSet"></param> 
  84           ///   <returns></returns> 
  85           internal   bool   SetBuffer(SocketAsyncEventArgs e)
   86           {
   87               if  (freeIndexs.Count >  0  )
   88               {
   89                   e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);
   90               }
   91               else 
  92               {
   93                   if  ((allSize - currentIndex) < bufferSize)  return   false  ;
   94                   e.SetBuffer(buffers, currentIndex, bufferSize);
   95                  currentIndex +=  bufferSize;
   96               }
   97               return   true  ;
   98           }
   99  
 100           ///   <summary> 
 101           ///  
 102           ///   </summary> 
 103           ///   <param name="e"></param> 
 104           internal   void   FreeBuffer(SocketAsyncEventArgs e)
  105           {
  106               freeIndexs.Push(e.Offset);
  107               for  ( int  i = e.Offset; i < e.Offset + bufferSize; i++ )
  108               {
  109                   if  (buffers[i] ==  0 )  break  ;
  110                  buffers[i] =  0  ;
  111               }
  112              e.SetBuffer( null ,  0 ,  0  );
  113           }
  114  
 115           public   void   Dispose()
  116           {
  117              buffers =  null  ;
  118              freeIndexs =  null  ;
  119           }
  120       }
  121  
 122       class   SocketAsyncEventArgsPool:IDisposable
  123       {
  124           private  Dictionary< string , ConnectionUnit>  busyCollection;
  125           private  Stack<ConnectionUnit>  freeCollecton;
  126  
 127           internal   SocketAsyncEventArgsPool()
  128           {
  129              busyCollection =  new  Dictionary< string , ConnectionUnit> ();
  130              freeCollecton =  new  Stack<ConnectionUnit> ();
  131           }
  132  
 133           ///   <summary> 
 134           ///   取出
  135           ///   </summary> 
 136           internal  ConnectionUnit Pop( string   uid)
  137           {
  138              ConnectionUnit unit =  freeCollecton.Pop();
  139              unit.State =  true  ;
  140              unit.Uid =  uid;
  141               busyCollection.Add(uid, unit);
  142               return   unit;
  143           }
  144  
 145           ///   <summary> 
 146           ///   放回
  147           ///   </summary> 
 148           internal   void   Push(ConnectionUnit unit)
  149           {
  150               if  (! string .IsNullOrEmpty(unit.Uid) && unit.Uid !=  "  -1  "  )
  151                   busyCollection.Remove(unit.Uid);
  152              unit.Uid =  "  -1  "  ;
  153              unit.State =  false  ;
  154               freeCollecton.Push(unit);
  155           }
  156  
 157           ///   <summary> 
 158           ///   获取
  159           ///   </summary> 
 160           internal  ConnectionUnit GetConnectionUnitByUID( string   uid)
  161           {
  162               if   (busyCollection.ContainsKey(uid))
  163                   return   busyCollection[uid];
  164               return   null  ;
  165           }
  166  
 167           ///   <summary> 
 168           ///  
 169           ///   </summary> 
 170           internal   string  [] GetOnLineList()
  171           {
  172               return   busyCollection.Keys.ToArray();
  173           }
  174  
 175           public   void   Dispose()
  176           {
  177               foreach  (KeyValuePair< string ,ConnectionUnit> item  in   busyCollection)
  178                   item.Value.Dispose();
  179  
 180               busyCollection.Clear();
  181  
 182               while  (freeCollecton.Count >  0  )
  183                   freeCollecton.Pop().Dispose();
  184           }
  185       }
  186  
 187       public   class   SocketPoolController:IDisposable
  188       {
  189           ///   <summary> 
 190           ///   初始化池的互斥体
  191           ///   </summary> 
 192           private  Mutex mutex =  new   Mutex();
  193  
 194           ///   <summary> 
 195           ///   Accept限制信号
  196           ///   </summary> 
 197           private   Semaphore semaphoreAccept;
  198  
 199           ///   <summary> 
 200           ///   Accept信号
  201           ///   </summary> 
 202           private   static  ManualResetEvent acceptLock =  new  ManualResetEvent( false  );
  203  
 204           ///   <summary> 
 205           ///   Send信号
  206           ///   </summary> 
 207           private   static  ManualResetEvent sendLock =  new  ManualResetEvent( false  );
  208  
 209           ///   <summary> 
 210           ///   最大并发数(连接数)
  211           ///   </summary> 
 212           private   int   maxConnect;
  213  
 214           ///   <summary> 
 215           ///   当前连接数(并发数)
  216           ///   </summary> 
 217           private   int   currentConnect;
  218  
 219           ///   <summary> 
 220           ///   缓冲池
  221           ///   </summary> 
 222           private   BufferManager buffer;
  223  
 224           ///   <summary> 
 225           ///   SocketasyncEventArgs池
  226           ///   </summary> 
 227           private   SocketAsyncEventArgsPool pool;
  228  
 229           ///   <summary> 
 230           ///   服务端Socket
  231           ///   </summary> 
 232           private   Socket server;
  233  
 234           ///   <summary> 
 235           ///   完成接受的委托
  236           ///   </summary> 
 237           public   delegate   void  AcceptHandler( string   uid);
  238  
 239           ///   <summary> 
 240           ///   完成发送的委托
  241           ///   </summary> 
 242           public   delegate   void  SendHandler( string  uid,  string   result);
  243  
 244           ///   <summary> 
 245           ///   完成接收的委托
  246           ///   </summary> 
 247           public   delegate   void  RecevieHandler( string  uid,  string   data);
  248  
 249           ///   <summary> 
 250           ///   完成接受事件
  251           ///   </summary> 
 252           public   event   AcceptHandler OnAccept;
  253  
 254           ///   <summary> 
 255           ///   完成发送事件
  256           ///   </summary> 
 257           public   event   SendHandler OnSend;
  258  
 259           ///   <summary> 
 260           ///   完成接收事件
  261           ///   </summary> 
 262           public   event   RecevieHandler OnReceive;
  263  
 264           ///   <summary> 
 265           ///   构造函数
  266           ///   </summary> 
 267           ///   <param name="buffersize">  单元缓冲区大小  </param> 
 268           ///   <param name="maxCount">  并发总数  </param> 
 269           public  SocketPoolController( int  buffersize,  int   maxCount)
  270           {
  271              buffer =  new  BufferManager(buffersize *  maxCount,buffersize);
  272               this .currentConnect =  0  ;
  273               this .maxConnect =  maxCount;
  274               this .currentConnect =  0  ;
  275               this .pool =  new   SocketAsyncEventArgsPool();
  276               //  设置并发数信号,经试验过是并发数-1才对 
 277               this .semaphoreAccept =  new  Semaphore(maxCount- 1 , maxCount- 1  );
  278               InitPool();
  279           }
  280  
 281           ///   <summary> 
 282           ///   初始化SocketAsyncEventArgs池
  283           ///   这里主要是给空闲栈填充足够的实例
  284           ///   </summary> 
 285           private   void   InitPool()
  286           {
  287              ConnectionUnit unit =  null  ;
  288               for  ( int  i =  0 ; i < maxConnect; i++ )
  289               {
  290                  unit =  new   ConnectionUnit();
  291                  unit.Uid =  "  -1  "  ;
  292                  unit.RecArg =  new   SocketAsyncEventArgs();
  293                  unit.RecArg.Completed +=  new  EventHandler<SocketAsyncEventArgs> (RecArg_Completed);
  294                  unit.SendArg =  new   SocketAsyncEventArgs();
  295                  unit.SendArg.Completed +=  new  EventHandler<SocketAsyncEventArgs> (SendArg_Completed);
  296                   this  .pool.Push(unit);
  297               }
  298           }
  299  
 300           ///   <summary> 
 301           ///   启动池
  302           ///   </summary> 
 303           ///   <param name="ipAddress">  服务端的IP  </param> 
 304           ///   <param name="port">  端口  </param> 
 305           public   void  RunPool( string  ipAddress,  int   port)
  306           {
  307              IPEndPoint endpoint =  new   IPEndPoint(IPAddress.Parse(ipAddress), port);
  308              server =  new   Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  309               server.Bind(endpoint);
  310              server.Listen( 100  );
  311  
 312               //  调用方法异步Accept客户端的连接 
 313               MyAsyncAccept();
  314               //  设置信号,防止再池在已经启动的情况下再次启动 
 315               mutex.WaitOne();
  316           }
  317  
 318           ///   <summary> 
 319           ///   异步Accept客户端的连接
  320           ///   </summary> 
 321           void   MyAsyncAccept()
  322           {
  323               //  这里使用Action的方式异步循环接受客户端的连接
  324               //  模仿同事的做法没开线程,不知这种方式是好是坏 
 325              Action callback =  new  Action( delegate  ()
  326               {
  327                   while  ( true  )
  328                   {
  329                       //  每次接受都要新开一个SocketAsyncEventArgs,否则会报错
  330                       //  其实我也想重复利用的 
 331                      SocketAsyncEventArgs e =  new   SocketAsyncEventArgs();
  332                      e.Completed +=  new  EventHandler<SocketAsyncEventArgs> (Accept_Completed);
  333                      
 334                       acceptLock.Reset();
  335                       server.AcceptAsync(e);
  336                       //  在异步接受完成之前阻塞当前线程 
 337                       acceptLock.WaitOne();
  338                   }
  339               });
  340              callback.BeginInvoke( null ,  null  );
  341           }
  342  
 343  
 344           ///   <summary> 
 345           ///   停止池
  346           ///   </summary> 
 347           public   void   StopPool()
  348           {
  349               //  把服务端的socket关了 
 350               if  (server !=  null  )
  351                   server.Close();
  352               //  释放互斥信号,等待下次启动 
 353               mutex.ReleaseMutex();
  354               //  释放资源 
 355               Dispose();
  356           }
  357  
 358           ///   <summary> 
 359           ///   发送消息
  360           ///   </summary> 
 361           ///   <param name="uid"></param> 
 362           ///   <param name="message"></param> 
 363           public   void  SendMessage( string  uid,  string   message)
  364           {
  365               sendLock.Reset();
  366              ConnectionUnit unit= pool.GetConnectionUnitByUID(uid);
  367               //  如果获取不了连接单元就不发送了, 
 368               if  (unit ==  null  )
  369               { 
  370                   if (OnSend!= null ) OnSend(uid, "  100  "  );
  371                   sendLock.Set();
  372                   return  ;
  373               }
  374               byte [] datas =  Encoding.ASCII.GetBytes(message);
  375              unit.SendArg.SetBuffer(datas,  0  , datas.Length);
  376               unit.client.SendAsync(unit.SendArg);
  377               //  阻塞当前线程,等到发送完成才释放 
 378               sendLock.WaitOne();
  379           }
  380  
 381           void  SendArg_Completed( object   sender, SocketAsyncEventArgs e)
  382           {
  383              Socket client = sender  as   Socket;
  384              ConnectionUnit unit = e.UserToken  as   ConnectionUnit;
  385               //  这里的消息码有三个,2字头的是成功的,1字头是不成功的
  386               //  101是未知错误,100是客户端不在线 
 387               if  (e.SocketError ==  SocketError.Success)
  388                   if  (OnSend !=  null ) OnSend(unit.Uid,  "  200  "  );
  389               else   if  (OnSend !=  null ) OnSend(unit.Uid,  "  101  "  );
  390               //  释放信号,以便下次发送消息执行 
 391               sendLock.Set();
  392           }
  393  
 394           void  RecArg_Completed( object   sender, SocketAsyncEventArgs e)
  395           {
  396              Socket client = sender  as   Socket;
  397              ConnectionUnit unit = e.UserToken  as   ConnectionUnit;
  398               //  这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异 
 399               if  (e.SocketError ==  SocketError.Success)
  400               {
  401                   int  rec =  e.BytesTransferred;
  402                   if  (rec ==  0  )
  403                   {
  404                       CloseSocket(unit);
  405                       return  ;
  406                   }
  407                   if  (client.Available >  0  )
  408                   {
  409                       unit.tempArray.AddRange(e.Buffer);
  410                       buffer.FreeBuffer(unit.RecArg);
  411                       buffer.SetBuffer(unit.RecArg);
  412                       client.SendAsync(unit.RecArg);
  413                       return  ;
  414                   }
  415                   byte [] data =  e.Buffer;
  416                   int  len =  rec;
  417                   if  (unit.tempArray.Count !=  0  )
  418                   {
  419                       foreach  ( byte  item  in   data)
  420                       {
  421                           if  (item ==  0 )  break  ;
  422                           unit.tempArray.Add(item);
  423                       }
  424                      data = unit.tempArray.ToArray( typeof ( byte ))  as   byte  [];
  425                      rec =  data.Length;
  426                       unit.tempArray.Clear();
  427                   }
  428  
 429                   string  dataStr = Encoding.ASCII.GetString(data,  0  , len);
  430                   if  (OnReceive !=  null  )
  431                       OnReceive(unit.Uid, dataStr);
  432  
 433                   if  (!unit.State)  return  ;
  434                   buffer.FreeBuffer(e);
  435                   buffer.SetBuffer(e);
  436                   client.ReceiveAsync(e);
  437               }
  438               //  这里还多个了一个关闭当前连接 
 439               else 
 440               {
  441                   CloseSocket(unit);
  442               }
  443           }
  444  
 445           void  Accept_Completed( object   sender, SocketAsyncEventArgs e)
  446           {
  447              Socket client =  e.AcceptSocket;
  448               try 
 449               {
  450                   if   (client.Connected)
  451                   {
  452                      IPEndPoint point = client.RemoteEndPoint  as   IPEndPoint;
  453                       string  uid = point.Address +  "  :  "  +  point.Port;
  454                      ConnectionUnit unit =  pool.Pop(uid);
  455                      unit.client =  client;
  456                      unit.State =  true  ;
  457                      unit.Uid =  uid;
  458                      unit.RecArg.UserToken =  unit;
  459                      unit.SendArg.UserToken =  unit;
  460                       buffer.SetBuffer(unit.RecArg);
  461  
 462                       //  在接受成功之后就开始接收数据了 
 463                       client.ReceiveAsync(unit.RecArg);
  464                       //  设置并发限制信号和增加当前连接数 
 465                       semaphoreAccept.WaitOne();
  466                      Interlocked.Increment( ref   currentConnect);
  467  
 468                       if  (OnAccept !=  null  ) OnAccept(uid);
  469                   }
  470                   else   if  (client !=  null  )
  471                   {
  472                       client.Close();
  473                       client.Dispose();
  474                      client =  null  ;
  475                   }
  476               }
  477               catch   (Exception ex) { Console.WriteLine(ex.ToString()); }
  478               //  设置Accept信号,以便下次Accept的执行 
 479               acceptLock.Set();
  480               e.Dispose();
  481           }
  482  
 483           ///   <summary> 
 484           ///   关闭一个连接单元
  485           ///   </summary> 
 486           private   void   CloseSocket( ConnectionUnit unit )
  487           {
  488               //  关闭并释放客户端socket的字眼 
 489               if  (unit.client !=  null  )
  490               {
  491                   unit.client.Shutdown(SocketShutdown.Both);
  492                   unit.client.Dispose();
  493                  unit.client =  null  ;
  494               }
  495               //  Console.WriteLine(unit.Uid+" disconnect ");
  496               //  把连接放回连接池 
 497               pool.Push(unit);
  498               //  释放并发信号 
 499               semaphoreAccept.Release();
  500               //  减少当前连接数 
 501              Interlocked.Decrement( ref   currentConnect);
  502           }
  503          
 504           public   void   Dispose()
  505           {
  506               if  (pool !=  null  )
  507               {
  508                   pool.Dispose();
  509                  pool =  null  ;
  510               }
  511               if  (buffer !=  null  )
  512               {
  513                   buffer.Dispose();
  514                  buffer =  null  ;
  515               }
  516               if  (server !=  null  )
  517               {
  518                   server.Dispose();
  519                  server =  null  ;
  520               }
  521  
 522           }
  523      }

 

 

分类:  C# ,  Socket编程

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于Socket连接池的详细内容...

  阅读:42次