好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

自制线程池3

自制线程池3

在自制线程池2中讲到了,实现了线程池中的一些不足之处,

今天我在前面的基础上改进了以下内容,

A。Thread.Suspend和Thread.Resume方法在2.0已经被Obslete,因此改用了其它方法来实现

B。原先新建的线程在空闲没有任务时是处于Suspend状态,如果起动了100个线程,完成任务后还是有100个线程存在,现在加入了策略,当线程空闲60s没有被复用(重新获得任务)则该线程自动销毁。

C。更新了同步的方法,比前面的实现好多了,不过肯定还是存在些问题,同步,竞争读写,线程安全这些内容还是比较糊,在今后的实践中慢慢改进吧

D。使线程池可以实例化,这样可以创建多个专用的线程池,这样的好处是,一些任务对执行的时间没有要求,但是希望占用的资源低,我们可以给该线程池设定最大允许一个线程长时间执行这项工作,而另外的线程池是一些短时间的密集型请求,我们可以设定同时400个线程运行,同时将它的线程空闲注销时间设的短一些,以增强响应的能力。

该版本还存在的问题

A.代码不够清楚,多余的代码仅是注释掉了,并没有删除,这样以后看起来可以给自己一些思路,自己是怎么一步一步改过来的,不知道这个算不算是一个坏习惯,我也想把原先不用的代码删除了,但我怕以后自己都看不懂现在演化过来的代码是什么意思,大家有没有这样的情况

B.各个实例化的线程池之间无法共享复用空闲线程,这个是一个很大的问题,但是我还没想到实现的方法,为了这个问题想了一天多了。

C。再将SmartThreadPool中一些实用的功能补充进来。

另外发现

这个是实现的代码

Code
using  System;
using  System.Collections;
using  System.Collections.Generic;
using  System.Diagnostics;
using  System.Linq;
using  System.Text;
using  System.Threading;
using  Amib.Threading.Internal;
using  Rhino.Commons;

namespace  ThreadPool2
{
     public   delegate   object  WaitCallback2( object  state);
     public   delegate   void  SuccCallback( object  state,  object  result);
     public   delegate   void  ErrCallback( object  state);
     ///   <summary>
     ///  此线程池的作用是将某一类特殊的任务交给此线程池执行,
     ///  可以设定该线程池的最大线程数,
     ///  这类线程池的优点时,占用的资源少,优先级低,
     ///  适合于执行任务需要长期执行,不考虑时间因素的任务
     ///  同时根据在传入线程池时的标记key,可以Aborted指定任务,
     ///  若该任务正在执行或尚在执行队列中
     ///   </summary>
     public   class  MyThreadPool2
    {
         ///   <summary>
         ///  任务执行队列
         ///   </summary>
         // static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
        List < WorkerThread >  queue  =   new  List < WorkerThread > ();
         ///   <summary>
         ///  目前暂定为只使用一个线程,以免耗近资源
         ///   </summary>
        SynchronizedDictionary < string , WorkerThread >  dict  =   new  SynchronizedDictionary < string , WorkerThread > ();
         private   object  state;
        AutoResetEvent wait  =   new  AutoResetEvent( false );
        AutoResetEvent wait2  =   new  AutoResetEvent( false );
         private   int  MaxLimitedTime {  get ;  set ; }
         private   bool  IsLimitedExecTime {  get ;  set ; }
         // private static int _maxThreadNum = 1;
         private   int  MaxThreadNum
        {
             // get { return _maxThreadNum; }
             // set { _maxThreadNum = value; }
             get ;
             set ;
        }
         private  MyThreadPool2()
        {
             // System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,true);
             // SetMaxThreadNum(2);
             // SetMaxExecTime(false, 10000);
        }
         ///   <summary>
         ///  设置专用线程池的初始参数
         ///   </summary>
         ///   <param name="num"> 线程池的最大线程数,最小为1 </param>
         ///   <param name="b"> 是否起用限制最大单个任务执行时间设定 </param>
         ///   <param name="time"> 单个任务执行的最大时间 </param>
         public  MyThreadPool2( int  num,  bool  b,  int  time)
        {
            System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 ,  true );
             if  (num  <   1 )
                num  =   1 ;
            MaxThreadNum  =  num;
            IsLimitedExecTime  =  b;
            MaxLimitedTime  =  time;
             if  (IsLimitedExecTime)
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state,
                                                                        MaxLimitedTime,  true );
        }
         ///   <summary>
         ///  定时将队列中的数据装载到线程中执行,如果还没有到达最大线程数还有任务则创建线程
         ///   </summary>
         ///   <param name="state"></param>
         ///   <param name="timedOut"></param>
         private   void  aa( object  state,  bool  timedOut)
        {
             lock (WorkerThread.Manual){
            WorkerThread.Manual.Reset();
             lock  (queue)
            {
                 // 判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                List < string >  removeKey  =   new  List < string > ();
                List < WorkerThread >  newTask  =   new  List < WorkerThread > ();
                List < string >  tasks  =   new  List < string > ();
                 // Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                 foreach  (var kvp  in  dict)
                { // kvp.Value.ThreadState == ThreadState.Unstarted || 
                     // if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)

                     // 将不活动的线程记录下来并移除
                     if  ( ! kvp.Value.Thread.IsAlive)
                        tasks.Add(kvp.Key);
                     // 将活动且空闲的线程赋于新的任务
                     if  (kvp.Value.Thread.IsAlive  ==   true   &&  kvp.Value.CurrentThreadState  ==  WorkerThreadState.Idle)
                    {
                         // dict.Remove(kvp.Key); // cancle because of lock

                        WorkerThread a  =  queue.FirstOrDefault();
                         if  (a  !=   null )
                        {
                            removeKey.Add(kvp.Key);
                             // addDict.Add(a.Key, kvp.Value.Change(a));
                            newTask.Add(kvp.Value.Change(a));
                             // a.Thread = kvp.Value.Thread;
                             // newTask.Add(a);
                            queue.RemoveAt( 0 );
                             // dict.Add(a.Key, kvp.Value.Change(a)); // cancle because of lock
                             // 将参数加到线程中,并改变线程的状态
                             // dict[a.Key].Thread.Resume();
                        }
                         else
                             break ;
                         // else
                         // {
                         //     System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                         //                                                             2000, true);
                         //     return;
                         // }

                    }
                }
                tasks.ForEach(t  =>  
                { 
                    dict.Remove(t);
                    Debug.WriteLine( " 移除销毁线程对应的dict中的键值项,key= " + t);
                });
                removeKey.ForEach(t  =>  dict.Remove(t));
                newTask.ForEach(t  =>
                {
                    Debug.WriteLine( " 复用线程用于执行新任务 " + t.Key);
                    dict.Add(t.Key, t);
                    t.StartExecTime  =  DateTime.Now;
                    t.Auto.Set();
                     // t.CurrentThreadState = WorkerThreadState.Busy;
                     // t.Thread.Resume();
                });
                 while  (queue.Count  >   0   &&  dict.Count  <  MaxThreadNum)
                {
                     // 未到线程池最大池程数时,增加线程
                    WorkerThread b  =  queue.FirstOrDefault();
                     if  (b  !=   null )
                    {
                        queue.RemoveAt( 0 );
                         // Thread thd = new Thread(new ThreadStart(b.Exec));
                         // thd.Priority = ThreadPriority.Lowest;
                         // dict.Add(b.Key, thd);
                         // thd.Start();
                        WorkerThread wt  =   new  WorkerThread();
                        wt.Start(b);
                        dict.Add(wt.Key, wt);
                        wt.Thread.Start();
                        Debug.WriteLine( " 新建线程用于执行新任务 " +  wt.Key);


                         // 将参数加到线程中,并改变线程的状态
                    }


                }
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 ,
                                                                         true );
            }
            WorkerThread.Manual.Set();
            }
        }
         private   void  SetMaxThreadNum( int  num)
        {
             if  (num  <   1 )
                num  =   1 ;
            MaxThreadNum  =  num;
        }
         private  WorkerThread FindSpecificWorkerThreadByKey( string  key)
        {
            WorkerThread wt;
            dict.TryGetValue(key,  out  wt);
             return  wt;
        }
         ///   <summary>
         ///  设定单线程允许执行任务的最长时间,该方法不能在运行时改变,须事前设定
         ///   </summary>
         ///   <param name="b"></param>
         ///   <param name="time"></param>
        [Obsolete( " abandon " )]
         private   void  SetMaxExecTime( bool  b,  int  time)
        {
            IsLimitedExecTime  =  b;
            MaxLimitedTime  =  time;
             if  (IsLimitedExecTime)
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state,
                                                                        MaxLimitedTime,  true );
        }
         ///   <summary>
         ///  当任务执行超时时,注销该线程
         ///   </summary>
         ///   <param name="state"></param>
         ///   <param name="timedOut"></param>
         private   void  bb( object  state,  bool  timedOut)
        {
             lock  (WorkerThread.Manual)
            {
                WorkerThread.Manual.Reset();
                 // lock (obj)
                 lock  (dict.SyncRoot)
                {
                    List < string >  temp  =   new  List < string > ();
                     foreach  (var kvp  in  dict)
                    {
                         if  (kvp.Value.CurrentThreadState == WorkerThreadState.Busy  && DateTime.Now.Subtract(kvp.Value.StartExecTime).TotalMilliseconds  >  MaxLimitedTime)
                        {
                            temp.Add(kvp.Key);
                        }
                    }
                     foreach  (var s  in  temp)
                    {
                        Debug.WriteLine( " key= " + s + " 的任务超时,执行该任务的线程将被销毁 " );
                        _Aborted(s);
                    }
                    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2,  new  WaitOrTimerCallback(bb), state, MaxLimitedTime,  true );
                }
                WorkerThread.Manual.Set();
            }

        }
         public   void  Aborted( string  key)
        {
             lock  (WorkerThread.Manual)
            {
                WorkerThread.Manual.Reset();
                _Aborted(key);
                WorkerThread.Manual.Set();
            }
        }

         private   void  _Aborted( string  key)
        {
             lock  (queue)
            {
                 // 任务如果还在队列中则删除该任务
                 int  index  =  queue.FindIndex(t  =>  t.Key  ==  key);
                 if  (index  >   - 1 )
                {
                    queue.RemoveAt(index);
                    Debug.WriteLine( " 从任务队列中移除指定key= " + key + " 的任务 " );
                }
            }
             // lock (dict.SyncRoot)
             // {
                 #region  old way now extract method FindSpecificWorkerThreadByKey to split this
                 // WorkerThread v;
                 // if (dict.TryGetValue(key, out v))
                 // {
                 //     v.Thread.Abort();
                 //      // 在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                 //      // 过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                 //      // ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                 //      // 阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                 //      // 如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                 //      // 迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                 //      // 用Join方法。
                 //     v.Thread.Join();
                 //     dict.Remove(key);
                 // }
                 #endregion
                WorkerThread v  =  FindSpecificWorkerThreadByKey(key);
                 // 没有发现指定key的线程表示,对应该key的任务已经执行完了,不需要再来取消该任务
                 // 或者指的key的线程虽然还在但它的状态已变为suspended,任务已完成,将等待下一个任务,实际不需要终止该线程
                 // 只有但指定的key的任务在执行时才删除
                 // if (v != null && v.Thread.ThreadState != ThreadState.Suspended)
                 if  (v  !=   null   &&  v.CurrentThreadState  ==  WorkerThreadState.Busy)
                {
                    dict.Remove(key);
                     /*
                在调用Abort方法时,在指定线程上引发ThreadAbortException。以开始终止此线程的
                过程。ThreadAbortException是—个可以由应用程序代码捕获的特殊异常,但除非调用
                ResetAbort,否则会在catch块的结尾再次引发它。ResetAbod可以取消Abort的请求,并
                阻止ThreadAbortException终止此线程。但是,线程不一定会立即中止,或者根本不中止。
                如果线程在作为中止过程的一部分被调用的finally块中做非常大量的计算,从而无限期延
                迟中止操作,则会发生这种情况。若要确保线程已经终止,请在调用Abort之后对线程调
                用Join方法。
                 */
                    v.Thread.Abort();
                    v.Thread.Join();
                    Debug.WriteLine( " 销毁正在执行Key= " + key + " 的任务的线程 " );
                     if  (v.ErrorCallback  !=   null )
                    {
                        v.ErrorCallback(v.State);
                         // 最后完成任务的时间
                        v.EndExecTime  =  DateTime.Now;
                    }

                }
             // }

             // wait.Set();
        }

         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state,  string  key, SuccCallback succ, ErrCallback err)
        {
            WorkerThread p  =   new  WorkerThread()
            {
                WaitCallback  =  callback,
                State  =  state,
                Key  =  key,
                ErrorCallback  =  err,
                SuccessCallback  =  succ
            };
             // queue.Enqueue(p);
             lock  (queue)
            {
                queue.Add(p);
                 // Monitor.Pulse(queue);
            }
            wait.Set();
        }
         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state, SuccCallback succ, ErrCallback err)
        {
            QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString(), succ, err);
        }

         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state,  string  key)
        {
             // WorkerThread p = new WorkerThread()
             //             {
             //                 WaitCallback = callback,
             //                 State = state,
             //                 Key = key
             //             };
             /// /queue.Enqueue(p);
             // queue.Add(p);
             // wait.Set();
            QueueUserWorkItem(callback, state, key,  null ,  null );
        }
         public   void  QueueUserWorkItem(WaitCallback2 callback,  object  state)
        {
            QueueUserWorkItem(callback, state, System.Guid.NewGuid().ToString());
        }

    }
     public   enum  WorkerThreadState :  byte
    {
        None  =   0 ,
        Busy  =   1 ,
        Idle  =   2
    }
     public   class  WorkerThread
    {
         public  AutoResetEvent Auto  =   new  AutoResetEvent( false );
         public   static  ManualResetEvent Manual  =   new  ManualResetEvent( true );
         public  WorkerThreadState CurrentThreadState {  get ;  set ; }
         public  DateTime StartExecTime {  get ;  set ; }
         public  DateTime EndExecTime {  get ;  set ; }
         public  Thread Thread {  get ;  set ; }
         public   string  Key {  get ;  set ; }
         public  WaitCallback2 WaitCallback {  get ;  set ; }
         public  SuccCallback SuccessCallback {  get ;  set ; }
         public  ErrCallback ErrorCallback {  get ;  set ; }
         public  Object State {  get ;  set ; }
         public   void  Exec()
        {
             while  ( true )
            {
                 this .CurrentThreadState  =  WorkerThreadState.Busy;
                 if  ( this .SuccessCallback  !=   null )
                     this .SuccessCallback( this .State,  this .WaitCallback( this .State));
                 else
                     this .WaitCallback( this .State); // 如何将执行的结果返回,目前是通过SuccessCallback将结果作为参数返回,如果没有使用SuccessCallback将不能返回执行的结果
                 this .EndExecTime  =  DateTime.Now;
                 // 如何将任务执行的起讫时间发给任务
                 this .CurrentThreadState  =  WorkerThreadState.Idle;
                 // this.Thread.Suspend();
                 // 等待60s如果在此期间未接收到新的任务该线程就退出
                 if  ( ! Auto.WaitOne( 60   *   1000 ,  false ))
                     break ;
                Manual.WaitOne();
            }
            Debug.WriteLine( " 线程销毁 " );
        }
         public  WorkerThread Change(WorkerThread wt)
        {
             this .Key  =  wt.Key;
             this .WaitCallback  =  wt.WaitCallback;
             this .State  =  wt.State;
             this .StartExecTime  =  wt.StartExecTime;
             this .ErrorCallback  =  wt.ErrorCallback;
             this .SuccessCallback  =  wt.SuccessCallback;
             return   this ;
        }
         public   void  Start(WorkerThread wt)
        {
             this .Change(wt);
             this .Thread  =   new  Thread( new  ThreadStart( this .Exec));
             this .Thread.Priority  =  ThreadPriority.Lowest;
        }

         // public void Start(WaitCallback callback,Object state)
         // {
         //     this.WaitCallback = callback;
         //     this.State = state;
         //     if(this.Thread==null){
         //         this.Thread = new Thread(new ThreadStart(this.Exec));
         //         this.Thread.Priority = ThreadPriority.Lowest;
         //         this.Thread.IsBackground = true;
         //         this.Thread.Start();
         //         return;
         //     }
         //     if(this.Thread.ThreadState==ThreadState.Suspended)
         //     {
         //         this.Thread.Resume();
         //     }
         // }
    }
}



线程安全的哈希表

Code
using  System.Collections;
using  System.Collections.Generic;

namespace  Amib.Threading.Internal
{
     public   class  SynchronizedDictionary < TKey, TValue >
    {
         private   readonly  Dictionary < TKey, TValue >  _dictionary;
         private   readonly   object  _lock;

         public  SynchronizedDictionary()
        {
            _lock  =   new   object ();
            _dictionary  =   new  Dictionary < TKey, TValue > ();
        }
         public   void  Add(TKey key, TValue value)
        {
             lock  (_lock)
            {
                _dictionary.Add(key, value);
            }
        }
         public   bool  TryGetValue(TKey key, out  TValue value)
        {
             lock (_lock)
            {
                 return   this ._dictionary.TryGetValue(key,  out  value);
            }
        }
         public   int  Count
        {
             get  {  lock  (_lock) {  return  _dictionary.Count; } }
        }

         public   bool  Contains(TKey key)
        {
             lock  (_lock)
            {
                 return  _dictionary.ContainsKey(key);
            }
        }

         public   void  Remove(TKey key)
        {
             lock  (_lock)
            {
                _dictionary.Remove(key);
            }
        }

         public   object  SyncRoot
        {
             get  {  return  _lock; }
        }

         public  TValue  this [TKey key]
        {
             get
            {
                 lock  (_lock)
                {
                     return  _dictionary[key];
                }
            }
             set
            {
                 lock  (_lock)
                {
                    _dictionary[key]  =  value;
                }
            }
        }

         public  Dictionary < TKey, TValue > .KeyCollection Keys
        {
             get
            {
                 lock  (_lock)
                {
                     return  _dictionary.Keys;
                }
            }
        }

         public  Dictionary < TKey, TValue > .ValueCollection Values
        {
             get
            {
                 lock  (_lock)
                {
                     return  _dictionary.Values;
                }
            }
        }
         public   void  Clear()
        {
             lock  (_lock)
            {
                _dictionary.Clear();
            }
        }

         public  Dictionary < TKey, TValue > .Enumerator GetEnumerator()
        {
             lock  (_lock)
            {
                 return   this ._dictionary.GetEnumerator();
            }
        }

    }
}

Code
using  System;
using  System.Collections.Generic;
using  System.Linq;
using  System.Text;
using  System.Threading;
using  ThreadPool2;

namespace  ThreadPoolTest.MyThreadPool2Test
{
     class  Class1
    {
         ///   <summary>
         ///  
         ///   </summary>
         ///   <param name="args"></param>
         static   void  Main( string [] args)
        {
            MyThreadPool2 pool = new  MyThreadPool2( 1 , true , 10000 );
             object  state = new   object ();
            pool.QueueUserWorkItem(call1,state, " task1 " ,succ,err);
            pool.QueueUserWorkItem(call2, state,  " task2 " , succ, err);
            pool.QueueUserWorkItem(call2, state,  " task3 " , succ, err);

            Console.ReadLine();
        }

         private   static   void  err( object  state)
        {
            Console.WriteLine( " err " );
        }

         private   static   void  succ( object  state,  object  result)
        {
            Console.WriteLine( " succ " );
        }

         private   static   object  call1( object  state)
        {
             while ( true )
            {
                Thread.Sleep( 2000 );
                Console.WriteLine( " call1 " );
            }
             return   null ;
        }
         private   static   object  call2( object  state)
        {
            Console.WriteLine( " call2 " );
             return   null ;
        }
    }
}

Code
using  System;
using  System.Collections.Generic;
using  System.Diagnostics;
using  System.Linq;
using  System.Text;
using  ThreadPool2;

namespace  ThreadPoolTest.MyThreadPool2Test
{
     class  Class2
    {
         static   void  Main( string [] args)
        {
            MyThreadPool2 pool = new  MyThreadPool2( 100 , true , 20000 );
             // Stopwatch sw=new Stopwatch();
             for (var i = 0 ;i < 10000 ;i ++ )
                pool.QueueUserWorkItem(call,i,i.ToString(),succ,err);

        }

         private   static   void  err( object  state)
        {
            Console.WriteLine( " process err when value={0} " ,state);
        }

         private   static   void  succ( object  state,  object  result)
        {
            Console.WriteLine( " {0}'s sqrt is {1} " ,state,result);
        }

         private   static   object  call( object  state)
        {
             return  Math.Sqrt(Convert.ToDouble(state));
        }
    }
}

在做测试二是,我截了两张图,并不能说说什么,供参考

我是在Debug模式下测的,内存占用可能会多些,

查看更多关于自制线程池3的详细内容...

  阅读:69次