好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

自制线程池4

自制线程池4

需求:

有一种任务需要定时的执行,而且非常的耗时,因此我把它放到线程池中执行,并设置线程池为1,如果该任务已经在队列中或正在执行该任务,则不要再将该任务加入线程池中了。

测试代码如下

 1   using  System;
 2   using  System.Collections.Generic;
 3   using  System.Linq;
 4   using  System.Text;
 5   using  System.Threading;
 6   using  ThreadPool2;
 7  
 8   namespace  ThreadPoolTest.MyThreadPool2Test
 9   {
10        class  Class6
11       {
12            static   void  Main( string [] args)
13           {
14               MyThreadPool2 pool = new  MyThreadPool2( 1 , true , 30000 );
15                object  obj = new   object ();
16               Random rnd = new  Random();
17                for  (var i  =   0 ; i  <   20 ;i ++  )
18                   pool.QueueUserWorkItem(call, obj, rnd.Next( 1 , 4 ).ToString(), succ, err);
19               Console.ReadLine();
20           }
21  
22            private   static   void  err( object  state)
23           {
24                   Console.WriteLine( " err " );
25           }
26  
27            private   static   void  succ( object  state,  object  result)
28           {
29               Console.WriteLine( " succ " );
30           }
31  
32            private   static   object  call( object  state)
33           {
34                while ( true )
35               {
36                   Thread.Sleep( 2000 );
37                   Console.WriteLine( " exec " );
38               }
39           }
40       }
41   }
42  

线程池代码如下,

using  System;
using  System.Collections;
using  System.Collections.Generic;
using  System.Diagnostics;
using  System.Linq;
using  System.Text;
using  System.Threading;
using  Amib.Threading.Internal;
using  Rhino.Commons;

namespace  ThreadPool2
{
     public   delegate   object  WaitCallback2( object  state);
     public   delegate   void  SuccCallback( object  state,  object  result);
     public   delegate   void  ErrCallback( object  state);
     /**/ ///   <summary>
     ///  此线程池的作用是将某一类特殊的任务交给此线程池执行,
     ///  可以设定该线程池的最大线程数,
     ///  这类线程池的优点时,占用的资源少,优先级低,
     ///  适合于执行任务需要长期执行,不考虑时间因素的任务
     ///  同时根据在传入线程池时的标记key,可以Aborted指定任务,
     ///  若该任务正在执行或尚在执行队列中
     ///   </summary>
     public   class  MyThreadPool2
     {
         /**/ ///   <summary>
         ///  任务执行队列
         ///   </summary>
         // static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
        List < WorkerThread >  queue  =   new  List < WorkerThread > ();
         /**/ ///   <summary>
         ///  目前暂定为只使用一个线程,以免耗近资源
         ///   </summary>
        SynchronizedDictionary < string , WorkerThread >  dict  =   new  SynchronizedDictionary < string , WorkerThread > ();
         private   object  state;
        AutoResetEvent wait  =   new  AutoResetEvent( false );
        AutoResetEvent wait2  =   new  AutoResetEvent( false );
         private   int  MaxLimitedTime  {  get ;  set ; }
         private   bool  IsLimitedExecTime  {  get ;  set ; }
         private   int  IdleTimeout  {  get ;  set ; }
         // private static int _maxThreadNum = 1;
         private   int  MaxThreadNum
         {
             // get { return _maxThreadNum; }
             // set { _maxThreadNum = value; }
             get ;
             set ;
        }
         private  MyThreadPool2()
         {
             // System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,true);
             // SetMaxThreadNum(2);
             // SetMaxExecTime(false, 10000);
        }
         /**/ ///   <summary>
         ///  设置专用线程池的初始参数
         ///   </summary>
         ///   <param name="num"> 线程池的最大线程数,最小为1 </param>
         ///   <param name="b"> 是否起用限制最大单个任务执行时间设定 </param>
         ///   <param name="MaxLimitedTime"> 单个任务执行的最大时间 </param>
         public  MyThreadPool2( int  num,  bool  b,  int  MaxLimitedTime)
         {
            System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 ,  true );
             if  (num  <   1 )
                num  =   1 ;
            MaxThreadNum  =  num;
            IsLimitedExecTime  =  b;
             this .MaxLimitedTime  =  MaxLimitedTime;
             if  (IsLimitedExecTime)
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state,
                                                                         this .MaxLimitedTime,  true );
        }

         /**/ ///   <summary>
         ///  定时将队列中的数据装载到线程中执行,如果还没有到达最大线程数还有任务则创建线程
         ///   </summary>
         ///   <param name="state"></param>
         ///   <param name="timedOut"></param>
         private   void  aa( object  state,  bool  timedOut)
         {
             // Console.WriteLine("执行aa()将队列中的任务加到线程中");
             lock (WorkerThread.Manual) {
            WorkerThread.Manual.Reset();
             lock  (queue)
             {
                Console.WriteLine( " queue count={0} " ,queue.Count);
                 // 判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                List < string >  removeKey  =   new  List < string > ();
                List < WorkerThread >  newTask  =   new  List < WorkerThread > ();
                List < string >  tasks  =   new  List < string > ();
                 // Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                 foreach  (var kvp  in  dict)
                 { // kvp.Value.ThreadState == ThreadState.Unstarted || 
                     // if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)

                     // 将不活动的线程记录下来并移除
                     if  ( ! kvp.Value.Thread.IsAlive)
                        tasks.Add(kvp.Key);
                     // 将活动且空闲的线程赋于新的任务
                     if  (kvp.Value.Thread.IsAlive  ==   true   &&  kvp.Value.CurrentThreadState  ==  WorkerThreadState.Idle)
                     {
                         // dict.Remove(kvp.Key); // cancle because of lock

                        WorkerThread a  =  queue.FirstOrDefault();
                         if  (a  !=   null )
                         {
                            removeKey.Add(kvp.Key);
                             // addDict.Add(a.Key, kvp.Value.Change(a));
                            newTask.Add(kvp.Value.Change(a));
                             // a.Thread = kvp.Value.Thread;
                             // newTask.Add(a);
                            queue.RemoveAt( 0 );
                             // dict.Add(a.Key, kvp.Value.Change(a)); // cancle because of lock
                             // 将参数加到线程中,并改变线程的状态
                             // dict[a.Key].Thread.Resume();
                        }
                         else
                             break ;
                         // else
                         // {
                         //     System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                         //                                                             2000, true);
                         //     return;
                         // }

                    }
                }
                tasks.ForEach(t  =>  
                 { 
                    dict.Remove(t);
                    Debug.WriteLine( " 移除销毁线程对应的dict中的键值项,key= " + t);
                } );
                removeKey.ForEach(t  =>  dict.Remove(t));
                newTask.ForEach(t  =>
                 {
                    Debug.WriteLine( " 复用线程用于执行新任务 " + t.Key);
                    dict.Add(t.Key, t);
                     // t.StartExecTime = DateTime.Now;
                    t.Auto.Set();
                     // t.CurrentThreadState = WorkerThreadState.Busy;
                     // t.Thread.Resume();
                } );
                 while  (queue.Count  >   0   &&  dict.Count  <  MaxThreadNum)
                 {
                     // 未到线程池最大池程数时,增加线程
                    WorkerThread b  =  queue.FirstOrDefault();
                     if  (b  !=   null )
                     {
                        queue.RemoveAt( 0 );
                         // Thread thd = new Thread(new ThreadStart(b.Exec));
                         // thd.Priority = ThreadPriority.Lowest;
                         // dict.Add(b.Key, thd);
                         // thd.Start();
                        WorkerThread wt  =   new  WorkerThread();
                        wt.Start(b);
                        dict.Add(wt.Key, wt);
                        wt.Thread.Start();
                        Debug.WriteLine( " 新建线程用于执行新任务 " +  wt.Key);


                         // 将参数加到线程中,并改变线程的状态
                    }


                }
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 ,
                                                                         true );
            }
            WorkerThread.Manual.Set();
            }
        }
         private   void  SetMaxThreadNum( int  num)
         {
             if  (num  <   1 )
                num  =   1 ;
            MaxThreadNum  =  num;
        }
         private  WorkerThread FindSpecificWorkerThreadByKey( string  key)
         {
            WorkerThread wt;
            dict.TryGetValue(key,  out  wt);
             return  wt;
        }
         /**/ ///   <summary>
         ///  设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
         ///   </summary>
         ///   <param name="b"></param>
         ///   <param name="time"></param>
        [Obsolete( " abandon " )]
         private   void  SetMaxExecTime( bool  b,  int  time)
         {
            IsLimitedExecTime  =  b;
            MaxLimitedTime  =  time;
             if  (IsLimitedExecTime)
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state,
                                                                        MaxLimitedTime,  true );
        }
         /**/ ///   <summary>
         ///  当任务执行超时时,注销该线程
         ///   </summary>
         ///   <param name="state"></param>
         ///   <param name="timedOut"></param>
         private   void  bb( object  state,  bool  timedOut)
         {
             // GC.Collect();
            Console.WriteLine( " 执行bb(),检测是否有线程超时 " );
             lock  (WorkerThread.Manual)
             {
                WorkerThread.Manual.Reset();
                 // lock (obj)
                 lock  (dict.SyncRoot)
                 {
                    List < string >  temp  =   new  List < string > ();
                     foreach  (var kvp  in  dict)
                     {
                         if  (kvp.Value.CurrentThreadState == WorkerThreadState.Busy  && DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds  >  MaxLimitedTime)
                         {
                            Console.WriteLine( " now={0} " ,DateTime.Now);
                            Console.WriteLine( " before={0} " ,kvp.Value.StartExecTime);
                            temp.Add(kvp.Key);
                        }
                    }
                     foreach  (var s  in  temp)
                     {
                        Debug.WriteLine( " key= " + s + " 的任务超时,执行该任务的线程将被销毁 " );
                        _Aborted(s);
                    }
                    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state, MaxLimitedTime,  true );
                }
                WorkerThread.Manual.Set();
            }

        }
         public   void  Aborted( string  key)
         {
             lock  (WorkerThread.Manual)
             {
                WorkerThread.Manual.Reset();
                _Aborted(key);
                WorkerThread.Manual.Set();
            }
        }

         private   void  _Aborted( string  key)
         {
             lock  (queue)
             {
                 // 任务如果还在队列中则删除该任务
                 int  index  =  queue.FindIndex(t  =>  t.Key  ==  key);
                 if  (index  >   - 1 )
                 {
                    queue.RemoveAt(index);
                    Debug.WriteLine( " 从任务队列中移除指定key= " + key + " 的任务 " );
                }
            }
             // lock (dict.SyncRoot)
             // {
                 old way now extract method FindSpecificWorkerThreadByKey to split this #region  old way now extract method FindSpecificWorkerThreadByKey to split this
                 // WorkerThread v;
                 // if (dict.TryGetValue(key, out v))
                 // {
                 //     v.Thread.Abort();
                 //      // 在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                 //      // 过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                 //      // ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                 //      // 阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                 //      // 如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                 //      // 迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                 //      // 用Join方法。
                 //     v.Thread.Join();
                 //     dict.Remove(key);
                 // }
                 #endregion
                WorkerThread v  =  FindSpecificWorkerThreadByKey(key);
                 // 没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
                 // 或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
                 // 只有但指定的key的任务在执行时才删除
                 // if (v != null && v.Thread.ThreadState != ThreadState.Suspended)
                 if  (v  !=   null   &&  v.CurrentThreadState  ==  WorkerThreadState.Busy)
                 {
                    dict.Remove(key);
                     /**/ /*
                在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                用Join方法。
                 */
                    
                    Debug.WriteLine( " 销毁正在执行Key= " + key + " 的任务的线程 " );
                     // 这里将回调的方法放线程终止之前,是防止在同一线程时,线程终止了,放在后面回调就不执行了
                     if  (v.ErrorCallback  !=   null )
                     {
                        v.ErrorCallback(v.State);
                         // 最后完成任务的时间
                        v.EndExecTime  =  DateTime.Now;
                    }
                    v.Thread.Abort();
                    v.Thread.Join();


                }
             // }

                 // wait.Set();
        }

         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state,  string  key, SuccCallback succ, ErrCallback err)
         {
            WorkerThread.Manual.Reset();
            WorkerThread p  =   new  WorkerThread()
             {
                WaitCallback  =  callback,
                State  =  state,
                Key  =  key,
                ErrorCallback  =  err,
                SuccessCallback  =  succ
            } ;
             // queue.Enqueue(p);
             lock  (queue)
             {
                 if (queue.FindIndex(t => t.Key == p.Key) ==- 1   &&  dict.Contains(p.Key) == false )
                 {
                    queue.Add(p);
                    wait.Set();
                }
                 else
                 {
                    Console.WriteLine( " 由于队列或是正在执行的线程中拥有一个同名的key,此次加入线程的工作将被自动抛弃 " );
                }
                 // Monitor.Pulse(queue);
            }
            WorkerThread.Manual.Set();
        }
         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state, SuccCallback succ, ErrCallback err)
         {
            QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(), succ, err);
        }

         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state,  string  key)
         {
             // WorkerThread p = new WorkerThread()
             //             {
             //                 WaitCallback = callback,
             //                 State = state,
             //                 Key = key
             //             };
             /**/ /// /queue.Enqueue(p);
             // queue.Add(p);
             // wait.Set();
            QueueUserWorkItem(callback, state, key,  null ,  null );
        }
         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state)
         {
            QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
        }

    }
     public   enum  WorkerThreadState :  byte
     {
        None  =   0 ,
        Busy  =   1 ,
        Idle  =   2
    }
     public   class  WorkerThread
     {
         public  AutoResetEvent Auto  =   new  AutoResetEvent( false );
         public   static  ManualResetEvent Manual  =   new  ManualResetEvent( true );
         public  WorkerThreadState CurrentThreadState  {  get ;  set ; }
         public  DateTime StartExecTime  {  get ;  set ; }
         public  DateTime EndExecTime  {  get ;  set ; }
         public  Thread Thread  {  get ;  set ; }
         public   string  Key  {  get ;  set ; }
         public  WaitCallback2 WaitCallback  {  get ;  set ; }
         public  SuccCallback SuccessCallback  {  get ;  set ; }
         public  ErrCallback ErrorCallback  {  get ;  set ; }
         public  Object State  {  get ;  set ; }
         public   void  Exec()
         {
             while  ( true )
             {
                 this .CurrentThreadState  =  WorkerThreadState.Busy;
                 this .StartExecTime  =  DateTime.Now;
                 if  ( this .SuccessCallback  !=   null )
                     this .SuccessCallback( this .State,  this .WaitCallback( this .State));
                 else
                     this .WaitCallback( this .State); // 如何将执行的结果返回,目前是通过SuccessCallback将结果作为参数返回,如果没有使用SuccessCallback将不能返回执行的结果
                 this .EndExecTime  =  DateTime.Now;
                 // 如何将任务执行的起讫时间发给任务
                 this .CurrentThreadState  =  WorkerThreadState.Idle;
                 // this.Thread.Suspend();
                 // 等待60s如果在此期间未接收到新的任务该线程就退出
                 if  ( ! Auto.WaitOne( 60   *   1000 ,  false ))
                     break ;
                Manual.WaitOne();
            }
            Debug.WriteLine( " 线程销毁 " );
        }
         public  WorkerThread Change(WorkerThread wt)
         {
             this .Key  =  wt.Key;
             this .WaitCallback  =  wt.WaitCallback;
             this .State  =  wt.State;
             // this.StartExecTime = wt.StartExecTime;
             this .ErrorCallback  =  wt.ErrorCallback;
             this .SuccessCallback  =  wt.SuccessCallback;
             return   this ;
        }
         public   void  Start(WorkerThread wt)
         {
             this .Change(wt);
             this .Thread  =   new  Thread( new  ThreadStart( this .Exec));
             this .Thread.Priority  =  ThreadPriority.Lowest;
        }

         // public void Start(WaitCallback callback,Object state)
         // {
         //     this.WaitCallback = callback;
         //     this.State = state;
         //     if(this.Thread==null){
         //         this.Thread = new Thread(new ThreadStart(this.Exec));
         //         this.Thread.Priority = ThreadPriority.Lowest;
         //         this.Thread.IsBackground = true;
         //         this.Thread.Start();
         //         return;
         //     }
         //     if(this.Thread.ThreadState==ThreadState.Suspended)
         //     {
         //         this.Thread.Resume();
         //     }
         // }
    }
}



查看更多关于自制线程池4的详细内容...

  阅读:45次

上一篇: 单元测试介绍

下一篇:一切从用户出发