C/C++学习:C++并发与多线程
C++11线程
hread/join/detach/joinable
用于判断是否可以调用或者
调用后的线程在后台执行,被C++运行时库接管,结束后由运行时库负责回收
陷阱:主线程结束了,引用或者指针在其他线程就会无效
对于可调用对象,实际上是被拷贝到了其他线程,所以即使主线程的可调用对象析构了,其他线程仍能正常运行
线程传参/成员函数作线程函数
传递简单类型参数是值传递,不是引用传递(即使使用了引用符号)
如果传递类对象,应该避免隐式类型转换,只要用临时构造的对象作为参数传递给线程,就能保证在主线程执行完毕前把线程的参数构造出来
一般情况下不使用,避免局部变量失效带来线程对内存的非法引用问题
std::ref() // 参数作引用传递
// unique_ptr要使用std::move()
#include
#include
using namespace std;
class Print {
public:
void operator()(int value) {
cout << "operator value = " << value << endl;
}
void func(int value) {
cout << "func value = " << value << endl;
}
};
int main() {
Print pt;
thread t1(pt, 314);
t1.join();
thread t2(&Print::func, pt, 314);
t2.join();
return 0;
}
#include
#include
#include
#include
#include
#include
using namespace std;
#define gettid() syscall(__NR_gettid)
void print() {
printf("print::process id: %d\n", getpid());
printf("print::kernel id: %ld\n", gettid());
cout << "print::std thread id: " << std::this_thread::get_id() << endl;
printf("print::pthread id: %lu\n", pthread_self());
}
int main() {
thread t(print);
printf("print::process id: %d\n", getpid());
printf("kernel id: %ld\n", gettid());
cout << "print::std thread id: " << std::this_thread::get_id() << endl;
printf("pthread id: %lu\n", pthread_self());
t.join();
return 0;
}
互斥量/死锁
和必须成对使用,默认阻塞
的原理很简单,构造函数里执行了,析构函数里执行了
保证两个互斥量上锁的顺序一致就不会死锁
可用于同时锁住两个互斥量(一般不用)
是个结构体对象,起一个标记作用,表示这个互斥量之前已经锁住,不需要在构造函数里执行
std::lock(mtx1, mtx2);
std::lock_guard lg1(mtx1, std::adopt_lock);
std::lock_guard lg2(mtx2, std::adopt_lock);
// mutex example
#include // std::cout
#include // std::thread
#include // std::mutex
std::mutex mtx; // mutex for critical section
void print_block (int n, char c) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
for (int i=0; i
unique_lock
比灵活,但效率差
的第二个参数
:含义和相同
:没有锁成功会立即返回
:初始化一个没有加锁的
的成员函数
:体现了的灵活性
:体现了的灵活性
:锁成功返回,锁失败返回
:解绑和,返回指针,如果解绑前已加锁,需要手动解锁
锁的粒度:锁住的代码少,粒度细,执行效率高
所有权的转移,把自己对的所有权转移给其他,不能复制
使用
从函数返回一个局部的对象,返回局部对象会导致系统生成临时对象,并调用移动构造函数
文章福利 Linux后端开发网络底层原理知识学习提升 点击 获取,完善技术栈,内容知识点包括Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK等等。

单例模式/call once
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
保证函数只被调用一次,标记对应的函数是否被执行
condition_variable/wait/notify_one/notify_all
如果第二个参数的lambda表达式返回,则解锁互斥量并阻塞到本行,直到其他线程调用为止;如果第二个参数的lambda表达式返回,则直接返回;如果没有第二个参数,相当于解锁互斥量并阻塞到本行,直到其他线程调用为止
其他线程调用将唤醒后,不断尝试获取互斥锁,如果获取到了就继续执行
如果某个线程正在运行而不是阻塞在等待唤醒,那个另外一个线程的会没有效果
async/future/packaged_task/promise
用来启动一个异步任务,返回一个对象
// async example
#include // std::cout
#include // std::async, std::future
// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
std::cout << "Calculating. Please, wait...\n";
for (int i=2; i fut = std::async (is_prime,313222313);
std::cout << "Checking whether 313222313 is prime.\n";
// ...
bool ret = fut.get(); // waits for is_prime to return
if (ret) std::cout << "It is prime!\n";
else std::cout << "It is not prime.\n";
return 0;
}
提供了一种访问异步操作结果的机制,对象包含了线程入口函数的返回结果,可调用来获取结果
阻塞直到获得结果,只能调用一次;等待线程返回,不返回结果
template future::type> async (Fn&& fn, Args&&... args);
template future::type> async (launch policy, Fn&& fn, Args&&... args);
:表示线程入口函数调用被延迟到或者调用时才执行
如果或者没被调用,则线程根本没有创建;延迟调用本质上没有创建新线程,由主线程调用线程入口函数
:创建新线程并立即执行
:默认标记,由系统决定采用哪种运行方式
的模板参数是各种可调用对象,通过包装起来,作为线程入口函数
通过获取对象
对象也可以直接调用
// packaged_task example
#include // std::cout
#include // std::packaged_task, std::future
#include // std::chrono::seconds
#include // std::thread, std::this_thread::sleep_for
// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from-to;
}
int main () {
std::packaged_task tsk (countdown); // set up packaged_task
std::future ret = tsk.get_future(); // get future
std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0
// ...
int value = ret.get(); // wait for the task to finish and get result
std::cout << "The countdown lasted for " << value << " seconds.\n";
th.join();
return 0;
}
能够在某个线程赋值,然后在其他线程中把这个值取出来
绑定一个值,在将来某个时刻通过绑定到这个上来得到这个值
// promise example
#include // std::cout
#include // std::ref
#include // std::thread
#include // std::promise, std::future
void print_int (std::future& fut) {
int x = fut.get();
std::cout << "value: " << x << '\n';
}
int main ()
{
std::promise prom; // create promise
std::future fut = prom.get_future(); // engagement with future
std::thread th1 (print_int, std::ref(fut)); // send future to new thread
prom.set_value (10); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}
future扩展/atomic
返回
:表示线程还没执行完
:表示线程成功返回
:用于第一个参数设置为
设计是一个移动语义,所以只能使用一次,是拷贝语义,可以使用多次
原子操作一般针对变量而不是代码段
一般原子操作针对,,,是支持的,其他的可能不支持
#include
#include
using namespace std;
int cnt = 0;
void func() {
for(int i = 0; i < 10000; ++i) {
__sync_fetch_and_add(&cnt, 1);
}
}
int main() {
thread t1(func);
thread t2(func);
t1.join();
t2.join();
cout << cnt << endl;
return 0;
}
#include
#include
#include
using namespace std;
atomic cnt;
void func() {
for(int i = 0; i < 10000; ++i) {
cnt++;
}
}
int main() {
thread t1(func);
thread t2(func);
t1.join();
t2.join();
cout << cnt << endl;
return 0;
}
atomic拓展/async拓展
// C++11提供了6种内存模型
enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
}
和区别
并不一定创建新线程
不容易拿到线程返回值
recursive_mutex/timed_mutex
:递归的独占互斥锁
:带超时功能的互斥锁
补充
虚假唤醒:在lambda表达式中正确处理
以原子方式读取对象的值;以原子方式写入对象的值
POSIX线程
线程概念
LWP(Light Weight Process)轻量级进程
进程:拥有PCB,独立地址空间
线程:拥有PCB,共享地址空间
进程与线程的区别:是否共享地址空间
线程:最小调度单位
进程:最小分配资源单位
线程实现原理
从内核里看进程和线程是一样的,都有各自不同的PCB,但是PCB中指向内存资源的三级页表是相同的
# 查看指定线程的LWP号,跟线程id不一样
ps -Lf PID
线程共享/非共享资源
共享资源
文件描述符表
信号处理方式
当前工作目录
用户ID和组ID
内存地址空间(不共享栈)
非共享资源
线程id
处理器现场和内核栈指针
独立的用户空间栈
errno变量
信号屏蔽字
调度优先级
线程优缺点
提高程序并发性
开销小
数据通信,共享数据方便
调试,编写困难
对信号支持不友好
API
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
// 参数1:传出参数,保存分配好的线程ID
// 参数2:通常传NULL,表示使用线程默认属性
// 参数3:函数指针
// 参数4:线程主函数执行期间所使用的参数
// 返回值:成功返回0,失败返回错误号
void pthread_exit(void *retval);
// retval表示线程退出状态,通常传NULL
pthread_t pthread_self(void);
// 获取线程ID:pthread_t类型,在linux下为无符号整数(%lu)
// 线程ID是进程内部识别标志(和LWP号不一样),两个进程间线程ID允许相同
int pthread_join(pthread_t thread, void **retval);
// 线程回收,retval接受线程退出状态
int pthread_detach(pthread_t thread);
// 线程分离,线程与主控线程断开关系
// 分离的线程不能用pthread_join()回收
int pthread_cancel(pthread_t thread);
// 取消线程,线程的取消不是实时的,而有一定的延时,需要等待线程到达某个取消点
// 取消点:检查线程是否被取消,通常是一些系统调用,例如open、read、write等
// 执行man 7 pthreads可以查看具备这些取消点的系统调用列表
线程属性设置分离
typedef struct {
int etachstate; // 线程的分离状态
int schedpolicy; // 线程调度策略
struct sched_param schedparam; // 线程的调度参数
int inheritsched; // 线程的继承性
int scope; // 线程的作用域
size_t guardsize; // 线程栈末尾的警戒缓冲区大小
int stackaddr_set; // 线程的栈设置
void* stackaddr; // 线程栈的位置
size_t stacksize; // 线程栈的大小
} pthread_attr_t;
// ulimit -a查看用户栈的大小
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
// PTHREAD_CREATE_DETACHED,分离状态
// PTHREAD_CREATE_JOINABLE,非分离状态
// 如果设置一个线程为分离线程,而这个线程运行又非常快,它可能在pthread_create函数返回之前就终止了
// 它终止以后就可能将线程号和系统资源移交给其他线程使用,这样调用pthread_create的线程就得到了错误的线程号
int pthread_attr_setstack(pthread_attr_t *attr, void *stackaddr, size_t stacksize);
int pthread_attr_getstack(const pthread_attr_t *attr, void **stackaddr, size_t *stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstacksize(const pthread_attr_t attr, size_t stacksize);
NPTL
从Linux2.6开始,glibc采用了新的线程库NPTL(Native POSIX Thread Library)
线程库由两部分组成:内核的线程支持和用户的线程支持(glibc)
Linux早期内核不支持线程时,glibc就在库中(用户态)以纤程(用户态线程)的方式支持多线程了
POSIX thread只要求了用户编程的调用接口,对内核接口没有要求。Linux下线程的实现就是在内核支持的基础上以POSIX thread的方式对外封装了接口,所以才会有两个ID
查看当前线程库的版本
getconf GNU_LIBPTHREAD_VERSION
线程使用注意事项
主线程退出其他线程不退出,主线程退出应调用
避免僵尸线程
或者在里指定分离属性
避免在多线程引入信号机制
线程同步
协同步调,按预定的先后次序运行
多个控制流共同操作一个共享资源的情况,都需要同步
数据混乱原因
资源共享(独享资源则不会)
调度随机(意味着数据访问会出现竞争)
线程间缺乏必要的同步机制
互斥量
pthread_mutex_t mutex;
// 变量mutex只有两种取值1、0,mutex锁init成功时为1。lock将mutex--,unlock将mutex++
int pthread_mutex_init(pthread_mutex_t * restrict mutex, const pthread_mutexattr_t * restrict attr);
// 参数1:传出参数,调用时应传&mutex
// 参数2:传入参数,互斥量属性,通常传NULL
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
// lock尝试加锁,如果加锁不成功,线程阻塞,阻塞到持有该互斥量的其他线程解锁为止
int pthread_mutex_trylock(pthread_mutex_t *mutex);
// trylock加锁失败直接返回错误号,不阻塞
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// unlock主动解锁,同时将阻塞在该锁上的所有线程全部唤醒,不确定哪个线程先被唤醒
在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好
静态初始化:如果mutex定义在全局或者是静态分配的,直接使用宏进行初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
动态初始化:局部变量使用进行初始化
死锁产生
线程试图对同一个互斥量加锁两次
线程1拥有A锁,请求获得B锁;线程2拥有B锁,请求获得A锁(trylock解决)
读写锁
写独占,读共享,写锁优先级高
写模式加锁时,解锁前,所有对该锁加锁的线程都会被阻塞
读模式加锁时,如果线程以读模式对其加锁会成功,如果线程以写模式加锁会阻塞
读模式加锁时,既有尝试以写模式加锁的线程,也有尝试以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求,优先满足写模式锁
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
条件变量
通常和互斥锁配合使用
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
// 释放已经掌握的互斥锁,相当于pthread_mutex_unlock(&mutex)
// 阻塞等待条件变量cond
// 前两步是一个原子操作
// 当被唤醒返回时,解除阻塞并重新申请获取互斥锁pthread_mutex_lock(&mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
// abstime表示绝对时间
互斥量+条件变量实现生产者消费者模型
信号量
进化版的互斥锁
sem_t sem;
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared取0用于线程间,取1用于进程间
// 信号量的初值决定了占用信号量的线程的个数
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
// 信号量大于0则减1,等于0则造成进程阻塞
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
// 信号量加1,同时唤醒阻塞在信号量上的线程
信号量实现生产者消费者模型
进程间同步
进程间也能使用互斥量来达到同步的目的,在使用初始化时修改其属性为进程间共享
pthread_mutexattr_t mattr;
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
// PTHREAD_PROCESS_PRIVATE 线程锁
// PTHREAD_PROCESS_SHARED 进程锁
文件锁
借助fcntl实现文件锁
哲学家就餐问题
__thread(c++11 thread_local)
__thread是GCC内置的线程局部存储设施(thread local storage),它的实现非常高效
__thread变量是每个线程都有一份独立实体,各个线程的变量值互不干扰
除了这个主要用途,它还可以修饰那些值可能会变,带有全局性,但是又不值得用全局锁保护的变量
只能用于修饰POD类型(Plain old data structure),不能修饰class类型
__thread可以用于修饰全局变量、函数内的静态变量,但是不能用于修饰函数的局部变量或者class的普通成员变量
__thread变量的初始化只能用编译期常量