Threadpool

一、整体架构概述

线程池是一种并发编程的设计模式,核心目的是管理一组预先创建的线程,避免在程序运行过程中频繁创建和销毁线程带来的性能开销。通过线程池,可以将任务存储在任务队列中,让线程池内的线程从队列中取出任务并执行,实现线程的复用,提高程序的执行效率和资源利用率。 笔记基于Github上一个开源项目完成。

二、架构详细组成部分

(一)线程池类(ThreadPool)

1. 成员变量

成员变量名 类型 作用
workers std::vector<> 存储线程池中的所有工作线程。线程创建后添加到该向量,便于后续管理,如析构时通过 join 操作等待线程结束。
tasks std::queue<std::function<void()> 作为任务队列存储待执行任务。新任务添加到队列尾部,工作线程从头部取任务执行,遵循先进先出(FIFO)原则。
queue_mutex std::mutex 互斥锁,保障任务队列的线程安全。因任务队列可能被多线程同时访问(如添加和取出任务),使用它确保同一时间仅一个线程可操作队列,避免数据竞争与不一致问题。
condition std::condition_variable 条件变量,用于线程间同步。任务队列无任务时,工作线程进入等待状态;有新任务添加或线程池停止工作时,通过它通知等待线程。
stop bool 标志位,指示线程池是否停止工作。初始值为 false 表示正常运行;调用析构函数时设为 true,通知所有线程停止。

(二)构造函数(ThreadPool(size_t threads))

  1. 功能:创建指定数量的工作线程,并启动它们开始等待任务。
  2. 详细流程
    cpp
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
    {
    for(size_t i = 0;i<threads;++i)
    workers.emplace_back(
    [this]
    {
    for(;;)
    {
    std::function<void()> task;

    {
    std::unique_lock<std::mutex> lock(this->queue_mutex);
    this->condition.wait(lock,
    [this]{ return this->stop ||!this->tasks.empty(); });
    if(this->stop && this->tasks.empty())
    return;
    task = std::move(this->tasks.front());
    this->tasks.pop();
    }

    task();
    }
    }
    );
    }
  • 初始化stop标志位:将stop标志位初始化为false,表示线程池开始正常工作。
  • 创建工作线程:使用for循环创建threads个线程,并将它们添加到workers向量中。每个线程执行一个lambda表达式,该表达式包含一个无限循环,用于不断从任务队列中取出任务并执行。
  • 线程工作逻辑
    • 加锁保护任务队列:使用std::unique_lock对queue_mutex加锁,确保在访问任务队列时的线程安全。
    • 等待任务或停止信号:调用condition.wait方法,使线程进入等待状态。该方法接受一个锁对象和一个谓词(lambda表达式),当谓词返回true时,线程被唤醒。谓词[this]{ return this->stop ||!this->tasks.empty(); }表示当线程池停止工作或者任务队列中有任务时,线程被唤醒。
    • 检查停止条件:如果线程池停止工作且任务队列为空,线程返回,退出循环。
    • 取出任务并执行:从任务队列头部取出一个任务,并将其移动到局部变量task中,然后将该任务从队列中移除。最后,执行取出的任务。

(三)任务入队函数(enqueue(F&& f, Args&&... args))

  1. 功能:将一个任务添加到线程池的任务队列中,并返回一个std::future对象,用于后续获取任务的执行结果。
  2. 详细流程
    cpp
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    template<class F, class... Args>
    auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
    {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
    std::unique_lock<std::mutex> lock(queue_mutex);

    if(stop)
    throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
    }
  • 推断任务返回类型:使用std::result_of<F(Args...)>::type推断任务的返回类型,并将其定义为return_type。
  • 封装任务:创建一个std::packaged_task对象,使用std::bind将可调用对象f和其参数args绑定到该任务中。使用std::make_shared创建一个智能指针task指向该任务。
  • 获取std::future对象:调用task->get_future()方法,获取与任务关联的std::future对象res,用于后续获取任务的执行结果。
  • 加锁保护任务队列:使用std::unique_lock对queue_mutex加锁,确保在访问任务队列时的线程安全。
  • 检查线程池状态:检查stop标志位,如果线程池已经停止工作,则抛出std::runtime_error异常,提示不允许在停止的线程池上添加任务。
  • 添加任务到队列:将一个lambda表达式task{ (*task)(); }添加到任务队列中,该lambda表达式会调用std::packaged_task对像task,从而执行封装的任务。
  • 通知等待线程:调用condition.notify_one()方法,通知一个等待的线程有新任务可用。
  • 返回std::future对象:返回之前获取的std::future对象res。

(四)析构函数(~ThreadPool())

  1. 功能:销毁线程池,确保所有线程在退出前完成任务。
  2. 详细流程
    cpp
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    inline ThreadPool::~ThreadPool()
    {
    {
    std::unique_lock<std::mutex> lock(queue_mutex);
    stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
    worker.join();
    }
  • 加锁并设置停止标志:使用std::unique_lock对queue_mutex加锁,将stop标志位设置为true,表示线程池停止工作。
  • 通知所有线程:调用condition.notify_all()方法,通知所有等待的线程线程池已停止工作。
  • 等待线程结束:遍历workers向量,对每个线程调用join()方法,等待线程完成当前任务后退出。

三、输入输出详细说明

(一)输入

  1. 构造函数输入
    • 参数:size_t threads
    • 含义:表示线程池中的线程数量。用户可以根据实际需求和系统资源情况指定合适的线程数量,例如在多核处理器上可以设置为处理器核心数,以充分利用多核优势。
  2. 任务入队函数输入
    • 参数:一个可调用对象F和它的参数Args...
    • 含义:可调用对象可以是函数、函数对象、lambda表达式等,它表示要执行的具体任务。参数Args...是传递给可调用对象的参数。例如,在示例代码中,使用lambda表达式[i] {... }作为可调用对象,i是传递给lambda表达式的参数。

(二)输出

  1. 任务入队函数返回值
    • 类型:std::future<typename std::result_of<F(Args...)>::type>
    • 含义:返回一个std::future对象,用于获取任务的执行结果。std::future提供了一种机制,允许主线程在不阻塞的情况下继续执行其他操作,直到需要获取任务结果时,调用get()方法阻塞等待任务完成并返回结果。
  2. 任务执行结果
    • 获取方式:通过std::future对象的get()方法
    • 含义:调用get()方法会阻塞当前线程,直到任务完成并返回结果。在示例代码中,通过result.get()获取每个任务的返回值,(i * i)的结果。