[C++]C++并发编程实战Chapter3

3 在线程间共享数据

3.1 线程间共享数据的问题

归根结底,多线程共享数据的问题多由数据改动引发。如果所有共享数据都是只读数据,就不会有问题。

不变量(invariant)

它是一个针对某一特定数据的断言,该断言总是成立的,例如“这个变量的值即为链表元素的数目”。数据更新往往会破坏这些不变量。

改动线程间的共享数据,可能导致不变量被破坏。例如对于双向链表,不变量就是如果A节点的next指针指向B节点,那么B节点的prev指针应该指向A节点。如果我们删除双向链表中的某个节点N,我们需要修改N节点前驱节点A的next指针和N节点后继节点的prev指针。当我们只修改了一个指针的时候,不变量即被破坏。

3.1.1 条件竞争

在并发编程中,操作由两个或多个线程负责,它们争先让线程执行各自的操作,而结果取决于它们执行的相对次序,所有这种情况都是条件竞争。

  • 良性条件竞争:例如,两个线程向队列添加数据项以待处理,只要维持住系统的不变量,先添加哪个数据项通常并不重要。
  • 恶行条件竞争:当条件竞争导致不变量被破坏时,才会产生问题,正如前面举例的双向链表。

3.1.2 防止恶性条件竞争

几种防止恶性条件竞争的方法:

  1. 采取保护措施包装数据结构,确保不变量被破坏时,中间状态只对执行改动的线程可见。在其他访问同一数据结构的线程的视角中,这种改动要么尚未开始,要么已经完成。
  2. 修改数据结构的设计及其不变量,由一连串不可拆分的改动完成数据变更,每个改动都维持不变量不被破坏。这通常被称为无锁编程,难以正确编写。
  3. 将修改数据结构当作事务(transaction)来处理,类似于数据库在一个事务内完成更新:把需要执行的数据读写操作视为一个完整序列,先用事务日志存储记录,再把序列当成单一步骤提交运行。若别的线程改动了数据而令提交无法完整执行,则事务重新开始。这称为软件事务内存(Software Transactional Memory,STM)。

3.2 用互斥保护共享数据

标记访问该数据结构的所有代码,令各线程在其上相互排斥(mutually exclusive),只要有线程正在运行标记的代码,任何别的线程意图访问同一份数据,则必须等待,直到该线程完事。

C++线程库保证了,一旦有线程锁住了某个互斥,若其他线程试图再给它加锁,则须等待,直至最初成功加锁的线程把该互斥解锁。

3.2.1

在C++中,通过构造std::mutex的实例来创建互斥,调用成员函数lock()对其加锁,调用unlock()解锁。

但更好的做法是运用RAII的手法,C++标准库提供了类模板std::lock_guard<>,针对互斥类融合实现了RAII手法:在构造时给互斥加锁,在析构时解锁,从而保证互斥总被正确解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(),some_list.end(),value_to_find)
!= some_list.end();
}

C++17引入了一个新特性,名为类模板参数推导(class template argument deduction),对于std::lock_guard<>这种简单的类模板,模板参数列表可以忽略。

注意

如果成员函数返回指针或引用,指向受保护的共享数据,那么即便成员函数全都按良好、有序的方式锁定互斥,仍然无济于事,因为保护已被打破,出现了大漏洞。只要存在任何能访问该指针和引用的代码,它就可以访问受保护的共享数据(也可以修改),而无须锁定互斥。所以,若利用互斥保护共享数据,则需谨慎设计程序接口,从而确保互斥已先行锁定,再对受保护的共享数据进行访问,并保证不留后门。

3.2.2 组织和编排代码以保护共享数据

有几种情况会导致保护失效:

  • 成员函数向调用者返回指向共享数据的指针或引用
  • 若成员函数在自身内部调用了别的函数,我们也需要考虑向这些函数传递指针或引用是否安全。如果该函数将共享数据的地址存起来,等到脱离保护之后再去访问,可能会导致不良后果。

本质上,上述问题是由于未能把所有访问共享数据的代码都标记为互斥。

不得向锁所在的作用域之外传递指针和引用,指向受保护的共享数据,无论是通过函数返回值将它们保存到对外可见的内存,还是将它们作为参数传递给使用者提供的函数。

3.2.3 发现接口固有的条件竞争

考虑stack容器,在多线程环境下,其empty和size的结果不可信。

1
2
3
4
5
6
7
stack<int> s;
if(!s.empty())
{
int const value=s.top();
s.pop();
do_something(value);
}
  • 在if判断和访问栈顶元素之间,可能有别的线程弹出栈顶元素,导致栈空,从而引发未定义行为。
  • 在top和pop之间,也存在条件竞争:两个线程都访问栈顶元素,然后出栈,会导致有一个元素被忽略掉了。

为了解决第二个条件竞争,比较自然的想法是将top和pop合成为一个操作,但是这可能会导致异常不安全。当栈中存储的对象的拷贝构造函数发生错误的时候,可能无法正确获得其对象,但是该对象已经从栈上被弹出了。

消除条件竞争的方法:

  • 传入引用

借一个外部变量接收栈容器弹出的元素,将指涉它的引用通过参数传入pop()调用。

问题:

这种方法要求栈容器存储的型别是可赋值的(assignable)。

  • 提供不抛出异常的拷贝构造函数,或不抛出异常的移动构造函数
  • 返回指针,指向弹出的元素

用智能指针管理是个不错的选择。避免了内存泄漏,并且由c++标准库负责管理内存。

3.2.4 死锁

C++标准库提供了std::lock()函数。它可以同时锁住多个互斥,而没有发生死锁的风险。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail,rhs.some_detail);
}
};

std::lock 锁定两个互斥,其语义是“全员共同成败”(all-or-nothing,或全部成功锁定,或没获取任何锁并抛出异常)。

std::adopt_lock 指明互斥已经锁定,std::lock_guard 应该接收所有权,而不是试图加锁。

必须先判断交换对象是否是同一个对象,因为对某个 std::mutex 对象重复加锁将导致未定义行为(std::recursive_mutex 可以重复加锁)。

针对以上场景,C++17还进一步提供了新的RAII类模板std::scoped_lock<>。std:: scoped_lock<>和std::lock_guard<>完全等价,只不过前者是可变参数模板(variadic template),接收各种互斥型别作为模板参数列表,还以多个互斥对象作为构造函数的参数列表。下列代码中,传入构造函数的两个互斥都被加锁,机制与std::lock()函数相同,因此,当构造函数完成时,它们就都被锁定,而后,在析构函数内一起被解锁。

1
2
3
4
5
6
7
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m);
swap(lhs.some_detail,rhs.some_detail);
}

3.2.5 防范死锁的补充原则

没有涉及锁,也可能发生死锁现象。假定有两个线程,各自关联了std::thread实例,若它们同时在对方的std::thread实例上调用join(),就能制造出死锁现象却不涉及锁操作。

防范死锁的准则最终可归纳成一个思想:只要另一线程有可能正在等待当前线程,那么当前线程千万不能反过来等待它。

1 避免嵌套锁

假如已经持有锁,不要试图获取第二个锁。这样不会因为锁导致死锁,但是仍然有可能出现死锁现象(线程互相等待)。

如果一定要获取多个锁,最好采用 std::lock 或者 std::scoped_lock.

2 一旦持锁,就须避免由用户提供的程序接口

因为我们不知道用户提供的接口会做什么,它可能要获取锁导致死锁。

3 依从固定顺序获取锁

4 按层级加锁

可以实现一个 hierarchical_mutex 来按照层级加锁,只能按照层级从高到低的方向加锁。只要实现了 lock(),unlock(),try_lock() 三个成员函数,就可以和 std::lock_guard 配合使用。try_lock() : 若另一线程已在目标互斥上持有锁,则函数立即返回false,完全不等待。

为了存储当前层级编号,hierarchical_mutex 的实现使用了线程专属的局部变量(用 thread_local 修饰)。所有互斥的实例都能读取该变量,但它的值因不同线程而异。这使代码可以独立检测各线程的行为,各互斥都能判断是否允许当前线程对其加锁。

5 将准则推广到锁操作以外

对于线程互相等待的情况,也可以用上述的方法来解决。并且还可以让同一个函数启动全部线程,且汇合工作也由之负责。

3.2.6 运用 std::unique_lock 灵活加锁

类模板 std::unique_lock<> 放宽了不变量的成立条件,它相较 std::lock_guard<> 更灵活一些。std::unique_lock 对象不一定始终占有与之关联的互斥。

其构造函数接收第二个参数:

  • 我们可以传入 std::adopt_lock 实例,借此指明 std::unique_lock 对象管理互斥上的锁
  • 也可以传入 std::defer_lock 实例,从而使互斥在完成构造时处于无锁状态,等以后有需要时才在 std::unique_lock 对象(不是互斥对象)上调用 lock() 而获取锁,或把 std::unique_lock 对象交给 std::lock() 函数加锁。

std::unique_lock 占用更多的空间,也比 std::lock_guard 略慢。

3.2.7 在不同作用域之间转移互斥归属权

std::unique_lock 实例不占有与之关联的互斥,所以随着其实例的转移,互斥的归属权可以在多个 std::unique_lock 实例之间转移。

std::unique_lock 属于可移动却不可复制的型别。

转移有一种用途:准许函数锁定互斥,然后把互斥的归属权转移给函数调用者,好让他在同一个锁的保护下执行其他操作。

std::unique_lock 类十分灵活,允许它的实例在被销毁前解锁。其成员函数unlock() 负责解锁操作,这与互斥一致。

3.2.8 按适合的粒度加锁

3.3 保护共享数据的其他工具

3.3.1 在初始化过程中保护共享数据

假设我们需要某个共享数据,而它创建起来开销不菲。因为创建它可能需要建立数据库连接或分配大量内存,所以等到必要时才真正着手创建。这种方式称为延迟初始化(lazy initialization),常见于单线程代码。

1
2
3
4
5
6
7
8
9
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
}

若将上述代码转为多线程形式,假设数据本身线程安全。那么仅需要对指针进行保护。最简单的方式如下所示

1
2
3
4
5
6
7
8
9
10
11
12
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex); // 此处,全部线程都被迫循序运行
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}

这种方式的问题是,所有线程都必须依次运行,并且每次都要进行加锁操作,增加了不必要的开销。

为了改进上述方法,提出了双重检验锁定模式

1
2
3
4
5
6
7
8
9
10
11
12
void undefined_behaviour_with_double_checked_locking()
{
if(!resource_ptr) ⇽--- ①
{
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr) ⇽--- ②
{
resource_ptr.reset(new some_resource); ⇽--- ③
}
}
resource_ptr->do_something(); ⇽--- ④
}

然而,这种方法可能引发恶性条件竞争。当前线程在锁保护范围外读取指针①,而对方线程却可能先获取锁,顺利进入锁保护范围内执行写操作③,因此读写操作没有同步,产生了条件竞争,既涉及指针本身,还涉及其指向的对象。尽管当前线程能够看见其他线程写入指针,却有可能无视新实例some_resource的创建,结果do_something()的调用就会对不正确的值进行操作④。

C++标准库中提供了std::once_flag类和std:: call_once()函数,以专门处理该情况。所有线程共同调用std::call_once()函数,从而确保在该调用返回时, 指针初始化由其中某线程安全且唯一地完成(通过适合的同步机制)。必要的同步数据则由std::once_flag实例存储,每个std::once_flag实例对应一次不同的初始化。相比显式使用互斥,std::call_once()函数的额外开销往往更低,特别是在初始化已经完成的情况下。

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
std::call_once(resource_flag,init_resource);
resource_ptr->do_something();
}

在类中延迟初始化成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection=connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_)
{}
void send_data(data_packet const& data) ⇽--- ①
{
std::call_once(connection_init_flag,&X::open_connection,this); ⇽--- ②
connection.send_data(data);
}
data_packet receive_data() ⇽--- ③
{
std::call_once(connection_init_flag,&X::open_connection,this); ⇽--- ②
return connection.receive_data();
}
};

std::once_flag的实例既不可复制也不可移动,这与std::mutex类似。

C++11规定静态局部变量的初始化只会在某一线程上单独发生,在初始化完成之前,其他线程不会越过静态数据的声明而继续运行。某些类的代码只需用到唯一一个全局实例,这种情形可用以下方法代替std::call_once():

1
2
3
4
5
6
class my_class;
my_class& get_my_class_instance()
{
static my_class instance;
return instance;
}

3.3.2 保护甚少更新的数据结构

考虑一个存储着DNS条目的缓存表,它将域名解释成对应的IP地址。给定的DNS条目通常在很长时间内都不会变化——在许多情况下,DNS条目保持多年不变。尽管,随着用户访问不同网站,缓存表会不时加入新条目,但在很大程度上,数据在整个生命期内将保持不变。为了判断数据是否有效,必须定期查验缓存表;只要细节有所改动,就需要进行更新。

若采用std::mutex保护数据结构,则过于严苛,原因是即便没发生改动,它照样会禁止并发访问。我们需在这里采用新类型的互斥。由于新的互斥具有两种不同的使用方式,因此通常被称为读写互斥:允许单独一个“写线程”进行完全排他的访问,也允许多个“读线程”共享数据或并发访问。

  • C++17标准库提供了两种新的互斥:std::shared_mutex和std::shared_timed_mutex。
  • C++14标准库只有std::shared_timed_mutex
  • C++11标准库都没有。

可以用 std::shared_mutex 来进行这种同步操作。

对于写操作:用 std::lock_guard 或者 std::unique_lock 来进行锁定,保证访问的排他性。

对于读操作:用 std::shared_lock 实现共享访问。

共享锁仅有一个限制,即假设它已被某些线程所持有,若别的线程试图获取排他锁,就会发生阻塞,直到那些线程全都释放该共享锁。反之,如果任一线程持有排他锁,那么其他线程全都无法获取共享锁或排他锁,直到持锁线程将排他锁释放为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>
class dns_entry;
class dns_cache
{
std::map<std::string,dns_entry> entries;
mutable std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const
{
std::shared_lock<std::shared_mutex> lk(entry_mutex);
std::map<std::string,dns_entry>::const_iterator const it=
entries.find(domain);
return (it==entries.end())?dns_entry():it->second;
}
void update_or_add_entry(std::string const& domain,
dns_entry const& dns_details)
{
std::lock_guard<std::shared_mutex>lk(entry_mutex);
entries[domain]=dns_details;
}
};

3.3.3 递归加锁

假如线程已经持有某个std::mutex实例,试图再次对其重新加锁就会出错,将导致未定义行为。

但在某些场景中,确有需要让线程在同一互斥上多次重复加锁,而无须解锁。C++标准库为此提供了std::recursive_mutex,其工作方式与std::mutex相似,不同之处是,其允许同一线程对某互斥的同一实例多次加锁。我们必须先释放全部的锁,才可以让另一个线程锁住该互斥。例如,若我们对它调用了3次lock(),就必须调用3次unlock()。

若要设计一个类以支持多线程并发访问,它就需包含互斥来保护数据成员,递归互斥常常用于这种情形。每个公有函数都需先锁住互斥,然后才进行操作,最后解锁互斥。但有时在某些操作过程中,公有函数需要调用另一公有函数。在这种情况下,后者将同样试图锁住互斥,如果采用std::mutex便会导致未定义行为。有一种“快刀斩乱麻”的解决方法:用递归互斥代替普通互斥。这容许第二个公有函数成功地对递归互斥加锁,因此函数可以顺利地执行下去。

上述方法不推荐。当以上类型持有锁的时候,其不变量往往会被破坏。通常可以采取更好的方法:根据这两个公有函数的共同部分,提取出一个新的私有函数,新函数由这两个公有函数调用,而它假定互斥已经被锁住,遂无须重复加锁。


[C++]C++并发编程实战Chapter3
https://erlsrnby04.github.io/2025/01/26/C-C-并发编程实战Chapter3/
作者
ErlsrnBy04
发布于
2025年1月26日
许可协议