Threadpool
Threadpool
织一、整体架构概述
线程池是一种并发编程的设计模式,核心目的是管理一组预先创建的线程,避免在程序运行过程中频繁创建和销毁线程带来的性能开销。通过线程池,可以将任务存储在任务队列中,让线程池内的线程从队列中取出任务并执行,实现线程的复用,提高程序的执行效率和资源利用率。 笔记基于Github上一个开源项目完成。
二、架构详细组成部分
(一)线程池类(ThreadPool)
1. 成员变量
成员变量名 | 类型 | 作用 |
---|---|---|
workers | std::vector< |
存储线程池中的所有工作线程。线程创建后添加到该向量,便于后续管理,如析构时通过 join 操作等待线程结束。 |
tasks | std::queue<std::function<void()> | 作为任务队列存储待执行任务。新任务添加到队列尾部,工作线程从头部取任务执行,遵循先进先出(FIFO)原则。 |
queue_mutex | std::mutex | 互斥锁,保障任务队列的线程安全。因任务队列可能被多线程同时访问(如添加和取出任务),使用它确保同一时间仅一个线程可操作队列,避免数据竞争与不一致问题。 |
condition | std::condition_variable | 条件变量,用于线程间同步。任务队列无任务时,工作线程进入等待状态;有新任务添加或线程池停止工作时,通过它通知等待线程。 |
stop | bool | 标志位,指示线程池是否停止工作。初始值为 false 表示正常运行;调用析构函数时设为 true,通知所有线程停止。 |
(二)构造函数(ThreadPool(size_t threads))
- 功能:创建指定数量的工作线程,并启动它们开始等待任务。
- 详细流程 cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop ||!this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
- 初始化stop标志位:将stop标志位初始化为false,表示线程池开始正常工作。
- 创建工作线程:使用for循环创建threads个线程,并将它们添加到workers向量中。每个线程执行一个lambda表达式,该表达式包含一个无限循环,用于不断从任务队列中取出任务并执行。
- 线程工作逻辑:
- 加锁保护任务队列:使用std::unique_lock对queue_mutex加锁,确保在访问任务队列时的线程安全。
- 等待任务或停止信号:调用condition.wait方法,使线程进入等待状态。该方法接受一个锁对象和一个谓词(lambda表达式),当谓词返回true时,线程被唤醒。谓词[this]{ return this->stop ||!this->tasks.empty(); }表示当线程池停止工作或者任务队列中有任务时,线程被唤醒。
- 检查停止条件:如果线程池停止工作且任务队列为空,线程返回,退出循环。
- 取出任务并执行:从任务队列头部取出一个任务,并将其移动到局部变量task中,然后将该任务从队列中移除。最后,执行取出的任务。
(三)任务入队函数(enqueue(F&& f, Args&&... args))
- 功能:将一个任务添加到线程池的任务队列中,并返回一个std::future对象,用于后续获取任务的执行结果。
- 详细流程 cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
- 推断任务返回类型:使用std::result_of<F(Args...)>::type推断任务的返回类型,并将其定义为return_type。
- 封装任务:创建一个std::packaged_task对象,使用std::bind将可调用对象f和其参数args绑定到该任务中。使用std::make_shared创建一个智能指针task指向该任务。
- 获取std::future对象:调用task->get_future()方法,获取与任务关联的std::future对象res,用于后续获取任务的执行结果。
- 加锁保护任务队列:使用std::unique_lock对queue_mutex加锁,确保在访问任务队列时的线程安全。
- 检查线程池状态:检查stop标志位,如果线程池已经停止工作,则抛出std::runtime_error异常,提示不允许在停止的线程池上添加任务。
- 添加任务到队列:将一个lambda表达式task{ (*task)(); }添加到任务队列中,该lambda表达式会调用std::packaged_task对像task,从而执行封装的任务。
- 通知等待线程:调用condition.notify_one()方法,通知一个等待的线程有新任务可用。
- 返回std::future对象:返回之前获取的std::future对象res。
(四)析构函数(~ThreadPool())
- 功能:销毁线程池,确保所有线程在退出前完成任务。
- 详细流程 cpp
1
2
3
4
5
6
7
8
9
10inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
- 加锁并设置停止标志:使用std::unique_lock对queue_mutex加锁,将stop标志位设置为true,表示线程池停止工作。
- 通知所有线程:调用condition.notify_all()方法,通知所有等待的线程线程池已停止工作。
- 等待线程结束:遍历workers向量,对每个线程调用join()方法,等待线程完成当前任务后退出。
三、输入输出详细说明
(一)输入
- 构造函数输入
- 参数:size_t threads
- 含义:表示线程池中的线程数量。用户可以根据实际需求和系统资源情况指定合适的线程数量,例如在多核处理器上可以设置为处理器核心数,以充分利用多核优势。
- 任务入队函数输入
- 参数:一个可调用对象F和它的参数Args...
- 含义:可调用对象可以是函数、函数对象、lambda表达式等,它表示要执行的具体任务。参数Args...是传递给可调用对象的参数。例如,在示例代码中,使用lambda表达式[i] {... }作为可调用对象,i是传递给lambda表达式的参数。
(二)输出
- 任务入队函数返回值
- 类型:std::future<typename std::result_of<F(Args...)>::type>
- 含义:返回一个std::future对象,用于获取任务的执行结果。std::future提供了一种机制,允许主线程在不阻塞的情况下继续执行其他操作,直到需要获取任务结果时,调用get()方法阻塞等待任务完成并返回结果。
- 任务执行结果
- 获取方式:通过std::future对象的get()方法
- 含义:调用get()方法会阻塞当前线程,直到任务完成并返回结果。在示例代码中,通过result.get()获取每个任务的返回值,(i * i)的结果。