在使用多线程编程的时候,我们需要注意线程之间共享数据的问题,这种问题基本上是对统一数据同时读写造成的,详细情况可以参考计算机操作系统,这里不多赘述。为了避免这种情况,我们可以使用几种方法来解决,一是给数据上锁(互斥元,mutex),这是本章的内容;二是无锁编程(lock-free programming),这将在后面的章节中讨论;还有一种方式是将对数据结构的操作更新为一个事务(transaction)。
一、使用互斥元保护共享数据
这里我们不过多介绍mutex的设计思想,我们主要介绍其在C++中的使用。
1.1 在C++中使用mutex
在C++中,通过构造std::mutex的实例创建互斥元,调用成员函数lock()来锁定它,调用成员函数unlock()来结果它。然而,直接调用成员函数是不推荐的做法,因为我们需要在函数包括异常在内的每个出口都调用unlock()。作为替代,STL提供来std::lock_guard类模板,实现了互斥元的RAII惯用语法;它在构造时锁定所给的互斥元,在析构时解锁所给的互斥元。下面是一个演示,std::mutex和std::lock_guard()都声明于<mutex>。
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
std::mutex some_mutex;
// 向链表中添加元素
void add_to_list(int val)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(val);
}
// 查找元素val是否在链表中
bool list_contains(int val)
{
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), val)
!= some_list.end();
}
在上面的代码中,我们使用了一个全局mutex来对列表实施来保护。在通常的使用中,我们可以将它封装到一个类中,以private的形势来避免冲突。但值得注意的是,传递引用和指针可能会造成越过mutex的情况,下面来看一段糟糕的代码:
/* ===== class : 数据类 ===== */
class data
{
private:
int m_a;
public:
void DoSth();
}
/* ===== class : 封装数据类 ===== */
class data_wrapper
{
private:
data m_data;
std::mutex m_dataMutex;
public:
template<typename Function>
void ProcessData(Function func)
{
// 传递“受保护”的数据到func
std::lock_guard<std::mutex> guard(m_dataMutex);
func(m_data);
}
}
data* unprotectedData;
// 一个恶意函数,用于获取data_wrapper中的&m_data
void malicious_func(data& protectedData)
{
unprotectedData = & protectedData;
}
data_warpper dw;
void foo()
{
dw.process_data(malicious_func);
// unpData已经或得了data的引用,现在可以绕开mutex直接使用doSth
unprotectedData->DoSth();
}
在上面的的代码中,我们通过foo,向data_wrapper传入一个恶意函数,或得了其m_data的引用。而获得了data的unprotectedData就可以绕过mutex无需锁定互斥元即可调用DoSth。所以,我们应当注意不要将受保护数据的指针和引用传递到锁的范围之外,无论是从函数中返回它们、将其存放在外部可见的内存中,还是作为参数传递给用户提供的函数。
1.2 发现接口中固有的竞争条件
回忆下我们的第一段代码,我们对一个list整体添加了保护,而不是对其中的每一个节点添加保护。如果是后者的话,我们在删除节点的时候仍然可能面对竞争问题。下面我们来看另一个例子:考虑在多线程中使用std::stack。下面是std::stack的接口(不包含构造函数)。
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
我们可以将上述的top修改为非引用的方式,这样就遵循来我们在1.1中提出的准则。但是这样的stack仍然是存在风险的,考虑下面这种情况:
std::stack<int> st;
if (!st.empty()) // 1
{
int const val = st.top(); // 2
st.pop(); // 3
doSth(val);
}
假设现在st中有一个元素,现在有两个线程访问它,它们的顺序是这样的:
线程A | 线程B |
---|---|
if(!st.empty()) | |
int const val = st.top(); | if(!st.empty()) |
st.pop(); | int const val = st.top(); |
doSth(val); | st.pop() |
doSth(val); |
我们可以看到,在上述的1、2、3之间可能发生竞争,甚至在后面的doSth()未被调用就已经出栈,使得val所引用的对象为空。下面我们来看几种解决方法:
1.2.1 T1:传入引用
我们考虑把接受出栈值的变量的引用,作为参数传递给pop()调用:
int result;
st.pop(result);
这样的代码在大多数时候都试用,但是如果我们的stack里面存放的是一个vector,那么我们在pop之前都将要事先构造一个vector,这是不推荐的。同时,这要求传入对象是可赋值的(operator=),而不仅仅是复制或者移动构造。
1.2.2 T2:要求不引发异常的拷贝或移动构造函数
提供不引发异常的拷贝、移动构造函数。
1.2.3 T3:返回指向出栈顶的指针
这种方法的优点是可以自由复制并且不会引发异常,随之而来的困难则是内存管理。对此我们可以使用std::shared_ptr来解决这一问题。
1.2.4 实现线程安全的stack
我们通过对std::stack加锁的方式来实现线程安全。
#include <exception>
#include <stack>
#include <mutex>
#include <memory>
#include <thread>
// 自定义异常
struct empty_stack : std::exception
{
const char* what() const throw()
{
std::cout << "empty stack!" << std::endl;
}
};
template<typename T>
class ts_stack
{
private:
std::stack<T> m_data;
// 使用mutable允许bool empty() const修改
mutable std::mutex m_mutex;
public:
// 默认构造函数 do nothing
ts_stack() { }
// 拷贝构造
ts_stack(const ts_stack& other)
{
std::lock_guard<std::mutex> lock(other.m_mutex);
m_data = other.m_data;
}
// 禁止赋值
ts_stack& operator=(const ts_stack&) = delete;
// 入栈
void push(T inputVal)
{
std::lock_guard<std::mutex> lock(this->m_mutex);
m_data.push(inputVal);
}
// 以ptr方式出栈
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(this->m_mutex);
if (m_data.empty()) throw empty_stack();
// res只能指向top(),const pointer to res
std::shared_ptr<T> const res(std::make_shared<T>(m_data.top()));
m_data.pop();
return res;
}
// 以传值的方式出栈
void pop(T& val)
{
std::lock_guard<std::mutex> lock(this->m_mutex);
if (m_data.empty()) throw empty_stack();
val = m_data.top();
m_data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(this->m_mutex);
return m_data.empty();
}
};
1.3 死锁
我们已经了解如何在线程之间共享数据,因为添加mutex我们遇到了新的麻烦:死锁。回忆哲学家就餐问题,如果每个哲学家们总是先拿左边的筷子,再拿右边的筷子那么就有可能造成死锁。下面我们来看一看在C++中如何处理死锁相关的问题。STL提供std::lock可以同时锁定两个或更多互斥元,下面我们来看一下它是如何运作的:
class data;
void swap(data& lhs, data& rhs);
class X
{
private:
data m_data;
std::mutex m_mutex;
public:
X(const data& p_data) : m_data(p_data) { }
friend void swap(X& lhs, X& rhs)
{
// 避免两个参数引用同一对象,从而引发异常
if (&lhs == &rhs) return;
// 锁定两个对象
std::lock(lhs.m_mutex, rhs.m_mutex);
// 构造lock_guard,使用std::adopt_lock参数告知其已经被锁定,避免在lg构造函数中重复锁定,并且让锁对象来管理互斥元
std::lock_guard<std::mutex> lock_lhs(lhs.m_mutex, std::adopt_lock);
std::lock_guard<std::mutex> rock_lhs(rhs.m_mutex, std::adopt_lock);
swap(lhs.m_data, rhs.m_data);
}
};
这里我们只需要注意两点,一是在构造多个std::lock_guard对象之前调用lock进行锁定;二是构造lg对象时使用参数std::adopt_lock使其沿用mutex已有锁的权限。值得一提的是,std::lock在锁定lhs.m_mutex或者rhs.m_mutex时都可能引发异常,该异常传播出std::lock。
1.4 避免死锁的进一步指南
死锁并不一定全来自于锁定,在两个线程上相互调用join()同样可以引发死锁,即循环等待。避免死锁的规则简而言之是,如果一个线程在等待你,那么你就不要等它。
1.4.1 避免嵌套锁
第一个思路是:如果你已经持有一个锁,就别再获取锁。如果你坚持这个准则,光凭使用锁是不可能导致死锁的,因为每个线程仅持有一个锁(想想每个哲学家只拿一根筷子吃饭)。如果要获取多个锁,应该以std::lock的单个动作来实现。
1.4.2 持有锁时,避免调用用户提供的代码
举个例子,在我们的多线程安全stack中,在参数类型上的每一个操作都是用户提供的代码。为了避免死锁,我们应当避免这种操作,然而当操作无法避免时,我们需要新的准则。
1.4.3 以固定顺序获取锁
这是在1.3中的例子,如果我们不能做到以std::lock单个动作实现锁定,那么次优做法是在每个线程中以相同顺序获得它们。
1.4.4 使用锁层次
锁层次的思路是:给mutex分级,当持有高级锁时,允许持有低级锁;当持有低级锁时,不允许持有高级锁。下面是一个简单演示:
// 两个具有不同层次等级的锁
Hierarchical_Mutex High_Level_Mutex(100);
Hierarchical_Mutex Low_Level_Mutex(50);
int do_low_level_stuff();
int do_high_level_stuff();
int low_level_func()
{
std::lock_guard<Hierarchical_Mutex> lg(Low_Level_Mutex);
return do_low_level_stuff();
}
int high_level_func()
{
std::lock_guard<Hierarchical_Mutex> lg(High_Level_Mutex);
return do_high_level_stuff();
}
// OK,它持有了高级锁再去持有低级锁
void thread_a()
{
high_level_func();
low_level_func();
}
// Not OK,它持有了低级锁,就不能持有比它更高级的锁
void thread_b()
{
low_level_func();
high_level_func();
}
下面我们来看一下如何实现Hierarchical_Mutex:
class Hierarchical_Mutex
{
private:
std::mutex m_internal_mutex;
// 具体对象锁的层次值
unsigned long const m_hierarchy_val;
// 前一个线程使用的层次值
unsigned long m_previous_hierarchy_val;
// 当前线程所有锁的最小值,以保证新锁等级不能超过旧锁
// 使用thread_local当前线程的层次值,在类外初始化为ULONG_MAX以保证第一次锁定能成功
static thread_local unsigned long m_this_thread_hierarchy_val;
void check_for_hierarchy_violation()
{
if(m_this_thread_hierarchy_val <= m_hierarchy_val)
{
throw std::logic_error("mutex hierarchy violated");
}
}
// 更新当前线程锁的层次值
void update_hierarchy_val()
{
m_previous_hierarchy_val = m_this_thread_hierarchy_val;
m_this_thread_hierarchy_val = m_hierarchy_val;
}
public:
explicit Hierarchical_Mutex(unsigned long val):
m_hierarchy_val(val),
m_previous_hierarchy_val(0)
{ }
void lock()
{
check_for_hierarchy_violation();
m_internal_mutex.lock();
update_hierarchy_val();
}
void unlock()
{
m_this_thread_hierarchy_val = m_previous_hierarchy_val;
m_internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if(!m_internal_mutex.try_lock()) return false;
update_hierarchy_val();
return true;
}
};
// 初始化m_this_thread_hierarchy_val
thread_local unsigned long Hierarchical_Mutex::m_this_thread_hierarchy_val(ULONG_MAX);
我们来仔细看一下它是如何运作的,以上面的代码为例:
Hierarchical_Mutex High_Level_Mutex(100);
Hierarchical_Mutex Low_Level_Mutex(50);
void thread_a()
{
high_level_func();
/*
第一次锁定,m_this_thread_hierarchy_val(ULONG_MAX) > m_hierarchy_val(100),锁定成功;
更新:m_this_thread_hierarchy_val = 100
m_previous_hierarchy_val = ULONG_MAX
*/
low_level_func();
/*
第二次锁定,m_this_thread_hierarchy_val(100) > m_hierarchy_val(50),锁定成功;
更新:m_this_thread_hierarchy_val = 50
m_previous_hierarchy_val = 100
*/
// 解锁时由低向高,由lock_guard自行完成
}
1.5 使用std::unique_lock灵活锁定
通过松弛不变量,std::unique_lock比std::lock_guard提供了更多的灵活性,一个std::unique_lock实例并不总是拥有与之相关联的互斥元。正如我们在1.3中所提到的,对std::lock_guard添加第二参数std::adopt_lock以让锁对象来管理互斥元一样,我们也可以把std::defer_lock作为第二参数传递,来表示该互斥元在构造时应保持被锁定。这个锁就可以在这之后通过std::unique_lock对象(而不是互斥元)上调用lock(),或是通过将std::unique_lock对象传递给std::lock()来获取。
下面我们使用std::unique_lock和std::defer_lock来替代std::lock_guard和std::adopt_lock,代码同1.3,作少许变化:
class data;
void swap(data& lhs, data& rhs);
class X
{
private:
data m_data;
std::mutex m_mutex;
public:
X(const data& p_data) : m_data(p_data) { }
friend void swap(X& lhs, X& rhs)
{
// 避免两个参数引用同一对象,从而引发异常
if (&lhs == &rhs) return;
// std::lock(lhs.m_mutex, rhs.m_mutex);
// std::lock_guard<std::mutex> lock_lhs(lhs.m_mutex, std::adopt_lock);
// std::lock_guard<std::mutex> rock_lhs(rhs.m_mutex, std::adopt_lock);
// std::defer_lock保留互斥元为未锁定
std::unique_lock<std::mutex> lock_lhs(lhs.m_mutex, std::defer_lock);
std::unique_lock<std::mutex> lock_rhs(rhs.m_mutex, std::defer_lock);
// 注意,这里传递std::uinique_lock,mutex在这里被锁定
std::lock(lock_lhs, lock_rhs);
swap(lhs.m_data, rhs.m_data);
}
};
值得注意的是,如果我们在class X已经锁定了互斥元,那么我们在其析构函数中应该调用unlock,我们可以通过owns_lock()来判别:
void Unlock(std::unique_lock<std::mutex>& lock)
{
REALM_ASSERT(lock.owns_lock());
lock.unlock();
}
std::unique_lock相比std::lock_guard来说,空间时间花费更多,但是换来了更多的灵活性,在能使用lock_guard的时候有限使用lock_guard。如果你要实现延迟锁定(上面的代码那样)、在作用域之间转移锁的所有权,考虑使用std::unique_lock。
1.6 在作用域之间转移锁的所有权
因为std::unique_lock实例并没有拥有与其相关的互斥元,所以通过moving,互斥元的所有权可以在实例之间进行转移。在某些情况下,这些转移是自动的,例如从函数中返回一个实例;而在其他情况下,则必须通过std::move()来显示实现。从根本上来说,这取决于源是否是左值。如果是左值,所有权的转移必须是显示的;如果是右值,所有权的转移则是自动的。
一种可能的用法,是允许函数锁定一个互斥元,并将此锁的所有权转移给调用者,于是调用者接下来可以在同一个锁的保护下执行额外的操作。下面的代码片段展示了这样的例子:函数get_lock()锁定了互斥元,然后将锁返回给调用者之前准备的数据。
std::unque_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> ul(some_mutex);
prepare_data();
// ul在函数内部声明,可以直接return而无需调用std::move()
return ul;
}
void process_data()
{
auto ul(get_lock());
do_sth();
}
1.7 锁定在恰当的粒度
锁粒度是一个文字术语,用来描述单个锁所保护的数据量。细颗粒度保护着少量的数据,粗颗粒读保护着大量数据。选择一个合适的颗粒度来保护数据是很重要的,同样重要的是,确保只在真正需要锁的操作中持有锁。如果可能,仅在实际访问共享数据的时候锁定互斥元,尝试在锁的外面做任意的数据处理。特别的,在持有锁时,不要做任何确实可能的耗时活动,例如文件IO。除非真的需要保护文件,否则不要持有锁。
在这种情况下运作良好,因为它能在代码不需要访问共享数据时调用unlock(),然后在代码需要访问共享数据时再次调用lock()。
void get_and_process_data()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
data data_to_process = get_next_data();
// 加工时不需要加锁,这里解锁
my_lock.unlock();
result res = process(data_to_process);
// 锁定以确保数据不会变更
my_lock.lock();
write_result(data_to_process, res);
}
这种锁定方式的好处是显而易见的,如果使用互斥元保护整个数据结构,不仅可能让更多的锁竞争,同时锁被持有的时间也可能延长。如上所示,锁定在恰当的粒度不仅关乎锁定的数据量,这也是关系到锁会被持有多长时间,以及在持有锁时执行哪些操作。一般情况下,只应该以执行要求的操作所需的最小可能时间去持有锁。这也意味着耗时的操作,例如获取另外一个锁、文件IO,都不应该在持有锁的时候去做,除非必要。
在之前的例子中,我们的swap函数明显需要并发访问两个对象,所以我们锁定了两个互斥量。下面来看另一个例子,我们考虑重载operator==以进行两个实例之间的比较:
class Y
{
private:
int m_data;
// 使用mutable允许int get_data() const修改
mutable std::mutex m_mutex;
int get_data() const
{
std::lock_guard<std::mutex> lg(m_mutex);
return m_data;
}
public:
Y(int data) : m_data(data) { }
friend bool operator==(Y const& lhs, Y const& rhs)
{
if (&lhs == &rhs) return true;
// 分两次锁定不同对象的互斥量
int const lhs_val = lhs.get_data();
int const rhs_val = rhs.get_data();
return lhs_val == rhs_val;
}
};
可以看到,我们分别使用两次锁,而不是一次锁定两个互斥量,这样当然会让运行更快。但随之而来的问题则是,在两次get_data()之间可能会有操作使得rhs发生改变,从而使得我们函数的语义发生改变:以前是,lhs和rhs在同一时间相等吗;现在是,某一时间的lhs和另以时间的rhs相等吗。
有时,根本没有一个合适的颗粒级别,并非所有的对数据结构的访问都要要求同样级别的保护。在这种情况下,我们考虑使用代替机制。
二、用于共享数据保护的替代工具
虽然互斥元是最通用的机制,但是提到保护共享数据时,它们并不是唯一的选择。
一种特别极端(但相当常见)的情况是,共享数据只有在初始化时才需要并发访问的保护,但在那之后却不需要显式同步。这可能是因为从一开始,数据就是只读的,所以不存在同步的问题;或者是因为必要的保护作为数据上操作的一部分被隐式地执行。在任一情况中,在数据被初始化后锁定互斥元,纯粹是为了保护初始化,这是不必要的,并且会对性能产生影响。基于这个原因,STL提供了一种机制,纯粹在初始化过程中保护共享数据。
2.1 在初始化时保护共享数据
假设现在有一个构造起来非常昂贵的资源(也许是数据库链接,或者是分配内存)。像这样的延迟初始化(Lazy Initialization)在单线程代码中是很常见的:
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
// 延迟初始化
if (!resource_ptr) resource_ptr.reset(new some_resource);
resource_ptr->dosth();
}
如果共享资源本身对于并发访问是安全的,当将其转换为多线程代码时,唯一需要保护的部分就是其初始化部分。但是像下面一段代码中的转换,会引起使用资源的程序产生不必要的序列化。这是因为每个线程都必须等待互斥元,以检查资源是否初始化。
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(rescoure_mutex);
if (!resource_ptr) resource_ptr.reset(new some_resource);
lk.unlock();
resource_ptr->dosth();
}
C++STL提供了std::once_flag和std::call_once,与其锁定互斥元并显式地检查指针,不如每个线程都可以调用std::call_once。我们通过下面的例子来看一下,通过std::call_once来实现同上述代码相同的功能。
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
// 调用一次初始化
std::call_once(resource_flag, init_resource);
resource_ptr->dosth();
}
下面我们来看一个例子,演示如何使用std::call_once完成类成员的延迟初始化。
class X
{
private:
ConnInfo m_CI;
ConnHandle m_CH;
std::once_flag m_ConnInit_Flag;
void openConn()
{
// connection
}
public:
X(const ConnInfo& a_CI) : m_CI(a_CI) { }
void Send(const Data& data)
{
std::call_once(m_ConnInit_Flag, &X::openConn, this);
m_CH.send(data);
}
data Receive()
{
std::call_once(m_ConnInit_Flag, &X::openConn, this);
return m_CH.receive();
}
}
在上面的例子中,初始化在首次调用Send()或Receive()时完成,使用成员函数openConn()来完成初始化,同时最后需要将this传入。值得注意的是,像std::mutex和std::once_flag这种不能复制或移动的实例来说,如果要使用它,就应该显式地定义。
2.2 保护很少更新的数据结构
除了在初始化后只读的数据,还有些很少更新的数据,它们在大多数情况下是只读的。下面来看一下如何对它们进行保护。假设现在我们有一个DNS缓存表,我们希望确保所有线程在读写时正常运行。我们可以使用std::mutex来完成,但是使用std::mutex的一个缺点是,会消除数据结构没有进行修改时的并发读取数据的可能性。我们可以考虑一种新的互斥元,reader-writer mutex,它考虑了两种不同的用法:由单个“写”线程独占访问或共享;由多个“读”线程并发访问。
STL并没有实现这样的互斥元,所以我们通过Boost库的boost::shared_mutex来代替std::mutex。boost::shared_mutex的设计很像STL库的smart ptr:当任意一个线程由于共享锁时,其他试图独占锁的线程会被阻塞,知道其他线程撤回它们的锁;当任意一个线程拥有独占锁时,其他线程都不能拥有独占锁或共享锁,直到第一个线程撤销它的独占锁。
下面我们来以DNS表的并发访问来演示如何使用boost::shared_mutex。
#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>
class DnsEntry;
class DnsCache
{
private:
std::map<std::string, DnsEntry> m_Entries;
mutable boost::shared_mutex m_Mutex;
public:
DnsEntry FindEntry(const std::string& domain) const
{
// 读取,共享锁
boost::shared_lock<boost::shared_mutex> lk(m_Mutex);
// 查找
std::map<std::string, DnsEntry>::const_iterator const it = m_Entries.find(domain);
return it == m_Entries.end() ?
DnsEntry() :
it-second();
}
void UpdateEntry(const std::string& domain, const DnsEntry& dns)
{
// 写入,独占锁
std::lock_guard<boost::shared_mutex> lk(m_Mutex);
m_Entries[domain] = dns;
}
}
Comments | NOTHING