并发编程 02 管理线程

发布于 2021-10-04  130 次阅读


  在这一章,我们将先从基础开始学习如何启动线程、等待完成,或是在后台运行它。之后我们将进一步讨论线程函数启动时想起传递额外的参数,以及如何将线程的所有权从一个std::thread转移到另一个。最后我们会看一看选择所使用的线程数量,以及标识特定的线程。

一、基本线程管理

  每个C++程序至少有一个线程,它是由C++在运行时启动的,该线程运行着main()函数。你的程序可以继续启动其他函数作为线程入口。然后,这些线程连同初始线程一起,并发运行。正如程序会在main()返回时结束一样,当指定的函数返回时,该线程就会退出。下面我们来看一下如何启动一个线程。

1.1 启动线程

  我们可以通过一个函数来构造一个std::thread:

void func();
std::thread th(func);

  与许多C++标准库相似,std::thread可以与任何可调用(Callable)类型一起工作,所以我们可以传入一个函数对象的实例进去:

class Task
{
public:
    operator()() const
    {
        func_1();
        func_2();
    }
};

Task task;
std::thread(task);

  在这种情况下,所提供的函数对象被复制(Copied)到新创建的执行线程的存储器中,并在哪里调用。因此重要的是,副本与原本都有等效行为。
  但是我们有一件非常需要注意的事情——所谓的“C++的最棘手的解析”。如果你尝试传递一个匿名变量,那么其语法声明可能与函数声明一样,编译器可能误认为一个函数。下面我们来看看它:

std::thread th(Task());

  上述代码声明了一个函数th(),他接受单个参数(参数类型是指向不接受参数、同时返回Task对象的函数的指针),并返回std::thread对象。它是一个函数,而不是创建一个对象。
  现在我们有两种方法来避免这个问题。

std::thread th( (Task()) );
// 增加一层()将Task()括住
std::thread th{ Task() };
// 使用新的统一初始化语法

  同时我们也可以使用lambda表达式来完成(bind同理):

std::thread th([]
{
    func_1();
    func_2();
});

  一旦开始了线程,你需要显式地决定是要等待它完成(通过结合),还是让他自学运行(通过分离)。如果你在std::thread对象被销毁前未做决定,那么你的程序会被终止(std::thread的析构函数调用std::terminate())。因此,即便在异常存在的情况下,保证线程正确地结合或分离都是我们的当务之急。
  如果我们不等待线程完成,那么则需要保证该线程访问的数据是有效的,直到该线程完成。
  考虑下面一种情况:当线程函数持有局部变量的指针或者引用,且当函数退出的时候线程未完成。

struct Func
{
    int& i;
    Func(int& p_i) : i(p_i) {}
    operator()()
    {
        for (int j = 0; j < 10e3; ++j)
        {
            // do sth
        }
    }
};

void oops()
{
    int local_state = 0;
    Func func(local_state);
    std::thread th(func);
    th.detach();
    // 不等待线程完成而直接退出程序
}

  在上述情况下,如果我们不等待线程th完成操作,在退出oops()之后,func所访问的引用则是一个被销毁的变量。对于指针、引用的问题,我们可以使用两种方法解决:一是使用copy的方式传入变量;二是等待线程完成(结合线程,藉由th.join()实现)。对于第一种方法,在我们推出oops()函数时,线程th中的变量将被销毁。特别地,在一个访问局部变量的函数中创建线程是一个糟糕的主意,除非在退出函数前等待线程完成。

1.2 等待线程完成

  我们可以使用thread.join()来等待线程完成。但是在这种情况下,在独立的线程上运行函数是没有什么意义的,因为第一个线程在此期间(等待另一个线程完成)做不了任何有用的事情。在实际代码中,初始线程要么有自己的工作要去做,要么是在等待所有线程完成之前就要启动多个线程。
  join()很简单——要么等待一个线程完成、要么不等。如果需要对线程就行更细粒度的控制,比如线程检查是否完成,或只是在一段特定的时间内进行等待,那么就必须使用特定机制代替,比如条件变量和future,我们将在第4章提到。调用join()的行为也会清理所有与该线程有关的存储器,这样std::thread对象就不在与已完成的线程相关联,他也不与任何线程相关联。这就意味着,我们只能对一个线程调用一次join(),一旦调用了join(),该线程就不是可连接的,并且joinable()放回false。

1.3 在异常环境下等待

  如上所述,我们需要确保在std::thread对象被销毁前已调用join()或者detach()函数。如果要分离线程,在线程启动后就可以立即调用detach(),所以这不是重点。问题在于,如果我们需要等待线程,就需要仔细选择在哪个地方调用join()。这意味着,如果在线程启动后、join()调用前发生了异常,那么对join()的调用很容易被跳过。
  为了避免这种情况,我们可以使用异常捕获(try/catch),下面演示使用标准的资源获取即初始化(RAII)管用语法,并提供一个类,在它的析构函数中进行join。如下所示:

#include <iostream>
#include <thread>

// 新线程需要完成的操作
void ThreadPrint()
{
    std::cout << "A New Thread!" << std::endl;
}

// 异常处理
class Thread_Guard
{
private:
    std::thread& m_thread;
public:
    // 显式构造、传入引用
    explicit Thread_Guard(std::thread& p_thread):
        m_thread(p_thread) 
    {}

    // 析构、同步
    ~Thread_Guard()
    {
        if (m_thread.joinable())    m_thread.join();
    }

    // 禁止拷贝与赋值
    Thread_Guard(Thread_Guard const&) = delete;
    Thread_Guard& operator=(Thread_Guard const&) = delete;
};

// main()中调用client()
void client()
{
    // 新线程
    std::thread th(ThreadPrint);
    Thread_Guard thG(th);

    std::cout << "Client" << std::endl;
}

int main()
{
    client();
    return 0;
}

  在当前线程(main()所在的初始线程)运行到函数client()末尾时,局部对象thG会按照构造函数的逆序被析构,因此Thread_Guard对象thG首先被销毁,并且析构函数中的线程被结合。
  如果无需等待线程完成,可以通过分离(Detaching)来避免这种异常引起的安全问题。这打破了线程与std::thread对象的联系,并确保党std::thread对象被销毁时,std::terminate()不会被调用,即线程仍然在后台运行。

1.4 在后台运行线程

  在std::thread对象上对用detach()会把线程丢在后台运行,也没有直接的方法与之通信。也不可能等待该线程完成;如果一个线程成为分离的,获取一个引用它的std::thread对象也是不可能的,所以它也不可能再被结合。分离的线程在后台运行,其所有权和控制器交由C++的运行时库,以确保与线程相关的资源在线程退出后能被正确回收。

二、传递参数给线程函数

void func(int param);
std::thread th(func, 5);

  如上所示,我们可以通过以额外参数的方式将函数参数传给std::thread对象。值得注意的是,它默认以拷贝(Copied)方式传入。这意味着,我们在传入指针时需要注意类型转换;传入引用时需要使用std::ref。

2.1 避免隐式类型转换

  现在我们有一个C-style的string,我们期望调用它,但是我们不应该直接传入它的指针,而应该传入一个std::string对象:

char buffer[1024];
// do sth
std::thread th(func, std::string(buffer));
// instead of th(func, buffer);

2.2 传入引用时使用std::ref

  如果我们的函数明确接受一个引用参数,那我们在传入的时候最好使用std::ref。

2.2 thread的构造方式

  std::thread的构造方式和std::bind类似,具体可以参考这篇文章

三、转移线程所有权

  考虑下面两种情况:

  1. 编写一个函数,它创建一个在后台运行的线程,但是向调用函数传回新线程的所有权,而非等待其完成。
  2. 创建一个线程,并将所有权交给等待它完成的函数。

3.1 在thread对象之间转移

  与c++中的其他movable类型(如,std::ifstream, std::unique_ptr)一样,一个特定执行线程的权限可以在std::thread实例之间移动。下面一个例子展示了创建两个线程,以及在三个std::thread对象之间转移所有权:

void func_1();
void func_2();

std::thread th1(&func_1);
// 创建一个新线程,该线程拥有执行func_1的权限,同时该线程被th1持有。
std::thread th2 = std::move(th1);
// 移交th1持有线程的权限给th2,现在th2对象持有的线程负责执行func_1,th1失去原有线程的所有权。

/*
    th1: nullptr
    th2: &func_1
*/

th1 = std::thread(&func_2);
// 启动一个新的线程,并将它交给一个匿名thread对象,再将它交给th1,
// 这里不需要使用move语义,因为从临时对象中的move是自动且隐式的。
std::thread th3;
// 默认构造th3,它不予任何线程关联。
th3 = std::move(th2);
// 现在将th2的线程移交给th3,th3持有与func_1相关的线程,th2不与任何线程相关。

/*
    th1: &func_2
    th2: nullptr
    th3: &func_1
*/

th1 = std::move(th3);
// 将th3持有的线程交给th1,但是注意,这里th1已经持有了一个线程,
// 这将导致th1调用std::terminate()来终止程序,这样做是为了与std::thread的析构函数保持一致,
// 你必须在析构前等待线程完成或分离,同样:
// 在赋值时,不能通过向一个管理线程的thread对象赋一个新值来“舍弃”一个线程。

  下面来看上面例子的完整版:

#include <iostream>
#include <thread>

void func_1()
{
    std::cout << "1" << std::endl;
}

void func_2()
{
    std::cout << "2" << std::endl;
}

int main()
{
    std::cout << "begin" << std::endl;

    std::thread th1(&func_1);
    std::thread th2 = std::move(th1);

    th1 = std::thread(&func_2);
    std::thread th3;
    th3 = std::move(th2);

    th1 = std::move(th3);

    std::cout << "end" << std::endl;

    return 0;
}

  以下是输出结果:

begin
1
2
terminate called without an active exception

3.2 在函数之间转移

  函数间的线程转移与std::bind相同,这里不再赘述。但是我们可以通过这一点构造一个类似于thread_guard类,该类实际活得线程的所有权。

#include <iostream>
#include <thread>

class scoped_thread
{
private:
    std::thread m_th;

public:
    // 使用Move语义的显式构造
    explicit scoped_thread(std::thread p_th):
        m_th(std::move(p_th))
    {
        if (!m_th.joinable())   throw std::logic_error("No thread");
    }

    // 析构时join
    ~scoped_thread()
    {
        m_th.join();
    }

    // 禁止拷贝与赋值构造
    scoped_thread(scoped_thread const&) = delete;
    scoped_thread& operator=(scoped_thread const&) = delete;
};

void func(int num)
{
    std:: cout << num << std::endl;
}

int main()
{
    scoped_thread th(std::thread(&func, 5));

    return 0;
}

3.3 生成一批线程并等待完成

  考虑这样一个例子,我们给出10个线程,并打印一组数据:

#include <iostream>
#include <thread>
#include <cmath>
#include <vector>

/* ===== 积分计算函数 ===== */
void func(int id)
{
    std::cout << id << std::endl;
}

/* ===== main ===== */
int main()
{
    std::vector<std::thread> threads;
    const int threads_num = 10;

    // 创建线程
    for (int i = 0; i < threads_num; ++i)
    {
        threads.push_back(std::thread(&func, i));
    }

    double ans{ 0 };

    for (auto i = threads.begin(); i != threads.end(); ++i)
    {
        (*i).join();
    }   

    return 0;
}

  但是值得注意的是,这不是并行的任务,而是需要等待前一个线程完成的多线程顺序任务。

四、选择线程数量

  使用std::thread::hardware_concurrency()可以返回计算机的物理线程数量。

五、获取线程ID

  使用std::thread::id完成,对于std::thread对象可以调用成员函数get_id()实现。