CPlusPlus多线程编程

最近需要使用C++的多线程编写一个处理程序,因此学习记录一下C++多线程的编程知识。

基本概念

多线程(英语:multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。

应用场景

  • Web服务与服务器:
    • Web服务器: Tomcat等处理大量用户请求,每个请求分配一个线程。
    • 游戏服务器: 同时处理多个玩家的连接和游戏逻辑。
  • 后台与异步处理:
    • 定时任务: 定期发送邮件、数据备份、数据分析。
    • 日志记录: 将写日志操作放到后台线程,不阻塞主程序。
    • 异步操作: 发送消息、处理图片上传。
  • 桌面应用与用户界面:
    • 响应性: 将耗时计算移到后台线程,前台显示进度条,避免UI冻结。
    • Swing/JavaFX: 事件处理和耗时操作。
  • 数据处理与计算:
    • CPU密集型任务: 图像/视频处理、密码破解、大规模数据分析(利用多核)。
    • I/O密集型任务: 并行下载文件、同时读写多个文件/数据库。
  • 网络爬虫:
    • 并行抓取: 同时爬取多个网页或API接口,提高效率。
  • 数据库与中间件:
    • 连接池管理: 并发管理数据库连接。
    • 数据迁移与分析: 分块处理大数据。

编程实现

传统的C++(C++11标准之前)中并没有引入线程这个概念,在C++11出来之前,如果我们想要在C++中实现多线程,需要借助操作系统平台提供的API,比如Linux的<pthread.h>,或者windows下的<windows.h> 。

C++11提供了语言层面上的多线程,包含在头文件中。它解决了跨平台的问题,提供了管理线程、保护共享数据、线程间同步操作、原子操作等类。C++11 新标准中引入了5个头文件来支持多线程编程。

这5个头文件分别是:

  • thread
  • mutex
  • atomic
  • condition_variable
  • future

创建线程

创建线程的基本方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 示例1

std::thread myThread ( thread_fun);
//函数形式为void thread_fun()
myThread.join();
//同一个函数可以代码复用,创建多个线程

# 示例2

std::thread myThread ( thread_fun(100));
myThread.join();
//函数形式为void thread_fun(int x)
//同一个函数可以代码复用,创建多个线程

# 示例3

std::thread (thread_fun,1).detach();
//直接创建线程,没有名字
//函数形式为void thread_fun(int x)

主线程与子线程的处理方法:

  • detach方式,启动的线程自主在后台运行,当前的代码继续往下执行,不等待新线程结束。
  • join方式,等待启动的线程完成,才会继续往下执行。

可以使用joinable判断是join模式还是detach模式。

1
if (myThread.joinable()) foo.join();

多线程编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>                // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.

void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果标志位不为 true, 则等待...
cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后,
// 线程被唤醒, 继续往下执行打印线程编号id.
std::cout << "thread " << id << '\n';
}

void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 设置全局标志位为 true.
cv.notify_all(); // 唤醒所有线程.
}

int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);

std::cout << "10 threads ready to race...\n";
go(); // go!

for (auto & th : threads)
th.join();

return 0;
}

创建线程池

因为程序边运行边创建线程是比较耗时的,所以我们通过池化的思想:在程序开始运行前创建多个线程,这样,程序在运行时,只需要从线程池中拿来用就可以了.大大提高了程序运行效率.一般线程池都会有以下几个部分构成:

  • 线程池管理器(ThreadPoolManager):用于创建并管理线程池,也就是线程池类
  • 工作线程(WorkThread): 线程池中线程
  • 任务队列task: 用于存放没有处理的任务。提供一种缓冲机制。
  • append:用于添加任务的接口

线程池示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>


class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}

template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<return_type> result = task->get_future(); // 返回一个futur对象,result通过result.get()获取线程函数的返回值 如果线程函数没有执行完就会阻塞在result.get()
{
std::unique_lock<std::mutex> lock(queueMutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); }); // Lambda函数是一种可调用对象 Lambda函数的语法为[捕获列表](参数列表) { 函数体 }。
}
condition.notify_one();
return result;
}

~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}

private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};

// 示例任务函数
int printHello(int num) {
std::cout << "Hello from thread " << std::this_thread::get_id() << "! Num: " << num << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return num;
}

int main() {
ThreadPool pool(4);

std::vector<std::future<int>> results;

// 提交任务到线程池
std::cout << "Enqueuing tasks..." << std::endl;
for (int i = 0; i < 8; ++i) {
results.push_back(pool.enqueue(printHello, i));
}

for (int i = 0; i < 8; ++i) {
int num = results[i].get(); // 获取线程函数的返回值
std::cout<< "获取到线程函数的返回值:" << num << std::endl; // 获取线程函数的返回值
}

// 等待任务完成
std::this_thread::sleep_for(std::chrono::seconds(5));

return 0;
}

参考链接

  1. c++中的多线程:概念、基本用法、锁以及条件变量和优先级调度策略,by 青山牧云人.
  2. 【C++】多线程(thread)使用详解,by OpenC++.
  3. C++多线程详解(全网最全),by cpp后端技术.
  4. 多线程,by wikipedia.
  5. 详解 C++ 多线程的condition_variable,by CPP加油站.
  6. C++11之std::future对象使用说明,by Jimmy1224.
  7. C++11实现的简单线程池、模板的使用实例:1.向队列中放待执行函数,2.取出队列中待执行函数,by 好人~.
  8. 手把手带你实现std::function,弄懂底层原理,by QZQ54188.
  9. 一文读懂C++11的Lambda表达式的原理与使用场景,by linux.
  10. C++ 函数声明(后置返回类型),by CG6316.
  11. 【C++11 多线程】future与promise(八),by fengMisaka.
  12. C++ 中 typename 关键字的完整指南,by 香草美人.
  13. std::future和std::promise详解(原理、应用、源码),by 孙梓轩.
  14. 1. std::result_of是什么?为什么它出现?,by 讳疾忌医_note.
  15. C++之std::queue::emplace,by jzjhome.
  16. c++11多线程之packaged_task<>介绍与实例,by 荆楚闲人.
  17. 【C++】std::make_shared 详解,by 快起床啊你.
  18. C++11中的std::bind 简单易懂,by 云飞扬_Dylan.
  19. C++编程系列笔记(3)——std::forward与完美转发详解,by 小龙爱学习​.