4 并发操作的同步 4.1 等待事件或等待其他条件 线程A需要等待线程B完成任务,有几种方式
在共享数据内部维护一个标志(互斥保护),线程B完成后,设置该标志。
这种方法中线程A必须不断轮询检查标志。存在资源浪费。
让线程A调用 std::this_thread::sleep_for()
函数,在各次检验之间短暂休眠。
1 2 3 4 5 6 7 8 9 10 11 12 bool flag; std::mutex m;void wait_for_flag () { std::unique_lock<std::mutex> lk (m) ; while (!flag) { lk.unlock (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); lk.lock (); } }
上述代码虽然效率有所提升,但是休眠时间的长短很难确定。如果时间过短,线程仍会频繁检查;时间过长,会导致延迟,任务已经完成但线程A还在休眠。对于实时应用,游戏等,不能接受。
最基本的方式是条件变量。当条件成立时,就通过该条件变量,通知所有等待的线程,将他们唤醒继续处理。
4.1.1 凭借条件变量等待条件成立 C++标准库提供了条件变量的两种实现:std::condition_variable 和std::condition_variable_any。它们都在标准库的头文件内声明。两者都需配合互斥,方能提供妥当的同步操作。std::condition_variable仅限于与std::mutex一起使用;然而,只要某一类型符合成为互斥的最低标准,足以充当互斥,std::condition_variable_any即可与之配合使用,因此它的后缀是“_any”。由于std::condition_variable_any更加通用,它可能产生额外开销,涉及其性能、自身的体积或系统资源等,因此std::condition_variable应予优先采用,除非有必要令程序更灵活。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 std::mutex mut; std::queue<data_chunk> data_queue; std::condition_variable data_cond;void data_preparation_thread () { while (more_data_to_prepare ()) { data_chunk const data=prepare_data (); { std::lock_guard<std::mutex> lk (mut) ; data_queue.push (data); } data_cond.notify_one (); } }void data_processing_thread () { while (true ) { std::unique_lock<std::mutex> lk (mut) ; data_cond.wait ( lk,[]{return !data_queue.empty ();}); ⑤ data_chunk data=data_queue.front (); data_queue.pop (); lk.unlock (); process (data); if (is_last_chunk (data)) break ; } }
注意
线程B解锁互斥后再通知线程A,这样线程A唤醒后无需等待互斥解锁,不会再次被阻塞
线程A在条件变量上调用wait,传入锁对象和一个lambda函数,用于表达需要等待成立的条件。只有条件成立,wait才会返回;否则,wait解锁互斥量,并阻塞线程。
线程B准备好数据后,通知线程A,A被唤醒后,首先获取互斥量,然后检查条件是否成立。
如果线程A重新获得互斥,并且查验条件,而这一行为却不是直接响应线程乙的通知,则称之为伪唤醒(spurious wake)。按照C++标准的规定,这种伪唤醒出现的数量和频率都不确定。因此,需要考虑检验函数是否有副作用。
4.1.2 利用条件变量构建线程安全的队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 #include <condition_variable> #include <thread> #include <iostream> #include <memory> #include <queue> #include <mutex> template <typename T>class ThreadSafeQueue {public : ThreadSafeQueue () { } ThreadSafeQueue (const ThreadSafeQueue& other) { std::lock_guard<std::mutex> lk (other.mtx_) ; data_ = other.data_; } bool empty () const { std::lock_guard<std::mutex> lk (mtx_) ; return data_.empty (); } void push (T new_value) { { std::lock_guard<std::mutex> lk (mtx_) ; data_.push (new_value); } cond_.notify_one (); } bool try_pop (T& value) { std::lock_guard<std::mutex> lk (mtx_) ; if (data_.empty ()) return false ; value = data_.front (); data_.pop (); return true ; } std::shared_ptr<T> try_pop () { std::lock_guard<std::mutex> lk (mtx_) ; if (data_.empty ()) return std::make_shared <T>(); auto res = std::make_shared <T>(data_.front ()); data_.pop (); return res; } void wait_and_pop (T& value) { std::unique_lock<std::mutex> lk (mtx_) ; cond_.wait (lk, [this ]{ return !data_.empty (); }); value = data_.front (); data_.pop (); } std::shared_ptr<T> wait_and_pop () { std::unique_lock<std::mutex> lk (mtx_) ; cond_.wait (lk, [this ]{ return !data_.empty (); }); auto res = std::make_shared <T>(data_.front ()); data_.pop (); return res; }private : mutable std::mutex mtx_; std::condition_variable cond_; std::queue<T> data_; }; ThreadSafeQueue<int > q;void ThreadFuncA () { for (int i = 0 ; i < 10 ; ++i) { std::cout << i << std::endl; q.push (i); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } }void ThreadFuncB () { for (int i = 0 ; i < 10 ; ++i) { auto value = q.wait_and_pop (); std::cout << "i = " << i << ", value = " << *value << std::endl; } }int main () { std::thread A (ThreadFuncA) ; ThreadFuncB (); A.join (); }
注意
因为互斥量会因为锁操作而变化,为了支持const对象,其必须用mutable修饰。
4.2 使用future等待一次性事件发生 4.2.1 从后台任务返回值 如果并不急需线程运算的值,就可以使用 std::async()
(函数模板,声明位于头文件 <future>
中)按异步方式启动任务。我们从 std::async()
函数处获得 std::future
对象(而非 std::thread
对象),运行的函数一旦完成,其返回值就由该对象最后持有。若要用到这个值,只需在 future
对象上调用get()
,当前线程就会阻塞,以便 future
准备妥当并返回该值。
下面是一个最简单的例子,用 std::future
取得异步任务的函数返回值。
1 2 3 4 5 6 7 8 9 10 #include <future> #include <iostream> int find_the_answer_to_ltuae () ;void do_other_stuff () ;int main () { std::future<int > the_answer=std::async (find_the_answer_to_ltuae); do_other_stuff (); std::cout<<"The answer is " <<the_answer.get ()<<std::endl; }
std::async()
可以接收附加参数,进而传递给任务函数作为参数,同 std::thread
的构造函数一致。
可以给 std::async()
补充一个参数,以指定采用哪种运行方式。参数的类型是 std::launch
,其值可以是 std::launch::deferred
或 std::launch::async
。前者指定在当前线程上延后调用任务函数,等到在future上调用了 wait()
或 get()
,任务函数才会执行;后者指定必须另外开启专属的线程,在其上运行任务函数。该参数的值还可以是 std::launch::deferred | std::launch::async
,表示由 std::async()
的实现自行选择运行方式。
1 2 3 4 5 6 7 8 auto f6=std::async (std::launch::async,Y (),1.2 ); ⇽--- ①运行新线程auto f7=std::async (std::launch::deferred,baz,std::ref (x)); ⇽--- ②在wait ()或get ()内部运行任务函数auto f8=std::async ( ⇽--- std::launch::deferred | std::launch::async, baz,std::ref (x));auto f9=std::async (baz,std::ref (x)); ⇽--- ③交由实现自行选择运行方式 f7. wait (); ⇽--- ④前面②处的任务函数调用被延后,到这里才运行
4.2.2 关联future实例和任务 std::packaged_task<>
连结了future对象与函数(或可调用对象)。std::packaged_task<>
对象在执行任务时,会调用关联的函数(或可调用对象),把返回值保存为future的内部数据,并令future准备就绪。
std::packaged_task<>
是类模板,其模板参数是函数签名。传入的函数必须与之相符(不必严格匹配,只要能够互相转换即可)。
类模板 std::packaged_task<>
具有成员函数 get_future()
,它返回std::future<>
实例,该future的特化类型取决于函数签名所指定的返回值。
std::packaged_task<>
还具备函数调用操作符,它的参数取决于函数签名的参数列表。
std::packaged_task
对象是可调用对象,我们可以直接调用,还可以将其包装在 std::function
对象内,当作线程函数传递给 std::thread
对象,也可以传递给需要可调用对象的函数。
在线程间传递任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <deque> #include <mutex> #include <future> #include <thread> #include <utility> std::mutex m; std::deque<std::packaged_task<void ()>> tasks;bool gui_shutdown_message_received () ;void get_and_process_gui_message () ;void gui_thread () { while (!gui_shutdown_message_received ()) { get_and_process_gui_message (); std::packaged_task<void ()> task; { std::lock_guard<std::mutex> lk (m) ; if (tasks.empty ()) continue ; task=std::move (tasks.front ()); tasks.pop_front (); } task (); } }std::thread gui_bg_thread (gui_thread) ;template <typename Func>std::future<void > post_task_for_gui_thread (Func f) { std::packaged_task<void () > task (f) ; std::future<void > res=task.get_future (); std::lock_guard<std::mutex> lk (m) ; tasks.push_back (std::move (task)); return res; }
4.2.3 创建 std::promise
std::promise<T>
给出了一种异步求值的方法(类型为T),某个std::future<T>
对象与结果关联,能延后读出需要求取的值。配对的std::promise和std::future可实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的promise设定关联的值,使future准备就绪。
若需从给定的std::promise实例获取关联的std::future对象,调用前者的成员函数get_future()即可,这与std::packaged_task一样。promise的值通过成员函数set_value()设置,只要设置好,future即准备就绪,凭借它就能获取该值。如果std::promise在被销毁时仍未曾设置值,保存的数据则由异常代替。
4.2.4 将异常保存到future中 std::async()
若经由 std::async()
调用的函数抛出异常,会被保存到future中,代替本该设定的值,future进入就绪状态。等到其成员函数get被调用,异常被重新抛出。
std::packaged_task
同上。
std::promise
通过成员函数的显示调用实现。
1 2 3 4 5 6 7 8 9 extern std::promise<double > some_promise;try { some_promise.set_value (calculate_value ()); }catch (...) { some_promise.set_exception (std::current_exception ()); }
还有一种方式,通过直接销毁与future关联的promise对象或packaged_task对象。
4.2.5 多个线程一起等待 若我们在多个线程上访问同一个std::future对象,而不采取额外的同步措施,将引发数据竞争并导致未定义行为。
std::future仅能移动构造和移动赋值,所以归属权可在多个实例之间转移,但在相同时刻,只会有唯一一个future实例指向特定的异步结果;
std::shared_future的实例则能复制出副本,因此我们可以持有该类的多个对象,它们全指向同一异步任务的状态数据。
向每个线程传递std::shared_future对象的副本,它们为各线程独自所有,并被视作局部变量。因此,这些副本就作为各线程的内部数据,由标准库正确地同步,可以安全地访问。
std::shared_future的实例依据std::future的实例构造而得,前者所指向的异步状态由后者决定。因为std::future对象独占异步状态,其归属权不为其他任何对象所共有,所以若要按默认方式构造std::shared_future对象,则须用std::move向其默认构造函数传递归属权,这使std::future变成空状态(empty state)。
std::future具有成员函数share(),直接创建新的std::shared_future对象,并向它转移归属权。
4.3 限时等待