C++11线程池

本文关注C++线程池的设计,以一个典型的设计方案为例,分析C++11线程池的设计方式。

设计方案

线程池希望达到的效果:

  • 提交任务的方式灵活,可以提交带任意参数的可调用对象。同时提交线程可以获取任务执行完成的状态。
  • 线程池伸缩性,自动回收空闲线程

设计思想

基本成员:工作线程队列(workers),任务队列(tasks),以及条件变量。

基本行为:每创建一个工作线程,都等待在任务队列上(获取条件变量)。任务被提交(commit),加入到任务队列后,通知工作线程,工作线程提取任务执行。

提交方式(CommitTask):为提交带任意参数的可调用对象,使用可变长参数的形式。该函数期望返回异步结果,使用std::future的返回类型,同时在提交之前封装成std::packaged_task的形式从而课获取到std::future类型的结果。

如何回收线程:设计一个监控线程,该线程主要任务是循环监控线程池的空闲线数,在发现空闲线程数超过设置的最大空闲线程数,就回收线程。回收方法很巧妙:向任务队列添加相应数量的“停止任务”,工作线程执行该任务后,会退出线程。

Commit行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template <typename F, typename... Args>
auto ThreadPool::CommitTask(F&& f, Args&&... args) -> std::future<typename std::result_of<F (Args...)>::type>
{
using retType = typename std::result_of<F (Args...)>::type;

auto task = std::make_shared<std::packaged_task<retType ()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
{
std::unique_lock<std::mutex> guard(mutex_);
if(shutdown_)
{
return std::future<retType>();
}

task_.emplace_back( [=]() { (*task(); )}); // 因为任务队列以std::function<void ()>形式组织
{
_CreateWorker();
}

cond_.notify_one();
}

return task->get_future(); //获取std::future的异步结果
}

设计要点以及使用C++11特性

  • 使用模板参数F,以支持任意可调用对象类型的传入,比如lambda,std::function,函数指针,仿函数
  • 使用std::future返回任务的执行结果。
  • 使用std::packaged_task封装任务,以获取异步结果。
  • 使用std::forward,完美转发。
  • 使用emplace_back利用了右值引用的特性和完美转发。emplace_back将参数完美转发给vector的构造函数,也就是说,传入的是lambda,则将使用移动构造函数。

工作线程的执行函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void   ThreadPool::_WorkerRoutine()
{
working_ = true;

while (working_)
{
std::function<void ()> task;

{
std::unique_lock<std::mutex> guard(mutex_);

++ waiters_;
cond_.wait(guard, [this]()->bool { return this->shutdown_ || !tasks_.empty(); } );//谓语,
//只有谓语的执行结果为false,再能继续进行
-- waiters_;

if (this->shutdown_ && tasks_.empty()) // shutdown主动执行的话,直接退出
return;

task = std::move(tasks_.front()); //获取一个任务
tasks_.pop_front();
}

task(); //执行任务
}

// if reach here, this thread is recycled by monitor thread
// 线程执行到这里,表示该线程执行了一个停止任务,相当于消耗了一个停止任务。
-- pendingStopSignal_;
}

监控线程的执行函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void   ThreadPool::_MonitorRoutine()
{
while (!shutdown_)
{
std::this_thread::sleep_for(std::chrono::seconds(1));

std::unique_lock<std::mutex> guard(mutex_);
if (shutdown_)
return;

auto nw = waiters_;

nw -= pendingStopSignal_; //pendingStopSignal_表示当前任务队列中未被消费的“停止任务”。

while (nw -- > maxIdleThread_) //回收多余的线程
{
tasks_.push_back([this]() { working_ = false; });
cond_.notify_one();
++ pendingStopSignal_;
}
}