好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

自已实现线程池

自已实现线程池

.net内置的threadpool对于加入执行队列的任务,或是正在执行的任务无法取消,这对于我的项目来说有问题,因此要自定义一个线程池。

我的项目中具体的应用情节如下,某一个操作会非常耗时(将网址插入bdb中),如果将其加入线程池中,很可能将线程池中的资源耗尽,因此我希望我可以定义一个maxThreadNum用来控制执行此在操作最大允许同时执行的线程数,同时设定线程等级为最低ThreadPriority.Lowest,我只要这个操作慢慢执行就行了。

代码还需要重构,高手请指点下,对锁的机制还是不太清楚,

Code
using  System;
using  System.Collections.Generic;
using  System.Linq;
using  System.Text;
using  System.Threading;
using  Rhino.Commons;

namespace  ThreadPool
{
     public   static   class  MyThreadPool
    {
         static   object  obj  =   new   object ();
         static  AutoResetEvent wait  =   new  AutoResetEvent( false );
         static  MyThreadPool()
        {
            System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 , true );
            SetMaxThreadNum( 1 );
        }

         private   static   void  aa( object  state,  bool  timedOut)
        {
             lock  (obj)
            {
                 // 判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                List < string >  removeKey  =   new  List < string > ();
                List < WorkerThread >  newTask = new  List < WorkerThread > ();
                 // Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                 foreach  (var kvp  in  dict)
                { // kvp.Value.ThreadState == ThreadState.Unstarted || 
                     if  (kvp.Value.Thread.ThreadState  ==  ThreadState.Suspended)
                    {
                         // dict.Remove(kvp.Key); // cancle because of lock

                        WorkerThread a = queue.FirstOrDefault();
                         if  (a  !=   null )
                        {
                            removeKey.Add(kvp.Key);
                             // addDict.Add(a.Key, kvp.Value.Change(a));
                            newTask.Add(kvp.Value.Change(a));

                            queue.RemoveAt( 0 );
                             // dict.Add(a.Key, kvp.Value.Change(a)); // cancle because of lock
                             // 将参数加到线程中,并改变线程的状态
                             // dict[a.Key].Thread.Resume();
                        }
                         else
                             break ;
                         // else
                         // {
                         //     System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                         //                                                             2000, true);
                         //     return;
                         // }

                    }
                }
                removeKey.ForEach(t => dict.Remove(t));
                newTask.ForEach(t  =>
                {
                    dict.Add(t.Key, t);
                    t.Thread.Resume();
                });
                 while  (queue.Count  >   0   &&  dict.Count  <  MaxThreadNum)
                {
                     // 未到线程池最大池程数时,增加线程
                    WorkerThread b  =  queue.FirstOrDefault();
                     if  (b != null )
                    {
                        queue.RemoveAt( 0 );
                         // Thread thd = new Thread(new ThreadStart(b.Exec));
                         // thd.Priority = ThreadPriority.Lowest;
                         // dict.Add(b.Key, thd);
                         // thd.Start();
                        WorkerThread wt  =   new  WorkerThread();
                        wt.Start(b);
                        dict.Add(wt.Key, wt);
                        wt.Thread.Start();

                         // 将参数加到线程中,并改变线程的状态
                    }


                }
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait,  new  WaitOrTimerCallback(aa), state,  2000 ,  true );
            }
        }


         // private static int _maxThreadNum = 1;
         public   static   int  MaxThreadNum
        {
             // get { return _maxThreadNum; }
             // set { _maxThreadNum = value; }
             get ;  set ;
        }
         public   static   void  SetMaxThreadNum( int  num)
        {
             if  (num  <   1 )
                num  =   1 ;
            MaxThreadNum  =  num;
        }

         ///   <summary>
         ///  任务执行队列
         ///   </summary>
         // static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
        
         static  List < WorkerThread >  queue = new  List < WorkerThread > ();
         ///   <summary>
         ///  目前暂定为只使用一个线程,以免耗近资源
         ///   </summary>
         static  Dictionary < string , WorkerThread >  dict  =   new  Dictionary < string , WorkerThread > ( 1 );

         private   static   object  state;

         public   static   void  Aborted( string  key)
        {
             lock  (obj)
            {
                WorkerThread v;
                 if  (dict.TryGetValue(key,  out  v))
                {
                    v.Thread.Abort();
                    dict.Remove(key);
                }
                 int  index  =  queue.FindIndex(t  =>  t.Key  ==  key);
                 if  (index >- 1 )
                    queue.RemoveAt(index);
                wait.Set();
            }
        }

         public   static   void  QueueUserWorkItem(WaitCallback callback,  object  state,  string  key)
        {
            WorkerThread p  =   new  WorkerThread()
                        {
                            Callback  =  callback,
                            State  =  state,
                            Key  =  key
                        };
             // queue.Enqueue(p);
            queue.Add(p);
            wait.Set();
        }


    }

}
public   class  WorkerThread
{
     public  Thread Thread {  get ;  set ; }
     public   string  Key {  get ;  set ; }
     public  WaitCallback Callback {  get ;  set ; }
     public  Object State {  get ;  set ; }
     public   void  Exec()
    {
         while  ( true )
        {
             this .Callback( this .State);
             this .Thread.Suspend();
        }
    }
     public  WorkerThread Change(WorkerThread wt)
    {
         this .Key  =  wt.Key;
         this .Callback  =  wt.Callback;
         this .State  =  wt.State;
         return   this ;
    }
     public   void  Start(WorkerThread wt)
    {
         this .Change(wt);
         this .Thread  =   new  Thread( new  ThreadStart( this .Exec));
         this .Thread.Priority  =  ThreadPriority.Lowest;
    }

     // public void Start(WaitCallback callback,Object state)
     // {
     //     this.Callback = callback;
     //     this.State = state;
     //     if(this.Thread==null){
     //         this.Thread = new Thread(new ThreadStart(this.Exec));
     //         this.Thread.Priority = ThreadPriority.Lowest;
     //         this.Thread.IsBackground = true;
     //         this.Thread.Start();
     //         return;
     //     }
     //     if(this.Thread.ThreadState==ThreadState.Suspended)
     //     {
     //         this.Thread.Resume();
     //     }
     // }
}

测试代码

Code
using  System;
using  System.Collections.Generic;
using  System.Linq;
using  System.Text;
using  System.Threading;

namespace  ThreadPoolTest
{
     public   class  Class1
    {
         static  ManualResetEvent wait = new  ManualResetEvent( false );
         static   void  Main( string [] args)
        {
             object  state = new   object ();
            ThreadPool.MyThreadPool.QueueUserWorkItem( new  WaitCallback(test), state,  " aa " );
            ThreadPool.MyThreadPool.QueueUserWorkItem( new  WaitCallback(test2), state,  " bb " );
            System.Threading.Thread.Sleep( 10000 );
            wait.Set();
            System.Threading.Thread.Sleep( 10000 );
            Console.WriteLine( " aborted aa " );
            ThreadPool.MyThreadPool.Aborted( " aa " );
            ThreadPool.MyThreadPool.QueueUserWorkItem( new  WaitCallback(test3), state,  " cc " );
            System.Threading.Thread.Sleep( 10000 );
            Console.WriteLine( " aborted bb " );
            ThreadPool.MyThreadPool.Aborted( " bb " );
          
            ThreadPool.MyThreadPool.QueueUserWorkItem( new  WaitCallback(test4), state,  " dd " );
            Console.ReadLine();
        }

         private   static   void  test4( object  state)
        {
            Console.WriteLine( " test4 " );
        }

         private   static   void  test3( object  state)
        {
            Console.WriteLine( " test3 " );
        }

         private   static   void  test2( object  state)
        {
             while ( true )
            {
                Console.WriteLine( " test2 " );
                Thread.Sleep( 2000 );
            }
        }

         private   static   void  test( object  state)
        {
             while  ( true )
            {
                Console.WriteLine( " test " );
                Thread.Sleep( 2000 );
                wait.WaitOne(); // 只执行一次
            }
        }
    }
}

查看更多关于自已实现线程池的详细内容...

  阅读:45次