好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

C#编写高性能网络服务器(源码)

C#编写高性能网络服务器(源码)

C#编写高性能网络服务器(源码)

 

最近有项目要做一个高性能网络服务器,决定下功夫搞定完成端口(IOCP),最终花了一个星期终于把它弄清楚了,并用C++写了一个版本,效率很不错。

但,从项目的总体需求来考虑,最终决定上.net平台,因此又花了一天一夜弄出了一个C#版,在这与大家分享。

一些心得体会:

1、在C#中,不用去面对完成端口的操作系统内核对象,Microsoft已经为我们提供了SocketAsyncEventArgs类,它封装了IOCP的使用。请参考: http://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx?cs-save-lang=1&cs-lang=cpp#code-snippet-1 。

2、我的SocketAsyncEventArgsPool类使用List对象来存储对客户端来通信的SocketAsyncEventArgs对象,它相当于直接使用内核对象时的IoContext。我这样设计比用堆栈来实现的好处理是,我可以在SocketAsyncEventArgsPool池中找到任何一个与服务器连接的客户,主动向它发信息。而用堆栈来实现的话,要主动给客户发信息,则还要设计一个结构来存储已连接上服务器的客户。

3、对每一个客户端不管还发送还是接收,我使用同一个SocketAsyncEventArgs对象,对每一个客户端来说,通信是同步进行的,也就是说服务器高度保证同一个客户连接上要么在投递发送请求,并等待;或者是在投递接收请求,等待中。本例只做echo服务器,还未考虑由服务器主动向客户发送信息。

4、SocketAsyncEventArgs的UserToken被直接设定为被接受的客户端Socket。

5、没有使用BufferManager 类,因为我在初始化时给每一个SocketAsyncEventArgsPool中的对象分配一个缓冲区,发送时使用Arrary.Copy来进行字符拷贝,不去改变缓冲区的位置,只改变使用的长度,因此在下次投递接收请求时恢复缓冲区长度就可以了!如果要主动给客户发信息的话,可以new一个SocketAsyncEventArgs对象,或者在初始化中建立几个来专门用于主动发送信息,因为这种需求一般是进行信息群发,建立一个对象可以用于很多次信息发送,总体来看,这种花销不大,还减去了字符拷贝和消耗。

6、测试结果:(在我的笔记本上时行的,我的本本是T420 I7 8G内存)

100客户 100,000(十万次)不间断的发送接收数据(发送和接收之间没有Sleep,就一个一循环,不断的发送与接收)
耗时3004.6325 秒完成
总共 10,000,000 一千万次访问
平均每分完成 199,691.6 次发送与接收
平均每秒完成 3,328.2 次发送与接收

整个运行过程中,内存消耗在开始两三分种后就保持稳定不再增涨。

看了一下对每个客户端的延迟最多不超过2毫秒,CPU占用在8%左右。

7、下载地址: http://download.csdn.net/detail/ztk12/4928644

8、源码:

IoContextPool.cs

  1   using   System;
   2   using   System.Collections.Generic;
   3   using   System.Text;
   4   using   System.Net.Sockets;
   5  
  6   namespace   IocpServer
   7   {
   8       ///   <summary> 
  9       ///   与每个客户Socket相关联,进行Send和Receive投递时所需要的参数
  10       ///   </summary> 
 11       internal   sealed   class   IoContextPool
  12       {
  13          List<SocketAsyncEventArgs> pool;         //  为每一个Socket客户端分配一个SocketAsyncEventArgs,用一个List管理,在程序启动时建立。 
 14          Int32 capacity;                          //  pool对象池的容量 
 15          Int32 boundary;                          //  已分配和未分配对象的边界,大的是已经分配的,小的是未分配的 
 16  
 17           internal   IoContextPool(Int32 capacity)
  18           {
  19               this .pool =  new  List<SocketAsyncEventArgs> (capacity);
  20               this .boundary =  0  ;
  21               this .capacity =  capacity;
  22           }
  23  
 24           ///   <summary> 
 25           ///   往pool对象池中增加新建立的对象,因为这个程序在启动时会建立好所有对象,
  26           ///   故这个方法只在初始化时会被调用,因此,没有加锁。
  27           ///   </summary> 
 28           ///   <param name="arg"></param> 
 29           ///   <returns></returns> 
 30           internal   bool   Add(SocketAsyncEventArgs arg)
  31           {
  32               if  (arg !=  null  && pool.Count <  capacity)
  33               {
  34                   pool.Add(arg);
  35                  boundary++ ;
  36                   return   true  ;
  37               }
  38               else 
 39                   return   false  ;
  40           }
  41  
 42           ///   <summary> 
 43           ///   取出集合中指定对象,内部使用
  44           ///   </summary> 
 45           ///   <param name="index"></param> 
 46           ///   <returns></returns> 
 47           //  internal SocketAsyncEventArgs Get(int index)
  48           //  {
  49           //      if (index >= 0 && index < capacity)
  50           //          return pool[index];
  51           //      else
  52           //          return null;
  53           //  } 
 54  
 55           ///   <summary> 
 56           ///   从对象池中取出一个对象,交给一个socket来进行投递请求操作
  57           ///   </summary> 
 58           ///   <returns></returns> 
 59           internal   SocketAsyncEventArgs Pop()
  60           {
  61               lock  ( this  .pool)
  62               {
  63                   if  (boundary >  0  )
  64                   {
  65                      -- boundary;
  66                       return   pool[boundary];
  67                   }
  68                   else 
 69                       return   null  ;
  70               }
  71           }
  72  
 73           ///   <summary> 
 74           ///   一个socket客户断开,与其相关的IoContext被释放,重新投入Pool中,备用。
  75           ///   </summary> 
 76           ///   <param name="arg"></param> 
 77           ///   <returns></returns> 
 78           internal   bool   Push(SocketAsyncEventArgs arg)
  79           {
  80               if  (arg !=  null  )
  81               {
  82                   lock  ( this  .pool)
  83                   {
  84                       int  index =  this .pool.IndexOf(arg, boundary);   //  找出被断开的客户,此处一定能查到,因此index不可能为-1,必定要大于0。 
 85                       if  (index == boundary)          //  正好是边界元素 
 86                          boundary++ ;
  87                       else 
 88                       {
  89                           this .pool[index] =  this .pool[boundary];      //  将断开客户移到边界上,边界右移 
 90                           this .pool[boundary++] =  arg;
  91                       }
  92                   }
  93                   return   true  ;
  94               }
  95               else 
 96                   return   false  ;
  97           }
  98       }
  99  }

IoServer.cs

 1   using   System;
    2   using   System.Collections.Generic;
    3   using   System.Text;
    4   using   System.Net.Sockets;
    5   using   System.Threading;
    6   using   System.Net;
    7  
   8   namespace   IocpServer
    9   {
   10       ///   <summary> 
  11       ///   基于SocketAsyncEventArgs 实现 IOCP 服务器
   12       ///   </summary> 
  13       internal   sealed   class   IoServer
   14       {
   15           ///   <summary> 
  16           ///   监听Socket,用于接受客户端的连接请求
   17           ///   </summary> 
  18           private   Socket listenSocket;
   19  
  20           ///   <summary> 
  21           ///   用于服务器执行的互斥同步对象
   22           ///   </summary> 
  23           private   static  Mutex mutex =  new   Mutex();
   24  
  25           ///   <summary> 
  26           ///   用于每个I/O Socket操作的缓冲区大小
   27           ///   </summary> 
  28           private   Int32 bufferSize;
   29  
  30           ///   <summary> 
  31           ///   服务器上连接的客户端总数
   32           ///   </summary> 
  33           private   Int32 numConnectedSockets;
   34  
  35           ///   <summary> 
  36           ///   服务器能接受的最大连接数量
   37           ///   </summary> 
  38           private   Int32 numConnections;
   39  
  40           ///   <summary> 
  41           ///   完成端口上进行投递所用的IoContext对象池
   42           ///   </summary> 
  43           private   IoContextPool ioContextPool;
   44  
  45           public   MainForm mainForm;
   46  
  47           ///   <summary> 
  48           ///   构造函数,建立一个未初始化的服务器实例
   49           ///   </summary> 
  50           ///   <param name="numConnections">  服务器的最大连接数据  </param> 
  51           ///   <param name="bufferSize"></param> 
  52           internal   IoServer(Int32 numConnections, Int32 bufferSize)
   53           {
   54               this .numConnectedSockets =  0  ;
   55               this .numConnections =  numConnections;
   56               this .bufferSize =  bufferSize;
   57  
  58               this .ioContextPool =  new   IoContextPool(numConnections);
   59  
  60               //   为IoContextPool预分配SocketAsyncEventArgs对象 
  61               for  (Int32 i =  0 ; i <  this .numConnections; i++ )
   62               {
   63                  SocketAsyncEventArgs ioContext =  new   SocketAsyncEventArgs();
   64                  ioContext.Completed +=  new  EventHandler<SocketAsyncEventArgs> (OnIOCompleted);
   65                  ioContext.SetBuffer( new  Byte[ this .bufferSize],  0 ,  this  .bufferSize);
   66  
  67                   //   将预分配的对象加入SocketAsyncEventArgs对象池中 
  68                   this  .ioContextPool.Add(ioContext);
   69               }
   70           }
   71  
  72           ///   <summary> 
  73           ///   当Socket上的发送或接收请求被完成时,调用此函数
   74           ///   </summary> 
  75           ///   <param name="sender">  激发事件的对象  </param> 
  76           ///   <param name="e">  与发送或接收完成操作相关联的SocketAsyncEventArg对象  </param> 
  77           private   void  OnIOCompleted( object   sender, SocketAsyncEventArgs e)
   78           {
   79               //   Determine which type of operation just completed and call the associated handler. 
  80               switch   (e.LastOperation)
   81               {
   82                   case   SocketAsyncOperation.Receive:
   83                       this  .ProcessReceive(e);
   84                       break  ;
   85                   case   SocketAsyncOperation.Send:
   86                       this  .ProcessSend(e);
   87                       break  ;
   88                   default  :
   89                       throw   new  ArgumentException( "  The last operation completed on the socket was not a receive or send  "  );
   90               }
   91           }
   92  
  93           ///   <summary> 
  94           ///  接收完成时处理函数
   95           ///   </summary> 
  96           ///   <param name="e">  与接收完成操作相关联的SocketAsyncEventArg对象  </param> 
  97           private   void   ProcessReceive(SocketAsyncEventArgs e)
   98           {
   99               //   检查远程主机是否关闭连接 
 100               if  (e.BytesTransferred >  0  )
  101               {
  102                   if  (e.SocketError ==  SocketError.Success)
  103                   {
  104                      Socket s =  (Socket)e.UserToken;
  105                       //  判断所有需接收的数据是否已经完成 
 106                       if  (s.Available ==  0  )
  107                       {
  108                           //   设置发送数据 
 109                          Array.Copy(e.Buffer,  0  , e.Buffer, e.BytesTransferred, e.BytesTransferred);
  110                          e.SetBuffer(e.Offset, e.BytesTransferred *  2  );
  111                           if  (!s.SendAsync(e))         //  投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件 
 112                           {
  113                               //   同步发送时处理发送完成事件 
 114                               this  .ProcessSend(e);
  115                           }
  116                       }
  117                       else   if  (!s.ReceiveAsync(e))     //  为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件 
 118                       {
  119                           //   同步接收时处理接收完成事件 
 120                           this  .ProcessReceive(e);
  121                       }
  122                   }
  123                   else 
 124                   {
  125                       this  .ProcessError(e);
  126                   }
  127               }
  128               else 
 129               {
  130                   this  .CloseClientSocket(e);
  131               }
  132           }
  133  
 134           ///   <summary> 
 135           ///   发送完成时处理函数
  136           ///   </summary> 
 137           ///   <param name="e">  与发送完成操作相关联的SocketAsyncEventArg对象  </param> 
 138           private   void   ProcessSend(SocketAsyncEventArgs e)
  139           {
  140               if  (e.SocketError ==  SocketError.Success)
  141               {
  142                  Socket s =  (Socket)e.UserToken;
  143  
 144                   //  接收时根据接收的字节数收缩了缓冲区的大小,因此投递接收请求时,恢复缓冲区大小 
 145                  e.SetBuffer( 0  , bufferSize);
  146                   if  (!s.ReceiveAsync(e))      //  投递接收请求 
 147                   {
  148                       //   同步接收时处理接收完成事件 
 149                       this  .ProcessReceive(e);
  150                   }
  151               }
  152               else 
 153               {
  154                   this  .ProcessError(e);
  155               }
  156           }
  157  
 158           ///   <summary> 
 159           ///   处理socket错误
  160           ///   </summary> 
 161           ///   <param name="e"></param> 
 162           private   void   ProcessError(SocketAsyncEventArgs e)
  163           {
  164              Socket s = e.UserToken  as   Socket;
  165              IPEndPoint localEp = s.LocalEndPoint  as   IPEndPoint;
  166  
 167               this  .CloseClientSocket(s, e);
  168  
 169               string  outStr = String.Format( "  套接字错误 {0}, IP {1}, 操作 {2}。  "  , (Int32)e.SocketError, localEp, e.LastOperation);
  170               mainForm.Invoke(mainForm.setlistboxcallback, outStr);
  171               //  Console.WriteLine("Socket error {0} on endpoint {1} during {2}.", (Int32)e.SocketError, localEp, e.LastOperation); 
 172           }
  173  
 174           ///   <summary> 
 175           ///   关闭socket连接
  176           ///   </summary> 
 177           ///   <param name="e">  SocketAsyncEventArg associated with the completed send/receive operation.  </param> 
 178           private   void   CloseClientSocket(SocketAsyncEventArgs e)
  179           {
  180              Socket s = e.UserToken  as   Socket;
  181               this  .CloseClientSocket(s, e);
  182           }
  183  
 184           private   void   CloseClientSocket(Socket s, SocketAsyncEventArgs e)
  185           {
  186              Interlocked.Decrement( ref   this  .numConnectedSockets);
  187  
 188               //   SocketAsyncEventArg 对象被释放,压入可重用队列。 
 189               this  .ioContextPool.Push(e);            
  190               string  outStr = String.Format( "  客户 {0} 断开, 共有 {1} 个连接。  " , s.RemoteEndPoint.ToString(),  this  .numConnectedSockets);
  191               mainForm.Invoke(mainForm.setlistboxcallback, outStr);            
  192               //  Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", this.numConnectedSockets); 
 193               try 
 194               {
  195                   s.Shutdown(SocketShutdown.Send);
  196               }
  197               catch   (Exception)
  198               {
  199                   //   Throw if client has closed, so it is not necessary to catch. 
 200               }
  201               finally 
 202               {
  203                   s.Close();
  204               }
  205           }
  206  
 207           ///   <summary> 
 208           ///   accept 操作完成时回调函数
  209           ///   </summary> 
 210           ///   <param name="sender">  Object who raised the event.  </param> 
 211           ///   <param name="e">  SocketAsyncEventArg associated with the completed accept operation.  </param> 
 212           private   void  OnAcceptCompleted( object   sender, SocketAsyncEventArgs e)
  213           {
  214               this  .ProcessAccept(e);
  215           }
  216  
 217           ///   <summary> 
 218           ///   监听Socket接受处理
  219           ///   </summary> 
 220           ///   <param name="e">  SocketAsyncEventArg associated with the completed accept operation.  </param> 
 221           private   void   ProcessAccept(SocketAsyncEventArgs e)
  222           {
  223              Socket s =  e.AcceptSocket;
  224               if   (s.Connected)
  225               {
  226                   try 
 227                   {
  228                      SocketAsyncEventArgs ioContext =  this  .ioContextPool.Pop();
  229                       if  (ioContext !=  null  )
  230                       {
  231                           //   从接受的客户端连接中取数据配置ioContext 
 232  
 233                          ioContext.UserToken =  s;
  234  
 235                          Interlocked.Increment( ref   this  .numConnectedSockets);
  236                           string  outStr = String.Format( "  客户 {0} 连入, 共有 {1} 个连接。  " ,  s.RemoteEndPoint.ToString(), this  .numConnectedSockets);
  237                           mainForm.Invoke(mainForm.setlistboxcallback,outStr);
  238                           //  Console.WriteLine("Client connection accepted. There are {0} clients connected to the server",
  239                               //  this.numConnectedSockets); 
 240  
 241                           if  (! s.ReceiveAsync(ioContext))
  242                           {
  243                               this  .ProcessReceive(ioContext);
  244                           }
  245                       }
  246                       else          //  已经达到最大客户连接数量,在这接受连接,发送“连接已经达到最大数”,然后断开连接 
 247                       {
  248                          s.Send(Encoding.Default.GetBytes( "  连接已经达到最大数!  "  ));
  249                           string  outStr = String.Format( "  连接已满,拒绝 {0} 的连接。  "  , s.RemoteEndPoint);
  250                           mainForm.Invoke(mainForm.setlistboxcallback, outStr);
  251                           s.Close();
  252                      }
  253                   }
  254                   catch   (SocketException ex)
  255                   {
  256                      Socket token = e.UserToken  as   Socket;
  257                       string  outStr = String.Format( "  接收客户 {0} 数据出错, 异常信息: {1} 。  "  , token.RemoteEndPoint, ex.ToString());
  258                       mainForm.Invoke(mainForm.setlistboxcallback, outStr);
  259                       //  Console.WriteLine("Error when processing data received from {0}:\r\n{1}", token.RemoteEndPoint, ex.ToString()); 
 260                   }
  261                   catch   (Exception ex)
  262                   {
  263                      mainForm.Invoke(mainForm.setlistboxcallback,  "  异常:  "  +  ex.ToString());
  264                   }
  265                   //   投递下一个接受请求 
 266                   this  .StartAccept(e);
  267               }
  268           }
  269  
 270           ///   <summary> 
 271           ///   从客户端开始接受一个连接操作
  272           ///   </summary> 
 273           ///   <param name="acceptEventArg">  The context object to use when issuing 
  274           ///   the accept operation on the server's listening socket.  </param> 
 275           private   void   StartAccept(SocketAsyncEventArgs acceptEventArg)
  276           {
  277               if  (acceptEventArg ==  null  )
  278               {
  279                  acceptEventArg =  new   SocketAsyncEventArgs();
  280                  acceptEventArg.Completed +=  new  EventHandler<SocketAsyncEventArgs> (OnAcceptCompleted);
  281               }
  282               else 
 283               {
  284                   //   重用前进行对象清理 
 285                  acceptEventArg.AcceptSocket =  null  ;
  286               }
  287  
 288               if  (! this  .listenSocket.AcceptAsync(acceptEventArg))
  289               {
  290                   this  .ProcessAccept(acceptEventArg);
  291               }
  292           }
  293  
 294           ///   <summary> 
 295           ///   启动服务,开始监听
  296           ///   </summary> 
 297           ///   <param name="port">  Port where the server will listen for connection requests.  </param> 
 298           internal   void   Start(Int32 port)
  299           {
  300               //   获得主机相关信息 
 301              IPAddress[] addressList =  Dns.GetHostEntry(Environment.MachineName).AddressList;
  302              IPEndPoint localEndPoint =  new  IPEndPoint(addressList[addressList.Length -  1  ], port);
  303  
 304               //   创建监听socket 
 305               this .listenSocket =  new   Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  306               this .listenSocket.ReceiveBufferSize =  this  .bufferSize;
  307               this .listenSocket.SendBufferSize =  this  .bufferSize;
  308  
 309               if  (localEndPoint.AddressFamily ==  AddressFamily.InterNetworkV6)
  310               {
  311                   //   配置监听socket为 dual-mode (IPv4 & IPv6) 
  312                   //   27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below, 
 313                   this .listenSocket.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName) 27 ,  false  );
  314                   this .listenSocket.Bind( new   IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));
  315               }
  316               else 
 317               {
  318                   this  .listenSocket.Bind(localEndPoint);
  319               }
  320  
 321               //   开始监听 
 322               this .listenSocket.Listen( this  .numConnections);
  323  
 324               //   在监听Socket上投递一个接受请求。 
 325               this .StartAccept( null  );
  326  
 327               //   Blocks the current thread to receive incoming messages. 
 328               mutex.WaitOne();
  329           }
  330  
 331           ///   <summary> 
 332           ///   停止服务
  333           ///   </summary> 
 334           internal   void   Stop()
  335           {
  336               this  .listenSocket.Close();
  337               mutex.ReleaseMutex();
  338           }
  339  
 340       }
  341  }

 

分类:  C# ,  算法

标签:  C# 高性能 源码 编程

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于C#编写高性能网络服务器(源码)的详细内容...

  阅读:43次