站点图标

并发编程 03 在线程间共享数据

  在使用多线程编程的时候,我们需要注意线程之间共享数据的问题,这种问题基本上是对统一数据同时读写造成的,详细情况可以参考计算机操作系统,这里不多赘述。为了避免这种情况,我们可以使用几种方法来解决,一是给数据上(互斥元,mutex),这是本章的内容;二是无锁编程(lock-free programming),这将在后面的章节中讨论;还有一种方式是将对数据结构的操作更新为一个事务(transaction)。

一、使用互斥元保护共享数据

  这里我们不过多介绍mutex的设计思想,我们主要介绍其在C++中的使用。

1.1 在C++中使用mutex

  在C++中,通过构造std::mutex的实例创建互斥元,调用成员函数lock()来锁定它,调用成员函数unlock()来结果它。然而,直接调用成员函数是不推荐的做法,因为我们需要在函数包括异常在内的每个出口都调用unlock()。作为替代,STL提供来std::lock_guard类模板,实现了互斥元的RAII惯用语法;它在构造时锁定所给的互斥元,在析构时解锁所给的互斥元。下面是一个演示,std::mutex和std::lock_guard()都声明于<mutex>。

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;
std::mutex some_mutex;

// 向链表中添加元素
void add_to_list(int val)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(val);
}

// 查找元素val是否在链表中
bool list_contains(int val)
{
    std::lock_guard<std::mutex> guard(some_mutex);

    return std::find(some_list.begin(), some_list.end(), val) 
        != some_list.end();
}

  在上面的代码中,我们使用了一个全局mutex来对列表实施来保护。在通常的使用中,我们可以将它封装到一个类中,以private的形势来避免冲突。但值得注意的是,传递引用和指针可能会造成越过mutex的情况,下面来看一段糟糕的代码:

/* ===== class : 数据类 ===== */
class data
{
private:
    int m_a;
public: 
    void DoSth();
}

/* ===== class : 封装数据类 ===== */
class data_wrapper
{
private:
    data m_data;
    std::mutex m_dataMutex;
public:
    template<typename Function>
    void ProcessData(Function func)
    {
        // 传递“受保护”的数据到func
        std::lock_guard<std::mutex> guard(m_dataMutex);
        func(m_data);
    }
}

data* unprotectedData;

// 一个恶意函数,用于获取data_wrapper中的&m_data
void malicious_func(data& protectedData)
{
    unprotectedData = & protectedData;
}

data_warpper dw;

void foo()
{
    dw.process_data(malicious_func);
    // unpData已经或得了data的引用,现在可以绕开mutex直接使用doSth
    unprotectedData->DoSth();
}

  在上面的的代码中,我们通过foo,向data_wrapper传入一个恶意函数,或得了其m_data的引用。而获得了data的unprotectedData就可以绕过mutex无需锁定互斥元即可调用DoSth。所以,我们应当注意不要将受保护数据的指针和引用传递到锁的范围之外,无论是从函数中返回它们、将其存放在外部可见的内存中,还是作为参数传递给用户提供的函数

1.2 发现接口中固有的竞争条件

  回忆下我们的第一段代码,我们对一个list整体添加了保护,而不是对其中的每一个节点添加保护。如果是后者的话,我们在删除节点的时候仍然可能面对竞争问题。下面我们来看另一个例子:考虑在多线程中使用std::stack。下面是std::stack的接口(不包含构造函数)。

bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);

  我们可以将上述的top修改为非引用的方式,这样就遵循来我们在1.1中提出的准则。但是这样的stack仍然是存在风险的,考虑下面这种情况:

std::stack<int> st;

if (!st.empty())                    // 1
{
    int const val = st.top();       // 2
    st.pop();                       // 3
    doSth(val);
}

假设现在st中有一个元素,现在有两个线程访问它,它们的顺序是这样的:

线程A 线程B
if(!st.empty())
int const val = st.top(); if(!st.empty())
st.pop(); int const val = st.top();
doSth(val); st.pop()
doSth(val);

我们可以看到,在上述的1、2、3之间可能发生竞争,甚至在后面的doSth()未被调用就已经出栈,使得val所引用的对象为空。下面我们来看几种解决方法:

1.2.1 T1:传入引用

  我们考虑把接受出栈值的变量的引用,作为参数传递给pop()调用:

int result;
st.pop(result);

这样的代码在大多数时候都试用,但是如果我们的stack里面存放的是一个vector,那么我们在pop之前都将要事先构造一个vector,这是不推荐的。同时,这要求传入对象是可赋值的(operator=),而不仅仅是复制或者移动构造。

1.2.2 T2:要求不引发异常的拷贝或移动构造函数

  提供不引发异常的拷贝、移动构造函数。

1.2.3 T3:返回指向出栈顶的指针

  这种方法的优点是可以自由复制并且不会引发异常,随之而来的困难则是内存管理。对此我们可以使用std::shared_ptr来解决这一问题。

1.2.4 实现线程安全的stack

  我们通过对std::stack加锁的方式来实现线程安全。

#include <exception>
#include <stack>
#include <mutex>
#include <memory>
#include <thread>

// 自定义异常
struct empty_stack : std::exception
{
    const char* what() const throw() 
    {
        std::cout << "empty stack!" << std::endl;
    }
};

template<typename T>
class ts_stack
{
private:
    std::stack<T> m_data;

    // 使用mutable允许bool empty() const修改
    mutable std::mutex m_mutex;

public:
    // 默认构造函数 do nothing
    ts_stack() { }

    // 拷贝构造
    ts_stack(const ts_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m_mutex);
        m_data = other.m_data;
    }

    // 禁止赋值
    ts_stack& operator=(const ts_stack&) = delete;

    // 入栈
    void push(T inputVal)
    {
        std::lock_guard<std::mutex> lock(this->m_mutex);
        m_data.push(inputVal);
    }

    // 以ptr方式出栈
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(this->m_mutex);
        if (m_data.empty()) throw empty_stack();

        // res只能指向top(),const pointer to res
        std::shared_ptr<T> const res(std::make_shared<T>(m_data.top()));
        m_data.pop();
        return res;
    }

    // 以传值的方式出栈
    void pop(T& val)
    {
        std::lock_guard<std::mutex> lock(this->m_mutex);
        if (m_data.empty()) throw empty_stack();

        val = m_data.top();
        m_data.pop();
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lock(this->m_mutex);
        return m_data.empty();
    }
};

1.3 死锁

  我们已经了解如何在线程之间共享数据,因为添加mutex我们遇到了新的麻烦:死锁。回忆哲学家就餐问题,如果每个哲学家们总是先拿左边的筷子,再拿右边的筷子那么就有可能造成死锁。下面我们来看一看在C++中如何处理死锁相关的问题。STL提供std::lock可以同时锁定两个或更多互斥元,下面我们来看一下它是如何运作的:

class data;
void swap(data& lhs, data& rhs);

class X
{
private:
    data m_data;
    std::mutex m_mutex;
public:
    X(const data& p_data) : m_data(p_data) { }

    friend void swap(X& lhs, X& rhs)
    {
        // 避免两个参数引用同一对象,从而引发异常
        if (&lhs == &rhs)   return;

        // 锁定两个对象
        std::lock(lhs.m_mutex, rhs.m_mutex);

        // 构造lock_guard,使用std::adopt_lock参数告知其已经被锁定,避免在lg构造函数中重复锁定,并且让锁对象来管理互斥元
        std::lock_guard<std::mutex> lock_lhs(lhs.m_mutex, std::adopt_lock);
        std::lock_guard<std::mutex> rock_lhs(rhs.m_mutex, std::adopt_lock);

        swap(lhs.m_data, rhs.m_data);
    }
};

这里我们只需要注意两点,一是在构造多个std::lock_guard对象之前调用lock进行锁定;二是构造lg对象时使用参数std::adopt_lock使其沿用mutex已有锁的权限。值得一提的是,std::lock在锁定lhs.m_mutex或者rhs.m_mutex时都可能引发异常,该异常传播出std::lock。

1.4 避免死锁的进一步指南

  死锁并不一定全来自于锁定,在两个线程上相互调用join()同样可以引发死锁,即循环等待。避免死锁的规则简而言之是,如果一个线程在等待你,那么你就不要等它

1.4.1 避免嵌套锁

  第一个思路是:如果你已经持有一个锁,就别再获取锁。如果你坚持这个准则,光凭使用锁是不可能导致死锁的,因为每个线程仅持有一个锁(想想每个哲学家只拿一根筷子吃饭)。如果要获取多个锁,应该以std::lock的单个动作来实现。

1.4.2 持有锁时,避免调用用户提供的代码

  举个例子,在我们的多线程安全stack中,在参数类型上的每一个操作都是用户提供的代码。为了避免死锁,我们应当避免这种操作,然而当操作无法避免时,我们需要新的准则。

1.4.3 以固定顺序获取锁

  这是在1.3中的例子,如果我们不能做到以std::lock单个动作实现锁定,那么次优做法是在每个线程中以相同顺序获得它们。

1.4.4 使用锁层次

  锁层次的思路是:给mutex分级,当持有高级锁时,允许持有低级锁;当持有低级锁时,不允许持有高级锁。下面是一个简单演示:

// 两个具有不同层次等级的锁
Hierarchical_Mutex High_Level_Mutex(100);
Hierarchical_Mutex Low_Level_Mutex(50);

int do_low_level_stuff();
int do_high_level_stuff();

int low_level_func()
{
    std::lock_guard<Hierarchical_Mutex> lg(Low_Level_Mutex);
    return do_low_level_stuff();
}

int high_level_func()
{
    std::lock_guard<Hierarchical_Mutex> lg(High_Level_Mutex);
    return do_high_level_stuff();
}

// OK,它持有了高级锁再去持有低级锁
void thread_a()
{
    high_level_func();
    low_level_func();
}

// Not OK,它持有了低级锁,就不能持有比它更高级的锁
void thread_b()
{
    low_level_func();
    high_level_func();
}

  下面我们来看一下如何实现Hierarchical_Mutex:

class Hierarchical_Mutex
{
private:
    std::mutex m_internal_mutex;

    // 具体对象锁的层次值
    unsigned long const m_hierarchy_val;

    // 前一个线程使用的层次值
    unsigned long m_previous_hierarchy_val;

    // 当前线程所有锁的最小值,以保证新锁等级不能超过旧锁
    // 使用thread_local当前线程的层次值,在类外初始化为ULONG_MAX以保证第一次锁定能成功
    static thread_local unsigned long m_this_thread_hierarchy_val;

    void check_for_hierarchy_violation()
    {
        if(m_this_thread_hierarchy_val <= m_hierarchy_val)
        {
            throw std::logic_error("mutex hierarchy violated");
        }
    }

    // 更新当前线程锁的层次值
    void update_hierarchy_val()
    {
        m_previous_hierarchy_val = m_this_thread_hierarchy_val;
        m_this_thread_hierarchy_val = m_hierarchy_val;
    }

public:
    explicit Hierarchical_Mutex(unsigned long val):
        m_hierarchy_val(val),
        m_previous_hierarchy_val(0)
        { }

    void lock()
    {
        check_for_hierarchy_violation();
        m_internal_mutex.lock();
        update_hierarchy_val();
    }

    void unlock()
    {
        m_this_thread_hierarchy_val = m_previous_hierarchy_val;
        m_internal_mutex.unlock();
    }

    bool try_lock()
    {
        check_for_hierarchy_violation();
        if(!m_internal_mutex.try_lock())    return false;

        update_hierarchy_val();
        return true;
    }
};

// 初始化m_this_thread_hierarchy_val
thread_local unsigned long Hierarchical_Mutex::m_this_thread_hierarchy_val(ULONG_MAX);

  我们来仔细看一下它是如何运作的,以上面的代码为例:

Hierarchical_Mutex High_Level_Mutex(100);
Hierarchical_Mutex Low_Level_Mutex(50);

void thread_a()
{
    high_level_func();
    /*
        第一次锁定,m_this_thread_hierarchy_val(ULONG_MAX) > m_hierarchy_val(100),锁定成功;
        更新:m_this_thread_hierarchy_val = 100
            m_previous_hierarchy_val = ULONG_MAX
    */
    low_level_func();
    /*
        第二次锁定,m_this_thread_hierarchy_val(100) > m_hierarchy_val(50),锁定成功;
        更新:m_this_thread_hierarchy_val = 50
            m_previous_hierarchy_val = 100
    */

    // 解锁时由低向高,由lock_guard自行完成
}

1.5 使用std::unique_lock灵活锁定

  通过松弛不变量,std::unique_lock比std::lock_guard提供了更多的灵活性,一个std::unique_lock实例并不总是拥有与之相关联的互斥元。正如我们在1.3中所提到的,对std::lock_guard添加第二参数std::adopt_lock以让锁对象来管理互斥元一样,我们也可以把std::defer_lock作为第二参数传递,来表示该互斥元在构造时应保持被锁定。这个锁就可以在这之后通过std::unique_lock对象(而不是互斥元)上调用lock(),或是通过将std::unique_lock对象传递给std::lock()来获取。
  下面我们使用std::unique_lock和std::defer_lock来替代std::lock_guard和std::adopt_lock,代码同1.3,作少许变化:

class data;
void swap(data& lhs, data& rhs);

class X
{
private:
    data m_data;
    std::mutex m_mutex;
public:
    X(const data& p_data) : m_data(p_data) { }

    friend void swap(X& lhs, X& rhs)
    {
        // 避免两个参数引用同一对象,从而引发异常
        if (&lhs == &rhs)   return;

        // std::lock(lhs.m_mutex, rhs.m_mutex);

        // std::lock_guard<std::mutex> lock_lhs(lhs.m_mutex, std::adopt_lock);
        // std::lock_guard<std::mutex> rock_lhs(rhs.m_mutex, std::adopt_lock);

        // std::defer_lock保留互斥元为未锁定
        std::unique_lock<std::mutex> lock_lhs(lhs.m_mutex, std::defer_lock);
        std::unique_lock<std::mutex> lock_rhs(rhs.m_mutex, std::defer_lock);

        // 注意,这里传递std::uinique_lock,mutex在这里被锁定
        std::lock(lock_lhs, lock_rhs);

        swap(lhs.m_data, rhs.m_data);
    }
};

  值得注意的是,如果我们在class X已经锁定了互斥元,那么我们在其析构函数中应该调用unlock,我们可以通过owns_lock()来判别:

void Unlock(std::unique_lock<std::mutex>& lock)
{
    REALM_ASSERT(lock.owns_lock());
    lock.unlock();
}

  std::unique_lock相比std::lock_guard来说,空间时间花费更多,但是换来了更多的灵活性,在能使用lock_guard的时候有限使用lock_guard。如果你要实现延迟锁定(上面的代码那样)、在作用域之间转移锁的所有权,考虑使用std::unique_lock。

1.6 在作用域之间转移锁的所有权

  因为std::unique_lock实例并没有拥有与其相关的互斥元,所以通过moving,互斥元的所有权可以在实例之间进行转移。在某些情况下,这些转移是自动的,例如从函数中返回一个实例;而在其他情况下,则必须通过std::move()来显示实现。从根本上来说,这取决于源是否是左值。如果是左值,所有权的转移必须是显示的;如果是右值,所有权的转移则是自动的。

  一种可能的用法,是允许函数锁定一个互斥元,并将此锁的所有权转移给调用者,于是调用者接下来可以在同一个锁的保护下执行额外的操作。下面的代码片段展示了这样的例子:函数get_lock()锁定了互斥元,然后将锁返回给调用者之前准备的数据。

std::unque_lock<std::mutex> get_lock()
{
    extern std::mutex some_mutex;
    std::unique_lock<std::mutex> ul(some_mutex);
    prepare_data();

    // ul在函数内部声明,可以直接return而无需调用std::move()
    return ul;
}

void process_data()
{
    auto ul(get_lock());
    do_sth();
}

1.7 锁定在恰当的粒度

  锁粒度是一个文字术语,用来描述单个锁所保护的数据量。细颗粒度保护着少量的数据,粗颗粒读保护着大量数据。选择一个合适的颗粒度来保护数据是很重要的,同样重要的是,确保只在真正需要锁的操作中持有锁。如果可能,仅在实际访问共享数据的时候锁定互斥元,尝试在锁的外面做任意的数据处理。特别的,在持有锁时,不要做任何确实可能的耗时活动,例如文件IO。除非真的需要保护文件,否则不要持有锁。

  在这种情况下运作良好,因为它能在代码不需要访问共享数据时调用unlock(),然后在代码需要访问共享数据时再次调用lock()。

void get_and_process_data()
{
    std::unique_lock<std::mutex> my_lock(the_mutex);

    data data_to_process = get_next_data();

    // 加工时不需要加锁,这里解锁
    my_lock.unlock();

    result res = process(data_to_process);

    // 锁定以确保数据不会变更
    my_lock.lock();

    write_result(data_to_process, res);
}

  这种锁定方式的好处是显而易见的,如果使用互斥元保护整个数据结构,不仅可能让更多的锁竞争,同时锁被持有的时间也可能延长。如上所示,锁定在恰当的粒度不仅关乎锁定的数据量,这也是关系到锁会被持有多长时间,以及在持有锁时执行哪些操作。一般情况下,只应该以执行要求的操作所需的最小可能时间去持有锁。这也意味着耗时的操作,例如获取另外一个锁、文件IO,都不应该在持有锁的时候去做,除非必要。

  在之前的例子中,我们的swap函数明显需要并发访问两个对象,所以我们锁定了两个互斥量。下面来看另一个例子,我们考虑重载operator==以进行两个实例之间的比较:

class Y
{
private:
    int m_data;

    // 使用mutable允许int get_data() const修改
    mutable std::mutex m_mutex;

    int get_data() const
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        return m_data;
    }

public:
    Y(int data) : m_data(data) { }

    friend bool operator==(Y const& lhs, Y const& rhs)
    {
        if (&lhs == &rhs)   return true;

        // 分两次锁定不同对象的互斥量
        int const lhs_val = lhs.get_data();
        int const rhs_val = rhs.get_data();

        return lhs_val == rhs_val;
    }
};

  可以看到,我们分别使用两次锁,而不是一次锁定两个互斥量,这样当然会让运行更快。但随之而来的问题则是,在两次get_data()之间可能会有操作使得rhs发生改变,从而使得我们函数的语义发生改变:以前是,lhs和rhs在同一时间相等吗;现在是,某一时间的lhs和另以时间的rhs相等吗。

  有时,根本没有一个合适的颗粒级别,并非所有的对数据结构的访问都要要求同样级别的保护。在这种情况下,我们考虑使用代替机制。

二、用于共享数据保护的替代工具

  虽然互斥元是最通用的机制,但是提到保护共享数据时,它们并不是唯一的选择。

  一种特别极端(但相当常见)的情况是,共享数据只有在初始化时才需要并发访问的保护,但在那之后却不需要显式同步。这可能是因为从一开始,数据就是只读的,所以不存在同步的问题;或者是因为必要的保护作为数据上操作的一部分被隐式地执行。在任一情况中,在数据被初始化后锁定互斥元,纯粹是为了保护初始化,这是不必要的,并且会对性能产生影响。基于这个原因,STL提供了一种机制,纯粹在初始化过程中保护共享数据。

2.1 在初始化时保护共享数据

  假设现在有一个构造起来非常昂贵的资源(也许是数据库链接,或者是分配内存)。像这样的延迟初始化(Lazy Initialization)在单线程代码中是很常见的:

std::shared_ptr<some_resource> resource_ptr;

void foo()
{
    // 延迟初始化
    if (!resource_ptr)  resource_ptr.reset(new some_resource);

    resource_ptr->dosth();
}

  如果共享资源本身对于并发访问是安全的,当将其转换为多线程代码时,唯一需要保护的部分就是其初始化部分。但是像下面一段代码中的转换,会引起使用资源的程序产生不必要的序列化。这是因为每个线程都必须等待互斥元,以检查资源是否初始化。

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo()
{
    std::unique_lock<std::mutex> lk(rescoure_mutex);

    if (!resource_ptr)  resource_ptr.reset(new some_resource);

    lk.unlock();

    resource_ptr->dosth();
}

  C++STL提供了std::once_flag和std::call_once,与其锁定互斥元并显式地检查指针,不如每个线程都可以调用std::call_once。我们通过下面的例子来看一下,通过std::call_once来实现同上述代码相同的功能。

std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;

void init_resource()
{
    resource_ptr.reset(new some_resource);
}

void foo()
{
    // 调用一次初始化
    std::call_once(resource_flag, init_resource);
    resource_ptr->dosth();
}

  下面我们来看一个例子,演示如何使用std::call_once完成类成员的延迟初始化。

class X
{
private:
    ConnInfo m_CI;
    ConnHandle m_CH;
    std::once_flag m_ConnInit_Flag;

    void openConn()
    {
        // connection
    }

public:
    X(const ConnInfo& a_CI) : m_CI(a_CI) { }

    void Send(const Data& data)
    {
        std::call_once(m_ConnInit_Flag, &X::openConn, this);
        m_CH.send(data);
    }

    data Receive()
    {
        std::call_once(m_ConnInit_Flag, &X::openConn, this);
        return m_CH.receive();
    }
}

  在上面的例子中,初始化在首次调用Send()或Receive()时完成,使用成员函数openConn()来完成初始化,同时最后需要将this传入。值得注意的是,像std::mutex和std::once_flag这种不能复制或移动的实例来说,如果要使用它,就应该显式地定义。

2.2 保护很少更新的数据结构

  除了在初始化后只读的数据,还有些很少更新的数据,它们在大多数情况下是只读的。下面来看一下如何对它们进行保护。假设现在我们有一个DNS缓存表,我们希望确保所有线程在读写时正常运行。我们可以使用std::mutex来完成,但是使用std::mutex的一个缺点是,会消除数据结构没有进行修改时的并发读取数据的可能性。我们可以考虑一种新的互斥元,reader-writer mutex,它考虑了两种不同的用法:由单个“写”线程独占访问或共享;由多个“读”线程并发访问。

  STL并没有实现这样的互斥元,所以我们通过Boost库的boost::shared_mutex来代替std::mutex。boost::shared_mutex的设计很像STL库的smart ptr:当任意一个线程由于共享锁时,其他试图独占锁的线程会被阻塞,知道其他线程撤回它们的锁;当任意一个线程拥有独占锁时,其他线程都不能拥有独占锁或共享锁,直到第一个线程撤销它的独占锁。

  下面我们来以DNS表的并发访问来演示如何使用boost::shared_mutex。

#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>

class DnsEntry;

class DnsCache
{
private:
    std::map<std::string, DnsEntry> m_Entries;

    mutable boost::shared_mutex m_Mutex;

public:
    DnsEntry FindEntry(const std::string& domain) const
    {
        // 读取,共享锁
        boost::shared_lock<boost::shared_mutex> lk(m_Mutex);

        // 查找
        std::map<std::string, DnsEntry>::const_iterator const it = m_Entries.find(domain);

        return it == m_Entries.end() ?
                DnsEntry() :
                it-second();
    }

    void UpdateEntry(const std::string& domain, const DnsEntry& dns)
    {
        // 写入,独占锁
        std::lock_guard<boost::shared_mutex> lk(m_Mutex);

        m_Entries[domain] = dns;
    }
}
退出移动版