Linux多线程

Linux多线程

线程概述

与进程( process )类似,线程 thread )是允许应用程序并发执行多个任务的一种机制。一个进程可以包含多个线程。同一个程序中的所有线程均会独立执行相同程序,且共享同一份全局内存区域,其中包括初始化数据段、未初始化数据段,以及堆内存段。(传统意义上的 UNIX 进程只是多线程程序的一个特例,该进程只包含一个线程)

进程是 CPU 分配资源的最小单位,线程是操作系统调度执行的最小单位

1. 进程与线程的区别

  • 进程间的信息难以共享。由于除去只读代码段外,父子进程并未共享内存,因此必须采用一些进程间通信方式,在进程间进行信息交换。
  • 调用 fork() 来创建进程的代价相对较高,即便利用写时复制技术,仍然需要复制诸如内存页表和文件描述符表之类的多种进程属性,这意味着 fork() 调用在时间上的开销依然不菲。
  • 线程之间能够方便、快速地共享信息。只需将数据复制到共享(全局或堆)变量中即可。
  • 创建线程比创建进程通常要快 10 倍甚至更多。线程间是共享虚拟地址空间的,无需采用写时复制来复制内存,也无需复制页表。

2. 线程之间的共享和非共享资源

共享资源:进程ID和父进程ID、进程组ID和会话ID、用户ID和用户组ID、文件描述符表、信号处置、文件系统的相关信息、虚拟地址空间

非共享资源:线程ID、信号掩码、线程特有数据,error变量、实时调度策略和优先级…

线程创建

1. 函数头文件&声明

1
2
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

一般情况下,main函数所在的线程我们称之为主线程,其余创建的线程默认为子线程

2. 参数说明

  • thread:传出参数,线程创建成功后,子线程的ID被写到该变量
  • attr:设置线程的属性,一般使用默认值,NULL
  • start_routine: 函数指针,该函数是子线程需要处理的逻辑代码。

  • args:给第三个参数使用,传参

  • 返回值

    成功:返回0,失败

3. 程序实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void *callback(void *arg){
printf("child thread...\n");
printf("arg val: %d\n", *(int *)arg);
return NULL;
}
int main(){
pthread_t tid;
int num = 10;
int ret = pthread_create(&tid, NULL, callback, (void *)&num);

if(ret){
perror("thread create");
exit(0);
}
for(int i = 0; i < 10; i++){
printf("%d\n", i);
}
sleep(2);
return 0;
}

线程终止

1. 函数头文件&声明

1
2
#include <pthread.h>
void pthread_exit(void *retval);

在哪个线程中调用就终止哪个线程。

当主线程退出时,不会影响其他正常运行的线程。

2. 参数说明

  • retval: 传入一个指针作为返回值

3. 程序实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void *callback(void *arg){
printf("children thread_id %ld\n", pthread_self());
return NULL;
}

int main(){
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if(ret){
perror("thread create");
exit(0);
}
for(int i = 0; i < 10; i++){
printf("%d\n", i);
}
printf("main: %ld %ld\n", tid, pthread_self());
pthread_exit(NULL);
return 0;
}
/*
0
1
2
3
4
5
6
7
8
9
main: 140569674712832 140569674778432
children thread_id 140569674712832
*/

连接已终止的线程

任何的线程都可以去回收其他线程,一般是线程回收子线程,连接的作用是回收子线程的资源。

1. 函数头文件&声明

1
2
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);

2. 参数说明

  • thread:需要回收的子线程的ID
  • retval:接受子线程退出时的返回值

3.代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int val = 0;
void *callback(void *arg){
printf("children thread_id %ld and arg %d\n", pthread_self(), *(int *)arg);
val = 2;
pthread_exit((void *)&val);
}
int main(){
pthread_t tid;
int num = 10;
int ret = pthread_create(&tid, NULL, callback, (void *)&num);
int *child_ret;
pthread_join(tid, (void **)&child_ret);//main线程阻塞
printf("the return value of child %d\n", *child_ret);
if(ret){
perror("thread create");
exit(0);
}
for(int i = 0; i < 10; i++){
printf("%d\n", i);
}
printf("main: %ld %ld\n", tid, pthread_self());
pthread_exit(NULL);
return 0;
}

线程的分离

子线程被分离之后,子线程结束的对应的资源就不需要由主线程释放

1. 函数头文件 & 声明

1
2
#include <pthread.h>
int pthread_detach(pthread_t thread);

2. 参数说明

  • thread: 设置要被分离的线程号

3. 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void *callback(void *arg){
printf("children thread_id %ld\n", pthread_self());
return NULL;
}

int main(){
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if(ret != 0){
char *er = strerror(ret);
printf("%s\n", er);
exit(0);
}
printf("child %ld and self %ld\n", tid, pthread_self());
ret = pthread_detach(tid);
if(ret != 0){
char *er = strerror(ret);
printf("%s\n", er);
exit(0);
}
//分离后连接,会报错
ret = pthread_join(tid, NULL);
if(ret != 0){
char *er = strerror(ret);
printf("%s\n", er);
exit(0);
}
//unreachable
printf("hello\n");
return 0;
}

线程取消

1. 函数头文件 & 声明

1
2
#include <pthread.h>
int pthread_cancel(pthread_t thread);

终止某个线程,但不是立马终止,而是当子线程执行到一个取消点(系统规定好的一些系统调用,可以粗略的认为是从用户区到内核区的切换位置)

2. 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
void *callback(void *arg){
printf("children thread_id %ld\n", pthread_self());
for(int i = 0; i < 100000; i++){
printf("child %d\n", i);
}
return NULL;
}

int main(){
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if(ret != 0){
char *er = strerror(ret);
printf("%s\n", er);
exit(0);
}
for(int i = 0; i < 5; i++){
printf("parent %d\n", i);
}
pthread_cancel(tid);//取消子线程
printf("child %ld and self %ld\n", tid, pthread_self());
return 0;
}

线程同步(key)

  • 必须确保多个线程不能同时修改同一变量,或者某一线程不会读取正在由其他线程修改的变量。

  • 临界区是访问某一共享资源的代码片段,并且这段代码的执行应为原子操作,也就是同时访问统一共享资源的其他线程不应中断该片段的执行

  • 线程同步:当有一个线程对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作,其他线程才能对该内存地址进行操作,而其他线程则处于等待状态。

  • 案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
      #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    int tot = 100;
    void *sell(void *args){
    while(tot > 0){
    usleep(5000);
    printf("thread %ld is selling %d\n tickets", pthread_self(), tot);
    --tot;
    }
    }
    int main(){
    pthread_t tid[3];
    for(int i = 0; i < 3; i++){
    pthread_create(&tid[i], NULL, sell, NULL);
    }
    for(int i = 0; i < 3; i++){
    pthread_join(tid[i], NULL);
    }
    pthread_exit(NULL);
    return 0;
    }
    //ticketsthread 140416499320576 is selling 1
    //ticketsthread 140416482412288 is selling 1 (error)
    //ticketsthread 140416490866432 is selling -1 (error)

    假设线程为A, B, C,由于while内部不是原子操作,因此当tot = 1时, A, B, C可能都抢占到了CPU继续执行从而使得tot变成-1

    ## 互斥锁

    * 为了避免线程更新共享变量时出现问题,可以使用互斥量mutex来确保同时仅有一个线程可以访问某项共享资源,可以使用互斥量来保证对任意共享资源的原子访问。

    * 互斥量有两种状态:已锁定和未锁定,任何时候至多只有一个线程可以锁定该互斥量,试图对已经锁定的某一互斥量再次加锁,有可能阻塞线程

    * 一旦线程锁定互斥量,随即成为该互斥量的所有者,只有所有者才能给互斥量解锁,一般情况下对每一共享资源会使用不同的互斥量,每一线程在访问同一资源时将采用如下协议:

    1)针对共享资源锁定互斥量

    2)访问共享资源

    3)对互斥量解锁

    * API

    ```cpp
    int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr)
    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    int pthread_mutex_lock(pthread_mutex_t *mutex);//阻塞
    int pthread_mutex_trylock(pthread_mutex_t *mutex);//尝试加锁,非阻塞
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
  • 加锁后卖票

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    int tot = 10000;
    pthread_mutex_t lock;//锁变量
    void *sell(void *args){
    while(1){
    pthread_mutex_lock(&lock);//加锁
    if(tot > 0){
    printf("thread %ld is selling %d tickets\n", pthread_self(), tot);
    --tot;
    }else{
    pthread_mutex_unlock(&lock);//也要解锁
    break;
    }
    pthread_mutex_unlock(&lock);//解锁
    }
    }
    int main(){
    pthread_mutex_init(&lock, NULL);//初始化锁
    pthread_t tid[3];
    for(int i = 0; i < 3; i++){
    pthread_create(&tid[i], NULL, sell, NULL);
    }
    for(int i = 0; i < 3; i++){
    pthread_join(tid[i], NULL);
    }
    pthread_exit(NULL);
    return 0;
    }

死锁

  • 两个或两个以上的进程在执行的过程中,因为争夺共享资源而造成的一种互相等待的现象

  • 几种死锁场景

    (1)忘记释放锁

    (2)重复加锁

    (3)多线程多锁,抢占锁资源

  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>

    pthread_mutex_t lockA, lockB;

    void *workA(void *args){
    pthread_mutex_lock(&lockA);//进程t[0]先抢占A锁,但是B锁被t[1]抢占
    sleep(1);
    pthread_mutex_lock(&lockB);
    printf("work A...\n");
    pthread_mutex_unlock(&lockB);
    pthread_mutex_unlock(&lockA);
    }

    void *workB(void *args){
    pthread_mutex_lock(&lockB);//进程t[1]先抢占B锁,但是A锁被t[0]抢占
    sleep(1);
    pthread_mutex_lock(&lockA);
    printf("work B...\n");
    pthread_mutex_unlock(&lockA);
    pthread_mutex_unlock(&lockB);
    }

    int main(){
    pthread_mutex_init(&lockA, NULL);
    pthread_mutex_init(&lockB, NULL);
    pthread_t t[2];
    pthread_create(&t[0], NULL, workA, NULL);
    pthread_create(&t[1], NULL, workB, NULL);

    pthread_join(t[0], NULL);
    pthread_join(t[1], NULL);

    pthread_exit(NULL);
    return 0;
    }

读写锁

  • 当前持有互斥锁的线程只是要读访问共享资源,而同时有其他几个线程也想读取这个共享资源,但是由于互斥锁的排他性,所有其他线程都无法获取锁,但是多个线程同时读访问共享资源并不会导致问题。

  • 在对数据的操作中,更多的是读操作,写操作较少,例如对于数据库数据的读写应用,为了满足当前能够允许多个读出但只允许一个写入的需求,线程提供了读写锁。

  • 读写锁的特点

    (1)如果有其他线程读数据,则允许其他线程读,但不允许写

    (2)如果有其他线程写数据,则其他线程都不允许读写

    (3)写是独占的,优先级最高

  • API

    1
    2
    3
    4
    5
    6
    7
    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//加读锁
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//加写锁
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
  • 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    //8个线程操作同一个全局变量,3个线程不定时写这个全局变量,5个线程不定时的读这个全局变量
    int num = 1;
    pthread_rwlock_t lock;
    void *writeNum(void *arg){
    while(1){
    pthread_rwlock_wrlock(&lock);
    num++;
    printf("++write, tid: %ld, num: %d\n", pthread_self(), num);
    pthread_rwlock_unlock(&lock);
    usleep(100);
    }
    return NULL;
    }
    void *readNum(void *arg){
    while(1){
    pthread_rwlock_rdlock(&lock);
    printf("==read, tid: %ld, num: %d\n", pthread_self(), num);
    pthread_rwlock_unlock(&lock);
    usleep(100);
    }
    return NULL;
    }
    int main(){
    pthread_rwlock_init(&lock, NULL);
    pthread_t wtids[3];
    pthread_t rtids[5];
    for(int i = 0; i < 3; i++){
    pthread_create(&wtids[i], NULL, writeNum, NULL);
    }
    for(int i = 0; i < 5; i++){
    pthread_create(&rtids[i], NULL, readNum, NULL);
    }
    //设置线程分离
    for(int i = 0; i < 3; i++){
    pthread_detach(wtids[i]);
    }
    for(int i = 0; i < 5; i++){
    pthread_detach(rtids[i]);
    }

    pthread_exit(NULL);
    pthread_rwlock_destroy(&lock);
    return 0;
    }

生产者消费者模型

  • 生产者

  • 消费者

  • 容器

  • 实例(会出错)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    #include <stdio.h>
    #include <pthread.h>
    #include <stdlib.h>
    #include <unistd.h>
    struct Node{
    int val;
    struct Node *next;
    };//链表容器
    struct Node *head = NULL;
    void *produce(void *arg){
    while(1){
    struct Node *temp = (struct Node *)malloc(sizeof(struct Node));
    temp->next = head;
    head = temp;
    temp->val = rand() % 100;
    printf("producer %ld insert %d\n", pthread_self(), temp->val);
    usleep(200);
    }
    return NULL;
    }
    void *consume(void *arg){
    while(1){
    struct Node *temp = head;
    head = head->next;
    printf("customer %ld comsumer %d\n", pthread_self(), temp->val);
    free(temp);
    }
    }

    int main(){
    pthread_t ptid[5];//五个生产者
    pthread_t ctid[5];//五个消费者

    for(int i = 0; i < 5; i++){
    pthread_create(&ptid[i], NULL, produce, NULL);
    pthread_create(&ctid[i], NULL, consume, NULL);
    pthread_join(ptid[i], NULL);
    pthread_join(ctid[i], NULL);
    }

    return 0;
    }

    1. 条件变量

    • API

      1
      2
      3
      4
      5
      6
      int pthread_cond_init (pthread_cond_t *restrict cond,constpthread_condattr_t *rerict attr);
      int pthread_cond_destroy(pthread_cond_t *cond);
      int pthread_cond_wait (pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
      int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrictabstime) ;
      int pthread_cond_signal(pthread_cond_t *cond) ;int pthread_cond_broadcast(pthread_cond_t *cond) ;
      int pthread_cond_broadcast(pthread_cond_t *cond) ;
    • 实例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      #include <stdio.h>
      #include <pthread.h>
      #include <stdlib.h>
      #include <unistd.h>

      pthread_mutex_t lock;
      pthread_cond_t cond;
      struct Node{
      int val;
      struct Node *next;
      };//链表容器
      struct Node *head = NULL;
      void *produce(void *arg){
      while(1){
      pthread_mutex_lock(&lock);
      struct Node *temp = (struct Node *)malloc(sizeof(struct Node));
      temp->next = head;
      head = temp;
      temp->val = rand() % 100;
      printf("producer %ld insert %d\n", pthread_self(), temp->val);
      //只要生产了,就通知消费者
      pthread_cond_signal(&cond);

      pthread_mutex_unlock(&lock);
      usleep(2000);
      }
      return NULL;
      }
      void *consume(void *arg){
      while(1){
      pthread_mutex_lock(&lock);
      struct Node *temp = head;
      if(head != NULL){
      head = head->next;
      printf("customer %ld comsumer %d\n", pthread_self(), temp->val);
      free(temp);
      pthread_mutex_unlock(&lock);
      usleep(2000);
      }else{
      //需要等待
      pthread_cond_wait(&cond, &lock);
      pthread_mutex_unlock(&lock);
      usleep(2000);
      }
      }
      }

      int main(){
      pthread_mutex_init(&lock, NULL);
      pthread_cond_init(&cond, NULL);
      pthread_t ptid[5];//五个生产者
      pthread_t ctid[5];//五个消费者

      for(int i = 0; i < 5; i++){
      pthread_create(&ptid[i], NULL, produce, NULL);
      pthread_create(&ctid[i], NULL, consume, NULL);
      pthread_join(ptid[i], NULL);
      pthread_join(ctid[i], NULL);
      }

      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&cond);
      return 0;
      }

2. 信号量

  • 信号量维护了一个值,表示有多少个线程可以访问该资源,需要配合互斥锁mutex来使用。
  • 多进程和多线程之间都可以使用
  • P, V操作

  • API

    1
    2
    3
    4
    5
    6
    7
    8
    信号量的类型 sem_t
    int sem_init(sem _t *sem, int pshared, unsigned int value);//pshared: 0用在线程,非0用在进程
    int sem_destroy (sem_t *sem);//释放资源
    int sem_wait (sem_t *sem);//对信号了加锁,对信号量的值-1,阻塞
    int sem_trywait (sem_t *sem);//对信号量的值-1, 非阻塞
    int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);//设置阻塞时间
    int sem_post(sem_t *sem);//对信号量解锁,信号量+1
    int sem_getvalue (sem_t *sem,int *sval);//获取信号量的值
  • 实例(生产者消费者模型)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    #include <stdio.h>
    #include <pthread.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <semaphore.h>

    pthread_mutex_t lock;
    sem_t psem;//生产者信号量
    sem_t csem;//消费者信号量
    struct Node{
    int val;
    struct Node *next;
    };//链表容器
    struct Node *head = NULL;
    void *produce(void *arg){
    while(1){
    sem_wait(&psem);
    pthread_mutex_lock(&lock);

    struct Node *temp = (struct Node *)malloc(sizeof(struct Node));
    temp->next = head;
    head = temp;
    temp->val = rand() % 100;
    printf("producer %ld insert %d\n", pthread_self(), temp->val);

    pthread_mutex_unlock(&lock);
    sem_post(&csem);
    usleep(2000);
    }
    return NULL;
    }
    void *consume(void *arg){
    while(1){
    sem_wait(&csem);
    pthread_mutex_lock(&lock);

    struct Node *temp = head;
    head = head->next;
    printf("customer %ld comsumer %d\n", pthread_self(), temp->val);
    free(temp);

    pthread_mutex_unlock(&lock);
    sem_post(&psem);
    usleep(2000);

    }
    }

    int main(){
    sem_init(&psem, 0, 8);
    sem_init(&csem, 0, 0);
    pthread_mutex_init(&lock, NULL);
    pthread_t ptid[5];//五个生产者
    pthread_t ctid[5];//五个消费者

    for(int i = 0; i < 5; i++){
    pthread_create(&ptid[i], NULL, produce, NULL);
    pthread_create(&ctid[i], NULL, consume, NULL);
    pthread_join(ptid[i], NULL);
    pthread_join(ctid[i], NULL);
    }

    pthread_mutex_destroy(&lock);
    return 0;
    }