设计基于锁的并发数据结构 并发设计的内涵 线程安全 
多线程执行的操作无论异同,每个线程所见的数据结构都是自恰的;数据不会丢失或破坏,所有不变量终将成立,恶性条件竞争也不会出现。
真正的并发 
保护的范围越小,需要的串行化操作就越少,并发程度就可能越高。
基于锁的并发数据结构 基于锁实现线程安全的栈容器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include  <mutex>  #include  <stack>  #include  <exception>  #include  <memory>  struct  empty_stack  : std::exception {     const  char * what ()  const  throw ()  ; };template  <typename  T>class  ThreadSafeStack  {public :     ThreadSafeStack () = default ;     ThreadSafeStack (const  ThreadSafeStack& other) {         std::lock_guard<std::mutex> lk (other.mtx_)  ;         data_ = other.data_;     }     ThreadSafeStack& operator =(const  ThreadSafeStack&) = delete ;     void  push (T new_value)   {         std::lock_guard<std::mutex> lk (mtx_)  ;         data_.push (std::move (new_value));     }     void  pop (T& value)   {         std::lock_guard<std::mutex> lk (mtx_)  ;         if  (data_.empty ()) throw  empty_stack ();         value = std::move (data_.top ());         data_.pop ();     }     std::shared_ptr<T> pop ()   {         std::lock_guard<std::mutex> lk (mtx_)  ;         if  (data_.empty ()) throw  empty_stack ();         auto  res = std::make_shared <T>(std::move (data_.top ());         data_.pop ();         return  res;     }     bool  empty () const  {         std::lock_guard<std::mutex> lk (mtx_);         return  data_.empty ();     }private :     mutable  std::mutex mtx_;     std::stack<T> data_; };
 
每个成员函数都在内部互斥m之上加锁,因此这保障了基本的线程安全。这种方式保证了任何时刻都仅有唯一线程访问数据。 
在empty()和每个pop()重载之间,都潜藏着数据竞争的隐患。然而,pop()函数不仅可以加锁,其还可以明文判别内部的栈容器是否为空,所以这不属于恶性数据竞争。 
这段代码有可能引起死锁,原因是我们在持锁期间执行用户代码:栈容器所含的数据中,有用户自定义的复制构造函数、移动构造函数、拷贝赋值操作符和移动赋值操作符,用户也有可能自行重载new操作符。假使栈容器要插入或移除数据,在操作过程中数据自身调用了上述函数,则可能再进一步调用栈容器的成员函数,因而需要获取锁,但相关的互斥却已被锁住,最后导致死锁。向栈容器添加/移除数据,却不涉及复制行为或内存分配,这是不切实际的空想。合理的解决方式是对栈容器的使用者提出要求,由他们负责保证避免以上死锁场景。 
需要使用者保证,必须当对象构造完成之后,别的线程才能访问数据;只有当所有线程都停止访问,才可以销毁对象。 
该实现仅容许一次只有一个线程访问数据。性能有限。 
该栈容器并未提供任何等待/添加数据的操作,因此,假如栈容器满载数据而线程又等着添加数据,它就必须定期反复调用empty(),或通过调用pop()而捕获empty_stack异常,从而查验栈容器是否为空。万一真的出现这种情况,本例的栈容器实现就绝非最佳选择,因为等待的线程只有耗费宝贵的算力查验数据,或者栈容器使用者不得不另行编写代码,以在外部实现“等待-通知”的功能(如利用条件变量),令内部锁操作变得多余且浪费。 
 
采用锁和条件变量实现线程安全的队列容器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 #include  <queue>  #include  <mutex>  #include  <condition_variable>  template <typename  T>class  threadsafe_queue  {private :     mutable  std::mutex mut;     std::queue<T> data_queue;     std::condition_variable data_cond;public :     threadsafe_queue ()     {}     void  push (T new_value)      {         std::lock_guard<std::mutex> lk (mut)  ;         data_queue.push (std::move (new_value));         data_cond.notify_one ();       }     void  wait_and_pop (T& value)        {         std::unique_lock<std::mutex> lk (mut)  ;         data_cond.wait (lk,[this ]{return  !data_queue.empty ();});         value=std::move (data_queue.front ());         data_queue.pop ();     }     std::shared_ptr<T> wait_and_pop ()         {         std::unique_lock<std::mutex> lk (mut)  ;         data_cond.wait (lk,[this ]{return  !data_queue.empty ();});           std::shared_ptr<T> res (              std::make_shared<T>(std::move(data_queue.front())))  ;         data_queue.pop ();         return  res;     }     bool  try_pop (T& value)      {         std::lock_guard<std::mutex> lk (mut)  ;         if (data_queue.empty ())             return  false ;         value=std::move (data_queue.front ());         data_queue.pop ();         return  true ;     }     std::shared_ptr<T> try_pop ()       {         std::lock_guard<std::mutex> lk (mut)  ;         if (data_queue.empty ())             return  std::shared_ptr <T>();           std::shared_ptr<T> res (              std::make_shared<T>(std::move(data_queue.front())))  ;         data_queue.pop ();         return  res;     }     bool  empty ()  const      {         std::lock_guard<std::mutex> lk (mut)  ;         return  data_queue.empty ();     } };
 
假定在数据压入队列的过程中,有多个线程同时在等待,那么data_cond.notify_one()的调用只会唤醒其中一个。然而,若该觉醒的线程在执行wait_and_pop()时抛出异常(譬如新指针std::shared_ptr<>在构建时就有可能产生异常④),就不会有任何其他线程被唤醒。
如果我们不能接受这种行为方式,则将data_cond.notify_one()改为data_cond.notify_all()。这样就会唤醒全体线程,但要大大增加开销:它们绝大多数还是会发现队列依然为空,只好重新休眠。 
第二种处理方式是,倘若有异常抛出,则在wait_and_pop()中再次调用notify_one(),从而再唤醒另一线程,让它去获取存储的值。 
第三种处理方式是,将std::shared_ptr<>的初始化语句移动到push()的调用处,令队列容器改为存储std::shared_ptr<>,而不再直接存储数据的值。从内部std::queue<>复制std::shared_ptr<>实例的操作不会抛出异常,所以wait_and_pop()也是异常安全的。 
 
 
 
采用精细粒度的锁和条件变量实现线程安全的队列容器 用单向链表充当队列的数据结构。队列含有一个“头指针head”,它指向头节点,每个节点再依次指向后继节点。队列弹出数据的方法是更改head指针:将指向目标改为其后继节点,并返回原来的第一项数据。
新数据从队列末端加入,其实现方式是,队列另外维护一个“尾指针tail”,指向尾节点。假如有新节点加入,则将尾节点的next指针指向新节点,并更新tail指针,令其指向新节点。如果队列为空,则将head指针和tail指针都设置为NULL。
单线程版本 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include  <iostream>  #include  <memory>  template  <typename  T>class  Queue  {public :     queue () : tail (nullptr )     {}     queue (const  queue& other) = delete ;     queue& operator =(const  queue& other) = delete ;     std::shared_ptr<T> try_pop ()   {         if  (!head) {             return  std::shared_ptr <T>();         }         std::shared_ptr<T> const  res = std::make_shared (std::move (head->data));         const  auto  old_head = std::move (head);         head = std::move (old_head->next);         if  (!head) {             tail = nullptr ;         }         return  res;     }     void  push (T new_value)   {         std::unique_ptr<node> p (new  node(std::move(new_value)))  ;         node* const  new_tail = p.get ();         if  (tail) {             tail->next = std::move (p);         } else  {             head = std::move (p);         }         tail = new_tail;     }private :     struct  node  {         T data;         std::unique_ptr<node> next;         node (T data_):             data (std::move (data_))         {}     };     std::unique_ptr<node> head;     node* tail; };
 
多线程下存在的问题:
push可以同时改动head指针和tail指针,所以该函数需要将两个互斥锁住。 
push和try_pop有可能并发访问同一个节点的next指针:当队列仅含有一个节点的时候,即head==tail。这会导致锁定同一个互斥。 
 
通过分离数据实现并发 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 #include  <memory>  #include  <mutex>  template <typename  T>class  threadsafe_queue  {private :     struct  node      {         std::shared_ptr<T> data;         std::unique_ptr<node> next;     };     std::mutex head_mutex;     std::unique_ptr<node> head;     std::mutex tail_mutex;     node* tail;     node* get_tail ()       {         std::lock_guard<std::mutex> tail_lock (tail_mutex)  ;         return  tail;     }     std::unique_ptr<node> pop_head ()       {         std::lock_guard<std::mutex> head_lock (head_mutex)  ;          if (head.get ()==get_tail ())         {             return  nullptr ;         }         std::unique_ptr<node> old_head=std::move (head);         head=std::move (old_head->next);         return  old_head;     }public :     threadsafe_queue ():         head (new  node),tail (head.get ())     {}     threadsafe_queue (const  threadsafe_queue& other)=delete ;     threadsafe_queue& operator =(const  threadsafe_queue& other)=delete ;     std::shared_ptr<T> try_pop ()       {         std::unique_ptr<node> old_head=pop_head ();         return  old_head?old_head->data:std::shared_ptr <T>();     }     void  push (T new_value)      {         std::shared_ptr<T> new_data (              std::make_shared<T>(std::move(new_value)))  ;         std::unique_ptr<node> p (new  node)  ;         node* const  new_tail=p.get ();         std::lock_guard<std::mutex> tail_lock (tail_mutex)  ;         tail->data=new_data;         tail->next=std::move (p);         tail=new_tail;     } };
 
对于push而言,对tail指针的全部访问都需要加锁 
对于try_pop而言,需要对head指针加锁,此外,还需要在内部访问tail指针一次,所以需要在对应的互斥上加锁。 
在try_pop中,对tail的访问加锁是必要的保护。如果不加锁,try_pop和push可能会由不同的线程调用,导致对tail的访问出现竞争行为。 
get_tail的调用在head_mutex保护范围中,这很重要。反之,先对tail加锁,获取到tail指针,再对head加锁,但此时可能无法顺利加锁,此时别的线程可能获取到锁进行try_pop,等到当前线程对head加锁的时候,head和tail都可能发生了改变,所以之前返回的tail指针可能不在是尾指针,甚至已经不是队列中的节点了。 
并发性分析
push在没有持锁的状态下,为新节点和新数据完成了内存分配。 
try_pop只在互斥tail_mutex上短暂持锁,因此try_pop和push几乎可以并发执行。 
队列节点通过智能指针的析构函数删除,在head_mutex的保护范围以外执行,如此可以提高try_pop的并发度,因为删除节点、返回数据之类耗时的操作在互斥外执行。 
 
 
 
等待数据弹出 
进一步实现wait_and_pop等功能。
接口设计:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 template <typename  T>class  threadsafe_queue  {private :     struct  node      {         std::shared_ptr<T> data;         std::unique_ptr<node> next;     };     std::mutex head_mutex;     std::unique_ptr<node> head;     std::mutex tail_mutex;     node* tail;     std::condition_variable data_cond;public :     threadsafe_queue ():         head (new  node),tail (head.get ())     {}     threadsafe_queue (const  threadsafe_queue& other)=delete ;     threadsafe_queue& operator =(const  threadsafe_queue& other)=delete ;     std::shared_ptr<T> try_pop ()  ;     bool  try_pop (T& value)  ;     std::shared_ptr<T> wait_and_pop ()  ;     void  wait_and_pop (T& value)  ;     void  push (T new_value)  ;     bool  empty ()  ; };
 
向队列中压入新节点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 template <typename  T>void  threadsafe_queue<T>::push (T new_value) {     std::shared_ptr<T> new_data (          std::make_shared<T>(std::move(new_value)))  ;     std::unique_ptr<node> p (new  node)  ;     {         std::lock_guard<std::mutex> tail_lock (tail_mutex)  ;         tail->data=new_data;         node* const  new_tail=p.get ();         tail->next=std::move (p);         tail=new_tail;     }     data_cond.notify_one (); }
 
wait_and_pop实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 template <typename  T>class  threadsafe_queue  {private :     node* get_tail ()       {         std::lock_guard<std::mutex> tail_lock (tail_mutex)  ;         return  tail;     }     std::unique_ptr<node> pop_head ()          {         std::unique_ptr<node> old_head=std::move (head);         head=std::move (old_head->next);         return  old_head;     }     std::unique_lock<std::mutex> wait_for_data ()         {         std::unique_lock<std::mutex> head_lock (head_mutex)  ;         data_cond.wait (head_lock,[&]{return  head.get ()!=get_tail ();});         return  std::move (head_lock);       }     std::unique_ptr<node> wait_pop_head ()       {         std::unique_lock<std::mutex> head_lock (wait_for_data())  ;           return  pop_head ();     }     std::unique_ptr<node> wait_pop_head (T& value)       {         std::unique_lock<std::mutex> head_lock (wait_for_data())  ;           value=std::move (*head->data);         return  pop_head ();     }public :     std::shared_ptr<T> wait_and_pop ()       {         std::unique_ptr<node> const  old_head=wait_pop_head ();         return  old_head->data;     }     void  wait_and_pop (T& value)      {         std::unique_ptr<node> const  old_head=wait_pop_head (value);     } };
 
try_pop和empty实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 template <typename  T>class  threadsafe_queue  {private :     std::unique_ptr<node> try_pop_head ()       {         std::lock_guard<std::mutex> head_lock (head_mutex)  ;         if (head.get ()==get_tail ())         {             return  std::unique_ptr <node>();         }         return  pop_head ();     }     std::unique_ptr<node> try_pop_head (T& value)       {         std::lock_guard<std::mutex> head_lock (head_mutex)  ;         if (head.get ()==get_tail ())         {             return  std::unique_ptr <node>();         }         value=std::move (*head->data);         return  pop_head ();     }public :     std::shared_ptr<T> try_pop ()       {         std::unique_ptr<node> old_head=try_pop_head ();         return  old_head?old_head->data:std::shared_ptr <T>();     }     bool  try_pop (T& value)      {         std::unique_ptr<node> const  old_head=try_pop_head (value);         return  old_head;     }     bool  empty ()      {         std::lock_guard<std::mutex> head_lock (head_mutex)  ;         return  (head.get ()==get_tail ());     } };
 
完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 #include  <memory>  #include  <iostream>  #include  <mutex>  #include  <condition_variable>  #include  <thread>  template <typename  T>class  threadsafe_queue  {private :     struct  node      {         std::shared_ptr<T> data;         std::unique_ptr<node> next;     };     std::mutex head_mutex;     std::unique_ptr<node> head;     std::mutex tail_mutex;     node* tail;     std::condition_variable data_cond;public :     threadsafe_queue ():         head (new  node),tail (head.get ())     {}     threadsafe_queue (const  threadsafe_queue& other)=delete ;     threadsafe_queue& operator =(const  threadsafe_queue& other)=delete ;     std::shared_ptr<T> try_pop ()   {         auto  old_head = try_pop_head ();          return  old_head ? old_head->data : std::shared_ptr <T>();     }     bool  try_pop (T& value)   {         auto  old_head = try_pop_head (value);         return  old_head.get () != nullptr ;     }     std::shared_ptr<T> wait_and_pop ()   {         auto  old_head = wait_pop_head ();         return  old_head ? old_head->data : std::shared_ptr <T>();     }     void  wait_and_pop (T& value)   {         auto  old_head = wait_pop_head (value);     }     void  push (T new_value)   {         auto  new_data = std::make_shared <T>(std::move (new_value));         auto  p = std::make_unique <node>();         {             std::lock_guard<std::mutex> lk (tail_mutex)  ;             tail->data = new_data;             auto  new_tail = p.get ();             tail->next = std::move (p);             tail = new_tail;         }                  data_cond.notify_one ();     }     bool  empty ()   {         std::lock_guard<std::mutex> lk (head_mutex)  ;         return  head.get () == get_tail ();     }private :     node* get_tail ()   {         std::lock_guard<std::mutex> lk (tail_mutex)  ;         return  tail;     }          std::unique_ptr<node> pop_head ()   {         auto  old_head = std::move (head);         head = std::move (old_head->next);         return  old_head;      }     std::unique_lock<std::mutex> wait_for_data ()   {         std::unique_lock<std::mutex> lk (head_mutex)  ;         data_cond.wait (lk, [this ]{             return  head.get () != get_tail ();         });         return  std::move (lk);     }     std::unique_ptr<node> wait_pop_head ()   {         auto  lk = wait_for_data ();         return  pop_head ();     }     std::unique_ptr<node> wait_pop_head (T& value)   {         auto  lk = wait_for_data ();                  value = std::move (*head->data);         return  pop_head ();      }     std::unique_ptr<node> try_pop_head ()   {         std::lock_guard<std::mutex> lk (head_mutex)  ;         if  (head.get () == get_tail ()) {             return  std::unique_ptr <node>();         }         return  pop_head ();     }     std::unique_ptr<node> try_pop_head (T& value)   {         std::lock_guard<std::mutex> lk (head_mutex)  ;         if  (head.get () == get_tail ()) {             return  std::unique_ptr <node>();         }                  value = std::move (*head->data);         return  pop_head ();      } };
 
附上一个简单的测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 #include  <iostream>  #include  <vector>  #include  <thread>  #include  <atomic>  #include  <mutex>  #include  <cassert>  #include  <chrono>  void  single_thread_test ()   {     threadsafe_queue<int > queue;          assert (queue.empty ());          queue.push (42 );     assert (!queue.empty ());          int  val;     bool  success = queue.try_pop (val);     assert (success);     assert (val == 42 );     assert (queue.empty ());          std::thread consumer ([&]{          int  popped_val;         queue.wait_and_pop(popped_val);         assert(popped_val == 100 );     })  ;     std::this_thread::sleep_for (std::chrono::milliseconds (10 ));     queue.push (100 );     consumer.join ();     assert (queue.empty ());     std::cout << "Single thread tests passed!\n" ; }void  multi_thread_test ()   {     threadsafe_queue<int > queue;     const  int  NUM_ITEMS = 10000 ;     std::atomic<int > counter (0 )  ;     std::vector<std::thread> threads;          for  (int  i = 0 ; i < 2 ; ++i) {         threads.emplace_back ([&] {             for  (int  j = 0 ; j < NUM_ITEMS; ++j) {                 queue.push (j);             }         });     }          for  (int  i = 0 ; i < 2 ; ++i) {         threads.emplace_back ([&] {             for  (int  j = 0 ; j < NUM_ITEMS; ++j) {                 int  val;                 queue.wait_and_pop (val);                 counter += val;             }         });     }          for  (auto & t : threads) {         t.join ();     }          int  expected = 2  * (NUM_ITEMS-1 )*NUM_ITEMS/2 ;     assert (counter.load () == expected);     std::cout << "Multi-thread tests passed!\n" ; }int  main ()   {     single_thread_test ();     multi_thread_test ();     return  0 ; }
 
这个队列是无限队列。只要存在空闲内存,即便已存入的数据没有被移除,各个线程还是能持续往队列添加新数据。与之对应的是有限队列,其最大长度在创建之际就已固定。一旦有限队列容量已满,再试图向其压入数据就会失败,或者发生阻塞,直到有数据弹出而产生容纳空间为止。
采用锁编写线程安全的查找表 查找表的使用方式不同于栈容器或队列容器。栈容器和队列容器的几乎每项操作都会进行改动,或增加元素,或删减元素,查找表却鲜有改动。
从并发的角度考虑,std::map<> 接口中的最大问题是迭代器。有一种思路是通过迭代器访问容器内部,即便有其他线程访问(并改动)容器数据,迭代器所提供的访问依然安全。这虽然可行,但颇为棘手。要令迭代器正确运行,我们就必须面对诸多问题,这些问题处理起来相当复杂,例如一个线程要删除某个元素,而它却正被迭代器引用。考虑到 std::map<> 严重依赖迭代器(标准库中的其他关联容器亦然),所以我们自己设计接口。
查找表基本接口:
增加配对的键/值对 
根据给定的键改变关联的值。 
移除某个键及其关联的值。 
根据给定的键获取关联值(若该键存在)。 
一些针对容器自身的整体操作也十分有用,如检查容器是否为空、以“快照”方式复制所有键或全体键/值对。 
 
为了实现更高的并发度,我们不能简单地对标准库提供的容器进行一层封装,这样并发度不高。考虑关联容器的几种实现方式:
红黑树每次查找或改动都要从根节点开始访问,必须逐层加锁解锁,提升不大;有序数组因为无法预知查找的目标的位置,所以必须对整个数组加锁,效率更差。所以我们选择散列表。
假定散列表具有固定数量的桶,每个键都属于一个桶,键本身的值和散列函数决定键具体属于哪个桶。这让我们可以安全地为每个桶使用独立的锁,从而增加并发度。问题是,我们需要一个针对键的散列函数。我们可以使用C++标准库提供的函数模板 std::hash<>。该函数已经具备针对基础类型的特化版本,如 int 和 std::string ,用户也可以方便地对其他类型的键进行特化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 #include  <memory>  #include  <mutex>  #include  <list>  #include  <map>  #include  <algorithm>  #include  <shared_mutex>  template  <typename  Key, typename  Value, typename  Hash = std::hash<Key>>class  threadsafe_lookup_table {private :     class  bucket_type {     private :         using  bucket_value = std::pair<Key, Value>;         using  bucket_data = std::list<bucket_value>;         using  bucket_iterator = typename  bucket_data::iterator;         bucket_data data;         mutable  std::shared_mutex mtx;         bucket_iterator find_entry_for (const  Key& key)  const   {             return  std::find_if (data.begin (), data.end (), [&key](const  bucket_value& item){                 return  item.first == key;             });         }     public :         Value value_for (const  Key& key, const  Value& default_value)  const   {             std::shared_lock<std::shared_mutex> lk (mtx)  ;             const  auto  found_entry = find_entry_for (key);             return  found_entry == data.end () ? default_value : found_entry->second;         }         void  add_or_update_mapping (const  Key& key, const  Value& value)   {             std::unique_lock<std::shared_mutex> lk (mtx)  ;             const  auto  found_entry = find_entry_for (key);             if  (found_entry == data.end ()) {                 data.push_back (bucket_value (key, value));             } else  {                 found_entry->second = value;             }         }         void  remove_mapping (const  Key& key)   {             std::unique_lock<std::shared_mutex> lk (mtx)  ;             const  auto  found_entry = find_entry_for (key);             if  (found_entry != data.end ()) {                 data.erase (found_entry);             }         }     };     std::vector<std::unique_ptr<bucket_type>> buckets;     Hash hasher;     bucket_type& get_bucket (const  Key& key)  const   {         const  auto  bucket_idx = hasher (key) % buckets.size ();         return  *buckets[bucket_idx];     }public :     using  key_type = Key;     using  mapped_type = Value;     using  hash_type = Hash;     threadsafe_lookup_table (unsigned  num_buckets = 19 , const  Hash& hasher_ = Hash ())         : buckets (num_buckets), hasher (hasher_) {         for  (unsigned  i = 0 ; i < num_buckets; ++i) {             buckets[i].reset (new  bucket_type);         }     }     threadsafe_lookup_table (const  threadsafe_lookup_table&) = delete ;     threadsafe_lookup_table& operator =(const  threadsafe_lookup_table&) = delete ;          Value value_for (const  Key& key, const  Value& default_value = Value())  const   {         return  get_bucket (key).value_for (key, default_value);     }     void  add_or_update_mapping (const  Key& key, const  Value& value)   {         get_bucket (key).add_or_update_mapping (key, value);     }     void  remove_mapping (const  Key& key)   {         get_bucket (key).remove_mapping (key);     }     std::map<Key, Value> get_map ()  const   {         std::vector<std::unique_lock<std::shared_mutex>> lks;         for  (const  auto & bucket : buckets) {             lks.push_back (std::unique_lock <std::shared_mutex>(bucket.mtx));         }         std::map<Key, Value> res;         for  (const  auto & bucket : buckets) {             for  (auto  it = bucket.data.begin (); it != bucket.data.end (); ++it) {                 res.insert (*it);             }         }         return  res;     } };
 
因为桶的数量是固定的,因此对buckets的访问无需加锁。对于每个桶的访问,都由共享锁妥善的处理进行了保护。 
异常安全:
value_for不涉及改动,即使抛出异常,也不污染数据结构。 
remove_mapping通过erase改动链表,该调用肯定不会抛出异常。 
add_or_update_mapping如果执行add操作,进行push_back,该操作是异常安全的,即使抛出异常,也不会污染链表。如果执行update操作,进行赋值操作,该操作不是异常安全的,可能会使关联的值处于混合状态。可以将异常安全交给使用者处理。 
 
 
 
采用多种锁编写线程安全的链表 前面提到在多线程情况下,我们需要避免支持迭代器。为了提供迭代功能,可以考虑用成员函数的形式来实现,例如for_each。但是可能会导致死锁,因为如果for_each的存在要有意义,则其必须在持有内部锁的时候运行用户代码,而且还必须向用户传递每一项数据,而且数据应该按值传递。我们只有要求使用者来确保其提供的用户代码不会试图获取锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include  <memory>  #include  <mutex>  template  <typename  T>class  threadsafe_list  {     struct  node  {         std::mutex mtx;         std::shared_ptr<T> data;         std::unique_ptr<node> next;         node () :             next ()         { }         node (const  T& value):             data (std::make_shared <T>(value)),             next ()         { }     };     node head;public :     threadsafe_list () = default ;     ~threadsafe_list () {     }     threadsafe_list (const  node&) = delete ;     threadsafe_list& operator =(const  threadsafe_list&) = delete ;     void  push_front (const  T& value)   {         auto  new_node = std::make_unique <node>(value);         std::lock_guard<std::mutex> lk (head.mtx)  ;         new_node->next = std::move (head.next);         head.next = std::move (new_node);     }     template  <typename  Function>     void  for_each (Function f)   {         auto  cur_node = &head;         std::unique_lock<std::mutex> lk (head.mtx)  ;         while  (node* const  next = cur_node->next.get ()) {             std::unique_lock<std::mutex> next_lk (next->mtx)  ;             lk.unlock ();             f (*next->data);             cur_node = next;             lk = std::move (next_lk);         }     }     template  <typename  Predicate>     std::shared_ptr<T> find_first_if (Predicate p)   {         auto  cur_node = &head;         std::unique_lock<std::mutex> lk (head.mtx)  ;         while  (node* const  next = cur_node->next.get ()) {             std::unique_lock<std::mutex> next_lk (next->mtx)  ;             lk.unlock ();             if  (p (*next->data)) {                 return  next->data;             }             cur_node = next;             lk = std::move (next_lk);         }         return  std::shared_ptr <T>();     }     template  <typename  Predicate>     void  remove_if (Predicate p)   {         auto  cur_node = &head;         std::unique_lock<std::mutex> lk (head.mtx)  ;         while  (node* const  next = cur_node->next.get ()) {             std::unique_lock<std::mutex> next_lk (next->mtx)  ;             if  (p (*next->data)) {                 auto  old_next = std::move (cur_node->next);                 cur_node->next = std::move (next->next);                 next_lk.unlock ();             } else  {                 lk.unlock ();                 cur_node = next;                 lk = std::move (next_lk);             }         }     } };
 
以上实现的核心思想是,按照链表中节点的前后顺序加锁,没有任何一个线程能够超越他人的处理节点,若一个线程需在某节点上耗费特别长的时间,那么其他线程在到达该节点时就不得不等待。