-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread_pool.cpp
More file actions
149 lines (126 loc) · 3.91 KB
/
thread_pool.cpp
File metadata and controls
149 lines (126 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include "thread_pool.h"
/* The single pool instance used by every function in this file; starts out
 * NULL and is assigned by tpool_create(). */
static tpool_t *tpool = NULL;
/* 工作者线程函数, 从任务链表中取出任务并执行 */
static void* thread_routine(void *arg)
{
tpool_work_t *work;
while(1) {
pthread_mutex_lock(&tpool->queue_lock);
/* 如果线程池没有被销毁且没有任务要执行,则等待 */
while(!tpool->queue_head && !tpool->shutdown) {
pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);
}
if (tpool->shutdown) {
pthread_mutex_unlock(&tpool->queue_lock);
pthread_exit(NULL);
}
work = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
pthread_mutex_unlock(&tpool->queue_lock);
work->routine(work->arg);
free(work);
}
return NULL;
}
/*
 * Create the file-global thread pool.
 *
 * Allocates the pool descriptor, initializes the queue mutex and condition
 * variable, and starts max_thr_num worker threads running thread_routine().
 *
 * max_thr_num: number of worker threads to start.
 *
 * Returns 0 on success. Every failure path prints a diagnostic and
 * terminates the process with exit(1), so no error code is ever returned.
 */
int tpool_create(int max_thr_num)
{
    int i;
    int rc;

    tpool = (tpool_t*)calloc(1, sizeof(tpool_t));
    if (!tpool) {
        printf("%s: calloc failed\n", __FUNCTION__);
        exit(1);
    }

    /* Initialize pool state. */
    tpool->max_thr_num = max_thr_num;
    tpool->shutdown = 0;
    tpool->queue_head = NULL;

    /*
     * The pthread_* calls below report failure through their return value
     * and do not set errno, so the returned code is what gets printed.
     */
    rc = pthread_mutex_init(&tpool->queue_lock, NULL);
    if (rc != 0) {
        printf("%s: pthread_mutex_init failed, errno:%d, error:%s\n",
               __FUNCTION__, rc, strerror(rc));
        exit(1);
    }

    rc = pthread_cond_init(&tpool->queue_ready, NULL);
    if (rc != 0) {
        printf("%s: pthread_cond_init failed, errno:%d, error:%s\n",
               __FUNCTION__, rc, strerror(rc));
        exit(1);
    }

    /* Create the worker threads. */
    tpool->thr_id = (pthread_t*)calloc(max_thr_num, sizeof(pthread_t));
    if (!tpool->thr_id) {
        printf("%s: calloc failed\n", __FUNCTION__);
        exit(1);
    }

    for (i = 0; i < max_thr_num; ++i) {
        rc = pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL);
        if (rc != 0) {
            printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__,
                   rc, strerror(rc));
            exit(1);
        }
    }

    return 0;
}
/*
 * Destroy the file-global thread pool.
 *
 * Sets the shutdown flag, wakes every waiting worker, joins all worker
 * threads, then frees the thread-id array, any work items still queued
 * (they are discarded without being run), the mutex, the condition variable
 * and the pool itself.
 *
 * Calling this when no pool exists, or a second time after the pool has
 * been destroyed, is a no-op.
 */
void tpool_destroy()
{
    int i;
    tpool_work_t *member;

    /* No pool was created, or it has already been destroyed. */
    if (!tpool) {
        return;
    }

    /*
     * Workers read the shutdown flag while holding queue_lock, so it is
     * written under the same lock; the broadcast then wakes all waiters.
     */
    pthread_mutex_lock(&tpool->queue_lock);
    if (tpool->shutdown) {
        pthread_mutex_unlock(&tpool->queue_lock);
        return;
    }
    tpool->shutdown = 1;
    pthread_cond_broadcast(&tpool->queue_ready);
    pthread_mutex_unlock(&tpool->queue_lock);

    for (i = 0; i < tpool->max_thr_num; ++i) {
        pthread_join(tpool->thr_id[i], NULL);
    }
    free(tpool->thr_id);

    /* All workers have been joined; release work items that never ran. */
    while (tpool->queue_head) {
        member = tpool->queue_head;
        tpool->queue_head = tpool->queue_head->next;
        free(member);
    }

    pthread_mutex_destroy(&tpool->queue_lock);
    pthread_cond_destroy(&tpool->queue_ready);

    free(tpool);
    /* Clear the global so later calls see "no pool" instead of freed memory. */
    tpool = NULL;
}
/* 向线程池添加任务 */
int tpool_add_work(void*(*routine)(void*), void *arg)
{
tpool_work_t *work, *member;
if (!routine){
printf("%s:Invalid argument\n", __FUNCTION__);
return -1;
}
work = (tpool_work_t*)malloc(sizeof(tpool_work_t));
if (!work) {
printf("%s:malloc failed\n", __FUNCTION__);
return -1;
}
work->routine = routine;
work->arg = arg;
work->next = NULL;
pthread_mutex_lock(&tpool->queue_lock);
member = tpool->queue_head;
if (!member) { //插入第一个节点
tpool->queue_head = work;
} else {
while(member->next) {
member = member->next;
}
member->next = work; //尾插
}
/* 通知工作者线程,有新任务添加 */
pthread_cond_signal(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);
return 0;
}