队列高级应用之设计一个高性能线程池

原因排查
经过一个多小时的代码排查终于查明了线上程序线程数过多的原因:这是一个接收mq消息的一个服务,程序大体思路是这样的,监听的线程每次收到一条消息,就启动一个线程去执行,每次启动的线程都是新的。说到这里,咱们就谈一谈这个程序有哪些弊端呢:
解决问题
线程多的问题该怎么解决呢,增加cpu核心数?治标不治本。对于开发者而言,最为常用也最为有效的是线程池化,也就是说线程池。
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
线程池其中一项很重要的技术点就是任务的队列,队列虽然属于一种基础的数据结构,但是发挥了举足轻重的作用。
队列
队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
队列是一种采用的FIFO(first in first out)方式的线性表,也就是经常说的先进先出策略。

实现
public class QueueArray
{
//队列元素的数组容器
T[] container = null;
int IndexHeader, IndexTail;
public QueueArray(int size)
{
container = new T[size];
IndexHeader = 0;
IndexTail = 0;
}
public void Enqueue(T item)
{
//入队的元素放在头指针的指向位置,然后头指针前移
container[IndexHeader] = item;
IndexHeader++;
}
public T Dequeue()
{
//出队:把尾元素指针指向的元素取出并清空(不清空也可以)对应的位置,尾指针前移
T item = container[IndexTail];
container[IndexTail] = default(T);
IndexTail++;
return item;
}
}
public class QueueLinkList
{
LinkedList contianer = null;
public QueueLinkList()
{
contianer = new LinkedList();
}
public void Enqueue(T item)
{
//入队的元素其实就是加入到队尾
contianer.AddLast(item);
}
public T Dequeue()
{
//出队:取链表第一个元素,然后把这个元素删除
T item = contianer.First.Value;
contianer.RemoveFirst();
return item;
}
}
队列扩展阅读
简单实用的线程池
//线程池
public class ThreadPool
{
bool PoolEnable = false; //线程池是否可用
List ThreadContainer = null; //线程的容器
ConcurrentQueue JobContainer = null; //任务的容器
public ThreadPool(int threadNumber)
{
PoolEnable = true;
ThreadContainer = new List(threadNumber);
JobContainer = new ConcurrentQueue();
for (int i = 0; i < threadNumber; i++)
{
var t = new Thread(RunJob);
ThreadContainer.Add(t);
t.Start();
}
}
//向线程池添加一个任务
public void AddTask(Action
使用
ThreadPool pool = new ThreadPool(100);
for (int i = 0; i < 5000; i++)
{
pool.AddTask((obj) =>
{
Console.WriteLine($"{obj}__{System.Threading.Thread.CurrentThread.ManagedThreadId}");
}, i, (e) =>
{
Console.WriteLine(e.Message);
});
}
pool.FinalPool();
Console.Read();