[C++]C++并发编程实战Chapter4

4 并发操作的同步

4.1 等待事件或等待其他条件

线程A需要等待线程B完成任务,有几种方式

  • 在共享数据内部维护一个标志(互斥保护),线程B完成后,设置该标志。

​ 这种方法中线程A必须不断轮询检查标志。存在资源浪费。

  • 让线程A调用 std::this_thread::sleep_for() 函数,在各次检验之间短暂休眠。
1
2
3
4
5
6
7
8
9
10
11
12
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
lk.lock();
}
}

​ 上述代码虽然效率有所提升,但是休眠时间的长短很难确定。如果时间过短,线程仍会频繁检查;时间过长,会导致延迟,任务已经完成但线程A还在休眠。对于实时应用,游戏等,不能接受。

  • 使用C++标准库的工具(优先采用)

​ 最基本的方式是条件变量。当条件成立时,就通过该条件变量,通知所有等待的线程,将他们唤醒继续处理。

4.1.1 凭借条件变量等待条件成立

C++标准库提供了条件变量的两种实现:std::condition_variable 和std::condition_variable_any。它们都在标准库的头文件内声明。两者都需配合互斥,方能提供妥当的同步操作。std::condition_variable仅限于与std::mutex一起使用;然而,只要某一类型符合成为互斥的最低标准,足以充当互斥,std::condition_variable_any即可与之配合使用,因此它的后缀是“_any”。由于std::condition_variable_any更加通用,它可能产生额外开销,涉及其性能、自身的体积或系统资源等,因此std::condition_variable应予优先采用,除非有必要令程序更灵活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread() // 由线程B运行
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
}
data_cond.notify_one();
}
}
void data_processing_thread() // 由线程A运行
{
while(true)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(
lk,[]{return !data_queue.empty();}); ⑤
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if(is_last_chunk(data))
break;
}
}

注意

  • 线程B解锁互斥后再通知线程A,这样线程A唤醒后无需等待互斥解锁,不会再次被阻塞
  • 线程A在条件变量上调用wait,传入锁对象和一个lambda函数,用于表达需要等待成立的条件。只有条件成立,wait才会返回;否则,wait解锁互斥量,并阻塞线程。
  • 线程B准备好数据后,通知线程A,A被唤醒后,首先获取互斥量,然后检查条件是否成立。
  • 如果线程A重新获得互斥,并且查验条件,而这一行为却不是直接响应线程乙的通知,则称之为伪唤醒(spurious wake)。按照C++标准的规定,这种伪唤醒出现的数量和频率都不确定。因此,需要考虑检验函数是否有副作用。

4.1.2 利用条件变量构建线程安全的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <condition_variable>
#include <thread>
#include <iostream>
#include <memory>
#include <queue>
#include <mutex>

template<typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() { }
ThreadSafeQueue(const ThreadSafeQueue& other) {
std::lock_guard<std::mutex> lk(other.mtx_);
data_ = other.data_;
}

bool empty() const {
std::lock_guard<std::mutex> lk(mtx_);
return data_.empty();
}

void push(T new_value) {
{
std::lock_guard<std::mutex> lk(mtx_);
data_.push(new_value);
}
cond_.notify_one();
}

bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mtx_);
if (data_.empty()) return false;
value = data_.front();
data_.pop();
return true;
}

std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mtx_);
if (data_.empty()) return std::make_shared<T>();
auto res = std::make_shared<T>(data_.front());
data_.pop();
return res;
}

void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mtx_);
cond_.wait(lk, [this]{
return !data_.empty();
});
value = data_.front();
data_.pop();
}

std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mtx_);
cond_.wait(lk, [this]{
return !data_.empty();
});
auto res = std::make_shared<T>(data_.front());
data_.pop();
return res;
}

private:
mutable std::mutex mtx_;
std::condition_variable cond_;
std::queue<T> data_;
};

ThreadSafeQueue<int> q;

void ThreadFuncA() {
for (int i = 0; i < 10; ++i) {
std::cout << i << std::endl;
q.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

void ThreadFuncB() {
for (int i = 0; i < 10; ++i) {
auto value = q.wait_and_pop();
std::cout << "i = " << i << ", value = " << *value << std::endl;
}
}


int main() {
std::thread A(ThreadFuncA);
ThreadFuncB();
A.join();
}

注意

  • 因为互斥量会因为锁操作而变化,为了支持const对象,其必须用mutable修饰。

4.2 使用future等待一次性事件发生

4.2.1 从后台任务返回值

如果并不急需线程运算的值,就可以使用 std::async() (函数模板,声明位于头文件 <future> 中)按异步方式启动任务。我们从 std::async() 函数处获得 std::future 对象(而非 std::thread 对象),运行的函数一旦完成,其返回值就由该对象最后持有。若要用到这个值,只需在 future 对象上调用get(),当前线程就会阻塞,以便 future 准备妥当并返回该值。

下面是一个最简单的例子,用 std::future 取得异步任务的函数返回值。

1
2
3
4
5
6
7
8
9
10
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

std::async() 可以接收附加参数,进而传递给任务函数作为参数,同 std::thread 的构造函数一致。

可以给 std::async() 补充一个参数,以指定采用哪种运行方式。参数的类型是 std::launch,其值可以是 std::launch::deferredstd::launch::async 。前者指定在当前线程上延后调用任务函数,等到在future上调用了 wait()get() ,任务函数才会执行;后者指定必须另外开启专属的线程,在其上运行任务函数。该参数的值还可以是 std::launch::deferred | std::launch::async ,表示由 std::async() 的实现自行选择运行方式。

1
2
3
4
5
6
7
8
auto f6=std::async(std::launch::async,Y(),1.2);    ⇽---  ①运行新线程

auto f7=std::async(std::launch::deferred,baz,std::ref(x)); ⇽--- ②在wait()或get()内部运行任务函数
auto f8=std::async( ⇽---
std::launch::deferred | std::launch::async,
baz,std::ref(x));
auto f9=std::async(baz,std::ref(x)); ⇽--- ③交由实现自行选择运行方式
f7.wait(); ⇽--- ④前面②处的任务函数调用被延后,到这里才运行

4.2.2 关联future实例和任务

std::packaged_task<> 连结了future对象与函数(或可调用对象)。std::packaged_task<> 对象在执行任务时,会调用关联的函数(或可调用对象),把返回值保存为future的内部数据,并令future准备就绪。

std::packaged_task<> 是类模板,其模板参数是函数签名。传入的函数必须与之相符(不必严格匹配,只要能够互相转换即可)。

类模板 std::packaged_task<> 具有成员函数 get_future() ,它返回std::future<> 实例,该future的特化类型取决于函数签名所指定的返回值。

std::packaged_task<> 还具备函数调用操作符,它的参数取决于函数签名的参数列表。

std::packaged_task 对象是可调用对象,我们可以直接调用,还可以将其包装在 std::function 对象内,当作线程函数传递给 std::thread 对象,也可以传递给需要可调用对象的函数。

在线程间传递任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()>> tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread()
{
while(!gui_shutdown_message_received())
{
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty())
continue;
task=std::move(tasks.front());
tasks.pop_front();
}
task();
}
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f);
std::future<void> res=task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}

4.2.3 创建 std::promise

std::promise<T> 给出了一种异步求值的方法(类型为T),某个std::future<T> 对象与结果关联,能延后读出需要求取的值。配对的std::promise和std::future可实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的promise设定关联的值,使future准备就绪。

若需从给定的std::promise实例获取关联的std::future对象,调用前者的成员函数get_future()即可,这与std::packaged_task一样。promise的值通过成员函数set_value()设置,只要设置好,future即准备就绪,凭借它就能获取该值。如果std::promise在被销毁时仍未曾设置值,保存的数据则由异常代替。

4.2.4 将异常保存到future中

std::async()

若经由 std::async() 调用的函数抛出异常,会被保存到future中,代替本该设定的值,future进入就绪状态。等到其成员函数get被调用,异常被重新抛出。

std::packaged_task

同上。

std::promise

通过成员函数的显示调用实现。

1
2
3
4
5
6
7
8
9
extern std::promise<double> some_promise;
try
{
some_promise.set_value(calculate_value());
}
catch(...)
{
some_promise.set_exception(std::current_exception());
}

还有一种方式,通过直接销毁与future关联的promise对象或packaged_task对象。

4.2.5 多个线程一起等待

若我们在多个线程上访问同一个std::future对象,而不采取额外的同步措施,将引发数据竞争并导致未定义行为。

std::future仅能移动构造和移动赋值,所以归属权可在多个实例之间转移,但在相同时刻,只会有唯一一个future实例指向特定的异步结果;

std::shared_future的实例则能复制出副本,因此我们可以持有该类的多个对象,它们全指向同一异步任务的状态数据。

向每个线程传递std::shared_future对象的副本,它们为各线程独自所有,并被视作局部变量。因此,这些副本就作为各线程的内部数据,由标准库正确地同步,可以安全地访问。

std::shared_future的实例依据std::future的实例构造而得,前者所指向的异步状态由后者决定。因为std::future对象独占异步状态,其归属权不为其他任何对象所共有,所以若要按默认方式构造std::shared_future对象,则须用std::move向其默认构造函数传递归属权,这使std::future变成空状态(empty state)。

std::future具有成员函数share(),直接创建新的std::shared_future对象,并向它转移归属权。

4.3 限时等待


[C++]C++并发编程实战Chapter4
https://erlsrnby04.github.io/2025/01/26/C-C-并发编程实战Chapter4/
作者
ErlsrnBy04
发布于
2025年1月26日
许可协议