Bootstrap

C/C++学习:C++并发与多线程

C++11线程

hread/join/detach/joinable

  • 用于判断是否可以调用或者

  • 调用后的线程在后台执行,被C++运行时库接管,结束后由运行时库负责回收

  • 陷阱:主线程结束了,引用或者指针在其他线程就会无效

  • 对于可调用对象,实际上是被拷贝到了其他线程,所以即使主线程的可调用对象析构了,其他线程仍能正常运行

线程传参/成员函数作线程函数

  • 传递简单类型参数是值传递,不是引用传递(即使使用了引用符号)

  • 如果传递类对象,应该避免隐式类型转换只要用临时构造的对象作为参数传递给线程,就能保证在主线程执行完毕前把线程的参数构造出来

  • 一般情况下不使用,避免局部变量失效带来线程对内存的非法引用问题

std::ref()   // 参数作引用传递
// unique_ptr要使用std::move()

#include 
#include 

using namespace std;

class Print {
public:
    void operator()(int value) {
        cout << "operator value = " << value << endl;
    }

    void func(int value) {
        cout << "func value = " << value << endl;
    }
};

int main() {
    Print pt;
    thread t1(pt, 314);
    t1.join();
    thread t2(&Print::func, pt, 314);
    t2.join();
    return 0;
}

#include 
#include 
#include 
#include 
#include 
#include   

using namespace std;

#define gettid() syscall(__NR_gettid)
void print()  {
    printf("print::process id: %d\n", getpid());
    printf("print::kernel id: %ld\n", gettid());
    cout << "print::std thread id: " << std::this_thread::get_id() << endl;
    printf("print::pthread id: %lu\n", pthread_self());
}

int main() {
    thread t(print);
    printf("print::process id: %d\n", getpid());
    printf("kernel id: %ld\n", gettid());
    cout << "print::std thread id: " << std::this_thread::get_id() << endl;
    printf("pthread id: %lu\n", pthread_self());
    t.join();
    return 0;
}

互斥量/死锁

  • 和必须成对使用,默认阻塞

  • 的原理很简单,构造函数里执行了,析构函数里执行了

  • 保证两个互斥量上锁的顺序一致就不会死锁

  • 可用于同时锁住两个互斥量(一般不用)

  • 是个结构体对象,起一个标记作用,表示这个互斥量之前已经锁住,不需要在构造函数里执行

std::lock(mtx1, mtx2);
std::lock_guard lg1(mtx1, std::adopt_lock);
std::lock_guard lg2(mtx2, std::adopt_lock);

// mutex example
#include        // std::cout
#include          // std::thread
#include           // std::mutex
std::mutex mtx;           // mutex for critical section
void print_block (int n, char c) {
    // critical section (exclusive access to std::cout signaled by locking mtx):
    mtx.lock();
    for (int i=0; i

unique_lock

比灵活,但效率差

的第二个参数

  • :含义和相同

  • :没有锁成功会立即返回

  • :初始化一个没有加锁的

的成员函数

  • :体现了的灵活性

  • :体现了的灵活性

  • :锁成功返回,锁失败返回

  • :解绑和,返回指针,如果解绑前已加锁,需要手动解锁

锁的粒度:锁住的代码少,粒度细,执行效率高

所有权的转移,把自己对的所有权转移给其他,不能复制

  • 使用

  • 从函数返回一个局部的对象,返回局部对象会导致系统生成临时对象,并调用移动构造函数

文章福利 Linux后端开发网络底层原理知识学习提升 点击  获取,完善技术栈,内容知识点包括Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK等等。

单例模式/call once

template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );

保证函数只被调用一次,标记对应的函数是否被执行

condition_variable/wait/notify_one/notify_all

如果第二个参数的lambda表达式返回,则解锁互斥量并阻塞到本行,直到其他线程调用为止;如果第二个参数的lambda表达式返回,则直接返回;如果没有第二个参数,相当于解锁互斥量并阻塞到本行,直到其他线程调用为止

其他线程调用将唤醒后,不断尝试获取互斥锁,如果获取到了就继续执行

如果某个线程正在运行而不是阻塞在等待唤醒,那个另外一个线程的会没有效果

async/future/packaged_task/promise

用来启动一个异步任务,返回一个对象

// async example
#include        // std::cout
#include          // std::async, std::future
// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
    std::cout << "Calculating. Please, wait...\n";
    for (int i=2; i fut = std::async (is_prime,313222313);
    std::cout << "Checking whether 313222313 is prime.\n";
    // ...
    bool ret = fut.get();      // waits for is_prime to return
    if (ret) std::cout << "It is prime!\n";
    else std::cout << "It is not prime.\n";
    return 0;
}

提供了一种访问异步操作结果的机制,对象包含了线程入口函数的返回结果,可调用来获取结果

阻塞直到获得结果,只能调用一次等待线程返回,不返回结果

template                                                                            future::type> async (Fn&& fn, Args&&... args);

template                                                                           future::type> async (launch policy, Fn&& fn, Args&&... args);
  • :表示线程入口函数调用被延迟到或者调用时才执行

    如果或者没被调用,则线程根本没有创建;延迟调用本质上没有创建新线程,由主线程调用线程入口函数

  • :创建新线程并立即执行

  • :默认标记,由系统决定采用哪种运行方式

的模板参数是各种可调用对象,通过包装起来,作为线程入口函数

通过获取对象

对象也可以直接调用

// packaged_task example
#include      // std::cout
#include        // std::packaged_task, std::future
#include        // std::chrono::seconds
#include        // std::thread, std::this_thread::sleep_for
// count down taking a second for each value:
int countdown (int from, int to) {
	for (int i=from; i!=to; --i) {
        std::cout << i << '\n';
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout << "Lift off!\n";
    return from-to;
}

int main () {
    std::packaged_task tsk (countdown);   // set up packaged_task
    std::future ret = tsk.get_future();            // get future
    std::thread th (std::move(tsk),10,0);   // spawn thread to count down from 10 to 0
    // ...
    int value = ret.get();                  // wait for the task to finish and get result
    std::cout << "The countdown lasted for " << value << " seconds.\n";
    th.join();
    return 0;
}

能够在某个线程赋值,然后在其他线程中把这个值取出来

绑定一个值,在将来某个时刻通过绑定到这个上来得到这个值

// promise example
#include        // std::cout
#include      // std::ref
#include          // std::thread
#include          // std::promise, std::future
void print_int (std::future& fut) {
    int x = fut.get();
    std::cout << "value: " << x << '\n';
}
int main ()
{
    std::promise prom;                      // create promise
    std::future fut = prom.get_future();    // engagement with future
    std::thread th1 (print_int, std::ref(fut));  // send future to new thread
    prom.set_value (10);                         // fulfill promise
    // (synchronizes with getting the future)
    th1.join();
    return 0;
}

future扩展/atomic

返回

  • :表示线程还没执行完

  • :表示线程成功返回

  • :用于第一个参数设置为

设计是一个移动语义,所以只能使用一次,是拷贝语义,可以使用多次

原子操作一般针对变量而不是代码段

一般原子操作针对,,,是支持的,其他的可能不支持

#include  
#include  
using namespace std; 

int cnt = 0; 
void func() { 
    for(int i = 0; i < 10000; ++i) { 
        __sync_fetch_and_add(&cnt, 1);
    } 
} 
int main() { 
    thread t1(func); 
    thread t2(func); 
    t1.join(); 
    t2.join(); 
    cout << cnt << endl; 
    return 0; 
}

#include  
#include 
#include 
using namespace std; 

atomic cnt;
void func() { 
    for(int i = 0; i < 10000; ++i) { 
		cnt++;
    } 
} 
int main() { 
    thread t1(func); 
    thread t2(func); 
    t1.join(); 
    t2.join(); 
    cout << cnt << endl; 
    return 0; 
}

atomic拓展/async拓展

// C++11提供了6种内存模型
enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
}

和区别

  • 并不一定创建新线程

  • 不容易拿到线程返回值

recursive_mutex/timed_mutex

:递归的独占互斥锁

:带超时功能的互斥锁

补充

虚假唤醒:在lambda表达式中正确处理

以原子方式读取对象的值;以原子方式写入对象的值

POSIX线程

线程概念

  • LWP(Light Weight Process)轻量级进程

  • 进程:拥有PCB,独立地址空间

  • 线程:拥有PCB,共享地址空间

  • 进程与线程的区别:是否共享地址空间

  • 线程:最小调度单位

  • 进程:最小分配资源单位

线程实现原理

从内核里看进程和线程是一样的,都有各自不同的PCB,但是PCB中指向内存资源的三级页表是相同的

# 查看指定线程的LWP号,跟线程id不一样
ps -Lf PID

线程共享/非共享资源

  • 共享资源

文件描述符表

信号处理方式

当前工作目录

用户ID和组ID

内存地址空间(不共享栈)

  • 非共享资源

线程id

处理器现场和内核栈指针

独立的用户空间栈

errno变量

信号屏蔽字

调度优先级

线程优缺点

  • 提高程序并发性

  • 开销小

  • 数据通信,共享数据方便

  • 调试,编写困难

  • 对信号支持不友好

API

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
           void *(*start_routine) (void *), void *arg);
// 参数1:传出参数,保存分配好的线程ID
// 参数2:通常传NULL,表示使用线程默认属性
// 参数3:函数指针
// 参数4:线程主函数执行期间所使用的参数
// 返回值:成功返回0,失败返回错误号

void pthread_exit(void *retval);
// retval表示线程退出状态,通常传NULL

pthread_t pthread_self(void);
// 获取线程ID:pthread_t类型,在linux下为无符号整数(%lu)
// 线程ID是进程内部识别标志(和LWP号不一样),两个进程间线程ID允许相同

int pthread_join(pthread_t thread, void **retval);
// 线程回收,retval接受线程退出状态

int pthread_detach(pthread_t thread);
// 线程分离,线程与主控线程断开关系
// 分离的线程不能用pthread_join()回收

int pthread_cancel(pthread_t thread);
// 取消线程,线程的取消不是实时的,而有一定的延时,需要等待线程到达某个取消点
// 取消点:检查线程是否被取消,通常是一些系统调用,例如open、read、write等
// 执行man 7 pthreads可以查看具备这些取消点的系统调用列表

线程属性设置分离

typedef struct {
    int etachstate;                     // 线程的分离状态
    int schedpolicy;                    // 线程调度策略
    struct sched_param schedparam;      // 线程的调度参数
    int inheritsched;                   // 线程的继承性
    int scope;                          // 线程的作用域
    size_t guardsize;                   // 线程栈末尾的警戒缓冲区大小
    int stackaddr_set;                  // 线程的栈设置
    void* stackaddr;                    // 线程栈的位置
    size_t stacksize;                   // 线程栈的大小
} pthread_attr_t;
// ulimit -a查看用户栈的大小

int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);

int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
// PTHREAD_CREATE_DETACHED,分离状态
// PTHREAD_CREATE_JOINABLE,非分离状态
// 如果设置一个线程为分离线程,而这个线程运行又非常快,它可能在pthread_create函数返回之前就终止了
// 它终止以后就可能将线程号和系统资源移交给其他线程使用,这样调用pthread_create的线程就得到了错误的线程号

int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize);
int pthread_attr_getstack(const pthread_attr_t *attr, void **stackaddr, size_t *stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstacksize(const pthread_attr_t attr, size_t stacksize);

NPTL

从Linux2.6开始,glibc采用了新的线程库NPTL(Native POSIX Thread Library)

  • 线程库由两部分组成:内核的线程支持和用户的线程支持(glibc)

  • Linux早期内核不支持线程时,glibc就在库中(用户态)以纤程(用户态线程)的方式支持多线程了

  • POSIX thread只要求了用户编程的调用接口,对内核接口没有要求。Linux下线程的实现就是在内核支持的基础上以POSIX thread的方式对外封装了接口,所以才会有两个ID

查看当前线程库的版本

getconf GNU_LIBPTHREAD_VERSION

线程使用注意事项

  • 主线程退出其他线程不退出,主线程退出应调用

  • 避免僵尸线程

    或者在里指定分离属性

  • 避免在多线程引入信号机制

线程同步

协同步调,按预定的先后次序运行

多个控制流共同操作一个共享资源的情况,都需要同步

数据混乱原因

  • 资源共享(独享资源则不会)

  • 调度随机(意味着数据访问会出现竞争)

  • 线程间缺乏必要的同步机制

互斥量

pthread_mutex_t mutex;
// 变量mutex只有两种取值1、0,mutex锁init成功时为1。lock将mutex--,unlock将mutex++
int pthread_mutex_init(pthread_mutex_t * restrict mutex, const pthread_mutexattr_t * restrict attr);
// 参数1:传出参数,调用时应传&mutex
// 参数2:传入参数,互斥量属性,通常传NULL
int pthread_mutex_destroy(pthread_mutex_t *mutex);

int pthread_mutex_lock(pthread_mutex_t *mutex);
// lock尝试加锁,如果加锁不成功,线程阻塞,阻塞到持有该互斥量的其他线程解锁为止
int pthread_mutex_trylock(pthread_mutex_t *mutex);
// trylock加锁失败直接返回错误号,不阻塞
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// unlock主动解锁,同时将阻塞在该锁上的所有线程全部唤醒,不确定哪个线程先被唤醒

在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好

静态初始化:如果mutex定义在全局或者是静态分配的,直接使用宏进行初始化

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

动态初始化:局部变量使用进行初始化

死锁产生

  • 线程试图对同一个互斥量加锁两次

  • 线程1拥有A锁,请求获得B锁;线程2拥有B锁,请求获得A锁(trylock解决)

读写锁

写独占,读共享,写锁优先级高

  • 写模式加锁时,解锁前,所有对该锁加锁的线程都会被阻塞

  • 读模式加锁时,如果线程以读模式对其加锁会成功,如果线程以写模式加锁会阻塞

  • 读模式加锁时,既有尝试以写模式加锁的线程,也有尝试以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求,优先满足写模式锁

pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

条件变量

通常和互斥锁配合使用

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
// 释放已经掌握的互斥锁,相当于pthread_mutex_unlock(&mutex)
// 阻塞等待条件变量cond
// 前两步是一个原子操作
// 当被唤醒返回时,解除阻塞并重新申请获取互斥锁pthread_mutex_lock(&mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
// abstime表示绝对时间

互斥量+条件变量实现生产者消费者模型

信号量

进化版的互斥锁

sem_t sem;
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared取0用于线程间,取1用于进程间
// 信号量的初值决定了占用信号量的线程的个数
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
// 信号量大于0则减1,等于0则造成进程阻塞
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
// 信号量加1,同时唤醒阻塞在信号量上的线程

信号量实现生产者消费者模型

进程间同步

进程间也能使用互斥量来达到同步的目的,在使用初始化时修改其属性为进程间共享

pthread_mutexattr_t mattr;
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
// PTHREAD_PROCESS_PRIVATE 线程锁
// PTHREAD_PROCESS_SHARED 进程锁

文件锁

借助fcntl实现文件锁

哲学家就餐问题

__thread(c++11 thread_local)

__thread是GCC内置的线程局部存储设施(thread local storage),它的实现非常高效

__thread变量是每个线程都有一份独立实体,各个线程的变量值互不干扰

除了这个主要用途,它还可以修饰那些值可能会变,带有全局性,但是又不值得用全局锁保护的变量

  • 只能用于修饰POD类型(Plain old data structure),不能修饰class类型

  • __thread可以用于修饰全局变量、函数内的静态变量,但是不能用于修饰函数的局部变量或者class的普通成员变量

  • __thread变量的初始化只能用编译期常量