自己实现线程池
.NET内置的ThreadPool无法取消已加入执行队列的任务,也无法取消正在执行的任务,这对于我的项目来说是个问题,因此要自定义一个线程池。
我的项目中具体的应用场景如下:某一个操作会非常耗时(将网址插入bdb中),如果将其加入线程池中,很可能将线程池中的资源耗尽。因此我希望可以定义一个maxThreadNum,用来控制此操作最多允许同时执行的线程数,同时将线程优先级设为最低(ThreadPriority.Lowest),我只要这个操作慢慢执行就行了。
代码还需要重构,请高手指点一下,我对锁的机制还不太清楚。
Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Rhino.Commons;
namespace ThreadPool
{
/// <summary>
/// A small dedicated pool of lowest-priority threads whose work items can be
/// cancelled by key, whether they are still queued or already running.
/// </summary>
/// <remarks>
/// Work items are queued with <see cref="QueueUserWorkItem"/> and dispatched by a
/// scheduler callback that runs on a CLR thread-pool thread whenever the internal
/// event is signalled or the poll interval elapses. At most
/// <see cref="MaxThreadNum"/> worker threads are created. A worker that has
/// finished its item parks itself (see <c>WorkerThread.Exec</c>, which calls
/// <c>Thread.Suspend</c>) and is handed the next queued item on a later pass.
/// Running workers are tracked by task key, so the key of a task that is about to
/// run must not collide with a key already tracked (Dictionary.Add would throw).
/// </remarks>
public static class MyThreadPool
{
    /// <summary>Milliseconds after which the scheduler runs even if nobody signalled it.</summary>
    private const int PollIntervalMs = 2000;

    /// <summary>Guards <see cref="queue"/>, <see cref="dict"/> and <see cref="registration"/>.</summary>
    static readonly object obj = new object();

    /// <summary>Signalled whenever the queue or the set of workers changed.</summary>
    static readonly AutoResetEvent wait = new AutoResetEvent(false);

    /// <summary>Pending work items, oldest first.</summary>
    static readonly List<WorkerThread> queue = new List<WorkerThread>();

    /// <summary>
    /// Worker threads keyed by the task they were last given. Sized for a single
    /// worker because the pool is meant to stay tiny so it cannot exhaust resources.
    /// </summary>
    static readonly Dictionary<string, WorkerThread> dict = new Dictionary<string, WorkerThread>(1);

    /// <summary>State object handed to the wait registration; never assigned, so it is null.</summary>
    private static object state;

    /// <summary>The currently armed one-shot wait, kept so it can be unregistered.</summary>
    private static RegisteredWaitHandle registration;

    static MyThreadPool()
    {
        registration = Register(state);
        SetMaxThreadNum(1);
    }

    /// <summary>Upper bound on the number of worker threads the pool creates.</summary>
    public static int MaxThreadNum
    {
        get;
        set;
    }

    /// <summary>
    /// Sets <see cref="MaxThreadNum"/>, clamping values below 1 up to 1.
    /// </summary>
    /// <param name="num">Requested maximum number of worker threads.</param>
    public static void SetMaxThreadNum(int num)
    {
        if (num < 1)
        {
            num = 1;
        }
        MaxThreadNum = num;
    }

    /// <summary>
    /// Cancels the task identified by <paramref name="key"/>: aborts its worker
    /// thread if one is tracked under that key and removes a queued item with
    /// that key, then wakes the scheduler.
    /// </summary>
    /// <param name="key">Key that was passed to <see cref="QueueUserWorkItem"/>.</param>
    public static void Aborted(string key)
    {
        lock (obj)
        {
            WorkerThread worker;
            if (dict.TryGetValue(key, out worker))
            {
                try
                {
                    worker.Thread.Abort();
                }
                catch (ThreadStateException)
                {
                    // Abort() throws when the target is suspended, which is how an
                    // idle worker parks itself. The abort request is still recorded
                    // and is delivered once the thread is resumed, so resume it to
                    // let it terminate instead of leaking a parked thread.
                    worker.Thread.Resume();
                }
                dict.Remove(key);
            }

            int index = queue.FindIndex(t => t.Key == key);
            if (index > -1)
            {
                queue.RemoveAt(index);
            }

            wait.Set();
        }
    }

    /// <summary>
    /// Queues a work item and wakes the scheduler.
    /// </summary>
    /// <param name="callback">Delegate to run on a pool worker thread.</param>
    /// <param name="state">Argument passed to <paramref name="callback"/>.</param>
    /// <param name="key">Identifier used to cancel the item via <see cref="Aborted"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="callback"/> or <paramref name="key"/> is null.
    /// </exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state, string key)
    {
        // Fail on the caller's thread rather than later on a pool/worker thread.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }
        if (key == null)
        {
            throw new ArgumentNullException("key");
        }

        WorkerThread item = new WorkerThread()
        {
            Callback = callback,
            State = state,
            Key = key
        };

        // The scheduler and Aborted mutate the queue under the same lock.
        lock (obj)
        {
            queue.Add(item);
        }
        wait.Set();
    }

    /// <summary>
    /// Scheduler pass: hands queued items to parked workers, creates new workers
    /// while below <see cref="MaxThreadNum"/>, then re-arms the one-shot wait.
    /// </summary>
    /// <param name="callbackState">State object supplied at registration; passed through unchanged.</param>
    /// <param name="timedOut">Whether the wait timed out; not used.</param>
    private static void Schedule(object callbackState, bool timedOut)
    {
        lock (obj)
        {
            ReuseIdleWorkers();
            StartNewWorkers();
            Rearm(callbackState);
        }
    }

    /// <summary>Assigns queued items to workers that are currently suspended and resumes them.</summary>
    private static void ReuseIdleWorkers()
    {
        // dict cannot be modified while it is being enumerated, so collect first.
        List<string> staleKeys = new List<string>();
        List<WorkerThread> reassigned = new List<WorkerThread>();

        foreach (KeyValuePair<string, WorkerThread> entry in dict)
        {
            if (queue.Count == 0)
            {
                break;
            }

            // ThreadState is a flags enum; test the bit instead of comparing the
            // whole value so a combined state still counts as suspended.
            if ((entry.Value.Thread.ThreadState & ThreadState.Suspended) == 0)
            {
                continue;
            }

            WorkerThread next = queue[0];
            queue.RemoveAt(0);
            staleKeys.Add(entry.Key);
            // Change() overwrites the worker's Key/Callback/State with the new item.
            reassigned.Add(entry.Value.Change(next));
        }

        foreach (string staleKey in staleKeys)
        {
            dict.Remove(staleKey);
        }
        foreach (WorkerThread worker in reassigned)
        {
            dict.Add(worker.Key, worker);
            worker.Thread.Resume();
        }
    }

    /// <summary>Creates and starts new workers for queued items while below the thread limit.</summary>
    private static void StartNewWorkers()
    {
        while (queue.Count > 0 && dict.Count < MaxThreadNum)
        {
            WorkerThread next = queue[0];
            queue.RemoveAt(0);

            WorkerThread worker = new WorkerThread();
            worker.Start(next);
            dict.Add(worker.Key, worker);
            worker.Thread.Start();
        }
    }

    /// <summary>Releases the wait registration that just fired and arms a new one.</summary>
    private static void Rearm(object callbackState)
    {
        if (registration != null)
        {
            // Without this every pass would leave a RegisteredWaitHandle behind.
            registration.Unregister(null);
        }
        registration = Register(callbackState);
    }

    /// <summary>Registers a one-shot wait on the wake-up event that invokes the scheduler.</summary>
    private static RegisteredWaitHandle Register(object callbackState)
    {
        return System.Threading.ThreadPool.RegisterWaitForSingleObject(
            wait, new WaitOrTimerCallback(Schedule), callbackState, PollIntervalMs, true);
    }
}
}
/// <summary>
/// Couples a dedicated thread with the work item (key, callback and argument)
/// it is currently assigned.
/// </summary>
public class WorkerThread
{
    /// <summary>Thread that runs <see cref="Exec"/>; created, but not started, by <see cref="Start"/>.</summary>
    public Thread Thread { get; set; }

    /// <summary>Identifier of the work item currently assigned.</summary>
    public string Key { get; set; }

    /// <summary>Delegate invoked by <see cref="Exec"/>.</summary>
    public WaitCallback Callback { get; set; }

    /// <summary>Argument passed to <see cref="Callback"/>.</summary>
    public Object State { get; set; }

    /// <summary>
    /// Thread body: invokes the current callback with the current state, then
    /// suspends the thread; once resumed it loops and invokes whatever callback
    /// is assigned at that point. Never returns normally.
    /// </summary>
    public void Exec()
    {
        for (;;)
        {
            Callback(State);
            Thread.Suspend();
        }
    }

    /// <summary>
    /// Copies key, callback and state from <paramref name="wt"/> into this instance.
    /// </summary>
    /// <param name="wt">Work item to take over.</param>
    /// <returns>This instance.</returns>
    public WorkerThread Change(WorkerThread wt)
    {
        Key = wt.Key;
        Callback = wt.Callback;
        State = wt.State;
        return this;
    }

    /// <summary>
    /// Takes over the work item in <paramref name="wt"/> and creates a
    /// lowest-priority thread for <see cref="Exec"/>. The thread is not started.
    /// </summary>
    /// <param name="wt">Work item to take over.</param>
    public void Start(WorkerThread wt)
    {
        Change(wt);
        ThreadStart entryPoint = new ThreadStart(Exec);
        Thread = new Thread(entryPoint) { Priority = ThreadPriority.Lowest };
    }
}
测试代码
Code
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadPoolTest
{
/// <summary>
/// Console demo for <c>ThreadPool.MyThreadPool</c>: queues four keyed work items
/// and aborts the two long-running ones, pausing ten seconds between steps.
/// </summary>
public class Class1
{
    /// <summary>Gate that <see cref="test"/> waits on; set once by <see cref="Main"/> and never reset.</summary>
    static ManualResetEvent wait = new ManualResetEvent(false);

    /// <summary>Runs the demo and then waits for a line of console input.</summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        object sharedState = new object();

        ThreadPool.MyThreadPool.QueueUserWorkItem(test, sharedState, " aa ");
        ThreadPool.MyThreadPool.QueueUserWorkItem(test2, sharedState, " bb ");
        Thread.Sleep(10000);

        // Release the gate so the first task stops blocking in WaitOne().
        wait.Set();
        Thread.Sleep(10000);

        Console.WriteLine(" aborted aa ");
        ThreadPool.MyThreadPool.Aborted(" aa ");
        ThreadPool.MyThreadPool.QueueUserWorkItem(test3, sharedState, " cc ");
        Thread.Sleep(10000);

        Console.WriteLine(" aborted bb ");
        ThreadPool.MyThreadPool.Aborted(" bb ");
        ThreadPool.MyThreadPool.QueueUserWorkItem(test4, sharedState, " dd ");

        Console.ReadLine();
    }

    /// <summary>Short task: prints one line and returns.</summary>
    private static void test4(object state)
    {
        Console.WriteLine(" test4 ");
    }

    /// <summary>Short task: prints one line and returns.</summary>
    private static void test3(object state)
    {
        Console.WriteLine(" test3 ");
    }

    /// <summary>Endless task: prints a line every two seconds.</summary>
    private static void test2(object state)
    {
        for (;;)
        {
            Console.WriteLine(" test2 ");
            Thread.Sleep(2000);
        }
    }

    /// <summary>
    /// Endless task: prints a line, sleeps two seconds, then waits on the gate.
    /// </summary>
    private static void test(object state)
    {
        for (;;)
        {
            Console.WriteLine(" test ");
            Thread.Sleep(2000);
            // Blocks only until Main sets the event; it is never reset, so
            // later iterations pass straight through.
            wait.WaitOne();
        }
    }
}
}