Bootstrap

队列高级应用之设计一个高性能线程池

原因排查

经过一个多小时的代码排查终于查明了线上程序线程数过多的原因:这是一个接收mq消息的一个服务,程序大体思路是这样的,监听的线程每次收到一条消息,就启动一个线程去执行,每次启动的线程都是新的。说到这里,咱们就谈一谈这个程序有哪些弊端呢:

解决问题

线程多的问题该怎么解决呢,增加cpu核心数?治标不治本。对于开发者而言,最为常用也最为有效的是线程池化,也就是说线程池。

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。

线程池其中一项很重要的技术点就是任务的队列,队列虽然属于一种基础的数据结构,但是发挥了举足轻重的作用。

队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

队列是一种采用的FIFO(first in first out)方式的线性表,也就是经常说的先进先出策略。 

实现

public class QueueArray
    {
        //队列元素的数组容器
        T[] container = null;
        int IndexHeader, IndexTail;
        public QueueArray(int size)
        {
            container = new T[size];
            IndexHeader = 0;
            IndexTail = 0;
        }
        public void Enqueue(T item)
        {
            //入队的元素放在头指针的指向位置,然后头指针前移
            container[IndexHeader] = item;
            IndexHeader++;
        }
        public T Dequeue()
        {
            //出队:把尾元素指针指向的元素取出并清空(不清空也可以)对应的位置,尾指针前移
            T item = container[IndexTail];
            container[IndexTail] = default(T);
            IndexTail++;
            return item;
        }

    }

public class QueueLinkList
    {
        LinkedList contianer = null;
        public QueueLinkList()
        {
            contianer = new LinkedList();
        }
        public void Enqueue(T item)
        {
            //入队的元素其实就是加入到队尾
            contianer.AddLast(item);
        }
        public T Dequeue()
        {
            //出队:取链表第一个元素,然后把这个元素删除
            T item = contianer.First.Value;
            contianer.RemoveFirst();
            return item;
        }

    }

队列扩展阅读

简单实用的线程池

    //线程池
    public class ThreadPool
    {
        bool PoolEnable = false; //线程池是否可用 
        List ThreadContainer = null; //线程的容器
        ConcurrentQueue JobContainer = null; //任务的容器
        public ThreadPool(int threadNumber)
        {
            PoolEnable = true;
            ThreadContainer = new List(threadNumber);
            JobContainer = new ConcurrentQueue();
            for (int i = 0; i < threadNumber; i++)
            {
                var t = new Thread(RunJob);
                ThreadContainer.Add(t);
                t.Start();
            }           
        }
        //向线程池添加一个任务
        public void AddTask(Action job,object obj, Action errorCallBack=null)
        {
            if (JobContainer != null)
            {
                JobContainer.Enqueue(new ActionData { Job = job, Data = obj , ErrorCallBack= errorCallBack });
            }
          
        }
        //终止线程池
        public void FinalPool()
        {
            PoolEnable = false;
            JobContainer = null;
            if (ThreadContainer != null)
            {
                foreach (var t in ThreadContainer)
                {
                    //强制线程退出并不好,会有异常
                    //t.Abort();
                    t.Join();                    
                }
                ThreadContainer = null;
            }

        }
        private  void RunJob()
        {
            while (true&& JobContainer!=null&& PoolEnable)
            {
                //任务列表取任务
                ActionData job=null;
                JobContainer?.TryDequeue(out job);
                if (job == null)
                {
                    //如果没有任务则休眠
                    Thread.Sleep(10);
                    continue;
                }
                try
                {
                    //执行任务
                    job.Job.Invoke(job.Data);
                }
                catch(Exception error)
                {
                    //异常回调
                    job?.ErrorCallBack(error);
                }
            }
        }
    }

    public class ActionData
    {
        //执行任务的参数
        public object Data { get; set; }
        //执行的任务
        public Action Job { get; set; }
        //发生异常时候的回调方法
        public Action ErrorCallBack { get; set; }
    }

使用

 ThreadPool pool = new ThreadPool(100);
            for (int i = 0; i < 5000; i++)
            {
                pool.AddTask((obj) =>
                {
                    Console.WriteLine($"{obj}__{System.Threading.Thread.CurrentThread.ManagedThreadId}");
                }, i, (e) =>
                {
                    Console.WriteLine(e.Message);
                });
            }
            pool.FinalPool();
            Console.Read();