C++ 并发编程实战:手写一个简易线程池 (ThreadPool)
深入解析 C++11 线程池的实现原理,通过代码实战讲解 std::thread, std::mutex, std::condition_variable 的协同工作机制,以及如何优雅地管理任务队列和线程生命周期。
在高性能服务器开发中,频繁地创建和销毁线程会带来巨大的系统开销。线程池 (Thread Pool) 技术应运而生,它通过复用一组固定的工作线程来执行大量短任务,极大地提高了系统的响应速度和资源利用率。
今天,我们将基于 C++11 标准库,从零实现一个简易但功能完备的线程池,并深入剖析其背后的并发同步机制。
🏗️ 核心设计思路
一个标准的线程池主要包含三个核心组件:
- 任务队列 (
std::queue):存储待执行的任务(通常封装为std::function)。 - 工作线程 (
std::vector<std::thread>):一组常驻线程,不断从队列中取任务执行。 - 同步机制 (
std::mutex+std::condition_variable):- 互斥锁:保护任务队列,防止多线程竞争。
- 条件变量:当队列为空时,让工作线程进入休眠;当有新任务时,唤醒线程。
💻 完整代码实现
1. 头文件设计 (ThreadPool.h)
我们使用模板函数 submit 来支持任意可调用对象(Lambda、函数指针、bind 表达式等)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#pragma once
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <future>
#include <memory>
#include <iostream>
#include <utility>
#include <stdexcept>
#include <functional>
#include <condition_variable>
namespace
{
const size_t DEFAULT_THREAD_POOL_SIZE = 4;
using TaskHandler = std::function<void()>;
} // namespace
class ThreadPool
{
private:
std::queue<TaskHandler> queue_;
std::vector<std::thread> workers_;
std::mutex mutex_;
bool is_running_;
std::condition_variable condition_;
public:
inline ThreadPool(const int pool_size = DEFAULT_THREAD_POOL_SIZE);
inline ~ThreadPool();
template<class F, class ...Args>
inline auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>;
};
2. 源文件实现 (ThreadPool.cpp)
这里是线程池的“心脏”,工作线程的生命周期管理在此完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
ThreadPool::ThreadPool(const int pool_size) : is_running_(true)
{
workers_.reserve(pool_size);
// 启用多个线程,用于执行提交的任务
for (int i = 0; i < pool_size; ++i) {
// 创建线程,放入队列中
workers_.emplace_back([this]{
while (true) {
TaskHandler task{};
{
// 阻塞从队列中取出任务
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this]{
return !queue_.empty() || !is_running_;
});
if (!queue_.empty()) {
task = std::move(queue_.front());
queue_.pop();
}
else {
// 即将退出
break;
}
}
// 执行任务,可是我们要返回的是一个future, 通过promise来获取
// 无需异常捕获,package_task已经将任务封装,会自动进行异常捕获,并在future.get时,自动抛出
task();
}
});
}
}
ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(mutex_);
is_running_ = false;
}
// 所有线程进入销毁
condition_.notify_all();
for (std::thread& woker : workers_) {
if (woker.joinable())
woker.join();
}
}
template<class F, class ...Args>
auto ThreadPool::enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
// 创建一个异步任务
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
// 加锁添加任务
{
std::lock_guard<std::mutex> lock(mutex_);
if (!is_running_) {
throw std::runtime_error("enqueue on stoped ThreadPool");
}
queue_.emplace([task]{
(*task)(); // 执行
});
}
// 唤醒线程来取得任务
condition_.notify_one();
return res;
}
🔍 核心原理解析
1. 为什么使用 std::unique_lock 而不是 lock_guard?
在 submit 函数中,我们只需要短暂的加锁入队,所以使用轻量级的 lock_guard。 但在工作线程的 while 循环中,我们需要在等待条件时自动释放锁。std::condition_variable::wait() 要求传入一个 std::unique_lock,因为它需要在内部原子地执行“释放锁 + 挂起线程”的操作,避免竞态条件。
2. 条件变量的等待谓词 (Predicate)
1
2
3
m_condition.wait(lock, [this]() {
return !this->jobs.empty() || this->stop;
});
这行代码等价于一个循环:
1
2
3
4
5
while (!(!this->jobs.empty() || this->stop)) {
lock.unlock();
wait_for_notification();
lock.lock();
}
它确保了线程只有在有任务或者需要停时才会醒来,有效避免了虚假唤醒 (Spurious Wakeup) 问题。
3. 优雅的退出机制
析构函数是线程池安全关闭的关键:
- 设置
stop = true。 - 调用
notify_all()唤醒所有因队列为空而休眠的线程。 - 工作线程醒来后,发现
stop为真且队列为空,便会执行return退出while循环。 - 主线程调用
join()等待所有工作线程清理完毕,确保资源不泄露。
🚀 测试示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "ThreadPool.h"
#include <iostream>
#include <chrono>
int main() {
ThreadPool pool(4); // 创建 4 个线程
std::cout << "Submitting tasks..." << std::endl;
// 提交 10 个任务
for (int i = 0; i < 10; ++i) {
pool.submit([i]() {
std::cout << "Task " << i << " executed by thread "
<< std::this_thread::get_id() << std::endl;
// 模拟耗时操作
std::this_thread::sleep_for(std::chrono::milliseconds(500));
});
}
std::cout << "Tasks submitted. Waiting for completion..." << std::endl;
// main 函数结束,pool 析构,自动等待所有任务完成
return 0;
}
编译命令:
1
g++ -std=c++11 -pthread main.cpp ThreadPool.cpp -o thread_pool_demo
⚠️ 潜在优化与注意事项
虽然这个实现已经具备了线程池的核心功能,但在生产环境中还可以进一步优化:
- 任务返回值支持:当前
Job定义为void()。若要支持有返回值的任务,可以结合std::packaged_task和std::future,让submit返回std::future<T>。 - 动态扩缩容:当前线程数是固定的。可以根据 CPU 负载动态增加或减少工作线程。
- 异常处理:当前代码中,如果任务内部抛出异常且未捕获,会导致
std::terminate终止整个程序。建议在job()执行外层包裹try-catch块。 - 无锁队列:对于极高并发场景,可以考虑使用无锁队列 (Lock-free Queue) 替代
std::queue+mutex的组合,进一步减少锁竞争。
📝 总结
通过这个简易线程池的实现,我们深入理解了 C++ 并发编程的三大基石:线程管理、互斥锁和条件变量。掌握这些底层原理,不仅能帮助我们写出更高效的并发代码,也能让我们在使用现成库(如 Java Executor, Go Goroutine)时更加得心应手。
希望这篇博客能为你的 C++ 进阶之路提供一点帮助!欢迎在评论区交流讨论。
参考链接: