博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
C实现线程池
阅读量:7016 次
发布时间:2019-06-28

本文共 13806 字,大约阅读时间需要 46 分钟。

第一部分为头文件

1 #ifndef __THREADPOOL_H_ 2 #define __THREADPOOL_H_ 3  4 typedef struct threadpool_t threadpool_t; 5  6 /** 7  * @function threadpool_create 8  * @descCreates a threadpool_t object. 9  * @param thr_num  thread num10  * @param max_thr_num  max thread size11  * @param queue_max_size   size of the queue.12  * @return a newly created thread pool or NULL13  */14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);15 16 /**17  * @function threadpool_add18  * @desc add a new task in the queue of a thread pool19  * @param pool     Thread pool to which add the task.20  * @param function Pointer to the function that will perform the task.21  * @param argument Argument to be passed to the function.22  * @return 0 if all goes well,else -123  */24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);25 26 /**27  * @function threadpool_destroy28  * @desc Stops and destroys a thread pool.29  * @param pool  Thread pool to destroy.30  * @return 0 if destory success else -131  */32 int threadpool_destroy(threadpool_t *pool);33 34 /**35  * @desc get the thread num36  * @pool pool threadpool37  * @return # of the thread38  */39 int threadpool_all_threadnum(threadpool_t *pool);40 41 /**42  * desc get the busy thread num43  * @param pool threadpool44  * return # of the busy thread45  */46 int threadpool_busy_threadnum(threadpool_t *pool);47 48 #endif

第二部分为自实现线程池代码(对libevent库进行一些精简,凸显逻辑)

1 #include 
2 #include
3 #include
4 #include
5 #include
6 #include
7 #include
8 #include
9 #include "threadpool.h" 10 11 #define DEFAULT_TIME 10 /*10s检测一次*/ 12 #define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 13 #define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/ 14 #define true 1 15 #define false 0 16 17 typedef struct { 18 void *(*function)(void *); /* 函数指针,回调函数 */ 19 void *arg; /* 上面函数的参数 */ 20 } threadpool_task_t; /* 各子线程任务结构体 */ 21 22 /* 描述线程池相关信息 */ 23 struct threadpool_t { 24 pthread_mutex_t lock; /* 用于锁住本结构体 */ 25 pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */ 26 27 pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */ 28 pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */ 29 30 pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */ 31 pthread_t adjust_tid; /* 存管理线程tid */ 32 threadpool_task_t *task_queue; /* 任务队列(数组首地址) */ 33 34 int min_thr_num; /* 线程池最小线程数 */ 35 int max_thr_num; /* 线程池最大线程数 */ 36 int live_thr_num; /* 当前存活线程个数 */ 37 int busy_thr_num; /* 忙状态线程个数 */ 38 int wait_exit_thr_num; /* 要销毁的线程个数 */ 39 40 int queue_front; /* task_queue队头下标 */ 41 int queue_rear; /* task_queue队尾下标 */ 42 int queue_size; /* task_queue队中实际任务数 */ 43 int queue_max_size; /* task_queue队列可容纳任务数上限 */ 44 45 int shutdown; /* 标志位,线程池使用状态,true或false */ 46 }; 47 48 void *threadpool_thread(void *threadpool); 49 50 void *adjust_thread(void *threadpool); 51 52 int is_thread_alive(pthread_t tid); 53 int threadpool_free(threadpool_t *pool); 54 55 //threadpool_create(3,100,100); 56 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size) 57 { 58 int i; 59 threadpool_t *pool = NULL; 60 do { 61 if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { 62 printf("malloc threadpool fail"); 63 break; /*跳出do while*/ 64 } 65 66 pool->min_thr_num = min_thr_num; 67 pool->max_thr_num = max_thr_num; 68 pool->busy_thr_num = 0; 69 pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */ 70 pool->wait_exit_thr_num = 0; 71 pool->queue_size = 0; /* 有0个产品 */ 72 pool->queue_max_size = queue_max_size; 73 pool->queue_front = 0; 74 pool->queue_rear = 0; 75 pool->shutdown = false; /* 不关闭线程池 */ 76 77 /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */ 78 pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 79 if (pool->threads == NULL) { 80 printf("malloc threads fail"); 81 break; 82 } 83 memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num); 84 85 /* 队列开辟空间 */ 86 pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size); 87 if (pool->task_queue == NULL) { 88 printf("malloc task_queue fail"); 89 break; 90 } 91 92 /* 初始化互斥琐、条件变量 */ 93 if (pthread_mutex_init(&(pool->lock), NULL) != 0 94 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 95 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 96 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0) 97 { 98 printf("init the lock or cond fail"); 99 break;100 }101 102 /* 启动 min_thr_num 个 work thread */103 for (i = 0; i < min_thr_num; i++) {104 pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/105 printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);106 }107 pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */108 109 return pool;110 111 } while (0);112 113 threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */114 115 return NULL;116 }117 118 /* 向线程池中 添加一个任务 */119 //threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/120 121 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)122 {123 pthread_mutex_lock(&(pool->lock));124 125 /* ==为真,队列已经满, 调wait阻塞 */126 while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {127 pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));128 }129 if (pool->shutdown) {130 pthread_cond_broadcast(&(pool->queue_not_empty));131 pthread_mutex_unlock(&(pool->lock));132 return 0;133 }134 135 /* 清空 工作线程 调用的回调函数 的参数arg */136 if (pool->task_queue[pool->queue_rear].arg != NULL) {137 pool->task_queue[pool->queue_rear].arg = NULL;138 }139 /*添加任务到任务队列里*/140 pool->task_queue[pool->queue_rear].function = function;141 pool->task_queue[pool->queue_rear].arg = arg;142 pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动, 模拟环形 */143 pool->queue_size++;144 145 /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/146 pthread_cond_signal(&(pool->queue_not_empty));147 pthread_mutex_unlock(&(pool->lock));148 149 return 0;150 }151 152 /* 线程池中各个工作线程 */153 void *threadpool_thread(void *threadpool)154 {155 threadpool_t *pool = (threadpool_t *)threadpool;156 threadpool_task_t task;157 158 while (true) {159 /* Lock must be taken to wait on conditional variable */160 /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/161 pthread_mutex_lock(&(pool->lock));162 163 /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/164 while ((pool->queue_size == 0) && (!pool->shutdown)) { 165 printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());166 pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));167 168 /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/169 if (pool->wait_exit_thr_num > 0) {170 pool->wait_exit_thr_num--;171 172 /*如果线程池里线程个数大于最小值时可以结束当前线程*/173 if (pool->live_thr_num > pool->min_thr_num) {174 printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());175 pool->live_thr_num--;176 pthread_mutex_unlock(&(pool->lock));177 178 pthread_exit(NULL);179 }180 }181 }182 183 /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/184 if (pool->shutdown) {185 pthread_mutex_unlock(&(pool->lock));186 printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());187 pthread_detach(pthread_self());188 pthread_exit(NULL); /* 线程自行结束 */189 }190 191 /*从任务队列里获取任务, 是一个出队操作*/192 task.function = pool->task_queue[pool->queue_front].function;193 task.arg = pool->task_queue[pool->queue_front].arg;194 195 pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */196 pool->queue_size--;197 198 /*通知可以有新的任务添加进来*/199 pthread_cond_broadcast(&(pool->queue_not_full));200 201 /*任务取出后,立即将 线程池琐 释放*/202 pthread_mutex_unlock(&(pool->lock));203 204 /*执行任务*/ 205 printf("thread 0x%x start working\n", (unsigned int)pthread_self());206 pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/207 pool->busy_thr_num++; /*忙状态线程数+1*/208 pthread_mutex_unlock(&(pool->thread_counter));209 210 (*(task.function))(task.arg); /*执行回调函数任务*/211 //task.function(task.arg); /*执行回调函数任务*/212 213 /*任务结束处理*/ 214 printf("thread 0x%x end working\n", (unsigned int)pthread_self());215 pthread_mutex_lock(&(pool->thread_counter));216 pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/217 pthread_mutex_unlock(&(pool->thread_counter));218 }219 220 pthread_exit(NULL);221 }222 223 /* 管理线程 */224 void *adjust_thread(void *threadpool)225 {226 int i;227 threadpool_t *pool = (threadpool_t *)threadpool;228 while (!pool->shutdown) {229 230 sleep(DEFAULT_TIME); /*定时 对线程池管理*/231 232 pthread_mutex_lock(&(pool->lock));233 int queue_size = pool->queue_size; /* 关注 任务数 */234 int live_thr_num = pool->live_thr_num; /* 存活 线程数 */235 pthread_mutex_unlock(&(pool->lock));236 237 pthread_mutex_lock(&(pool->thread_counter));238 int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */239 pthread_mutex_unlock(&(pool->thread_counter));240 241 /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/242 if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {243 pthread_mutex_lock(&(pool->lock)); 244 int add = 0;245 246 /*一次增加 DEFAULT_THREAD 个线程*/247 for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY248 && pool->live_thr_num < pool->max_thr_num; i++) {249 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {250 pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);251 add++;252 pool->live_thr_num++;253 }254 }255 256 pthread_mutex_unlock(&(pool->lock));257 }258 259 /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/260 if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {261 262 /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */263 pthread_mutex_lock(&(pool->lock));264 pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数 设置为10 */265 pthread_mutex_unlock(&(pool->lock));266 267 for (i = 0; i < DEFAULT_THREAD_VARY; i++) {268 /* 通知处在空闲状态的线程, 他们会自行终止*/269 pthread_cond_signal(&(pool->queue_not_empty));270 }271 }272 }273 274 return NULL;275 }276 277 int threadpool_destroy(threadpool_t *pool)278 {279 int i;280 if (pool == NULL) {281 return -1;282 }283 pool->shutdown = true;284 285 /*先销毁管理线程*/286 pthread_join(pool->adjust_tid, NULL);287 288 for (i = 0; i < pool->live_thr_num; i++) {289 /*通知所有的空闲线程*/290 pthread_cond_broadcast(&(pool->queue_not_empty));291 }292 for (i = 0; i < pool->live_thr_num; i++) {293 pthread_join(pool->threads[i], NULL);294 }295 threadpool_free(pool);296 297 return 0;298 }299 300 int threadpool_free(threadpool_t *pool)301 {302 if (pool == NULL) {303 return -1;304 }305 306 if (pool->task_queue) {307 free(pool->task_queue);308 }309 if (pool->threads) {310 free(pool->threads);311 pthread_mutex_lock(&(pool->lock));312 pthread_mutex_destroy(&(pool->lock));313 pthread_mutex_lock(&(pool->thread_counter));314 pthread_mutex_destroy(&(pool->thread_counter));315 pthread_cond_destroy(&(pool->queue_not_empty));316 pthread_cond_destroy(&(pool->queue_not_full));317 }318 free(pool);319 pool = NULL;320 321 return 0;322 }323 324 int threadpool_all_threadnum(threadpool_t *pool)325 {326 int all_threadnum = -1;327 pthread_mutex_lock(&(pool->lock));328 all_threadnum = pool->live_thr_num;329 pthread_mutex_unlock(&(pool->lock));330 return all_threadnum;331 }332 333 int threadpool_busy_threadnum(threadpool_t *pool)334 {335 int busy_threadnum = -1;336 pthread_mutex_lock(&(pool->thread_counter));337 busy_threadnum = pool->busy_thr_num;338 pthread_mutex_unlock(&(pool->thread_counter));339 return busy_threadnum;340 }341 342 int is_thread_alive(pthread_t tid)343 {344 int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活345 if (kill_rc == ESRCH) {346 return false;347 }348 349 return true;350 }351 352 /*测试*/ 353 354 #if 1355 /* 线程池中的线程,模拟处理业务 */356 void *process(void *arg)357 {358 printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);359 sleep(1); //小---大写360 printf("task %d is end\n",(int)arg);361 362 return NULL;363 }364 365 int main(void)366 {367 /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/368 369 threadpool_t *thp = threadpool_create(3,100,100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/370 printf("pool inited");371 372 //int *num = (int *)malloc(sizeof(int)*20);373 int num[20], i;374 for (i = 0; i < 20; i++) {375 num[i]=i;376 printf("add task %d\n",i);377 threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */378 }379 sleep(10); /* 等子线程完成任务 */380 threadpool_destroy(thp);381 382 return 0;383 }384 385 #endif

=======================

线程池的相关信息:

typedef struct {

void *(*function)(void *); /* 函数指针,回调函数 */
void *arg; /* 上面函数的参数 */
} threadpool_task_t; /* 各子线程任务结构体 */

/* 描述线程池相关信息 */
struct threadpool_t {
pthread_mutex_t lock; /* 用于锁住本结构体 */
pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */

pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */

pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */

pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */

pthread_t adjust_tid; /* 存管理线程tid */
threadpool_task_t *task_queue; /* 任务队列(数组首地址) */

int min_thr_num; /* 线程池最小线程数 */

int max_thr_num; /* 线程池最大线程数 */
int live_thr_num; /* 当前存活线程个数 */
int busy_thr_num; /* 忙状态线程个数 */
int wait_exit_thr_num; /* 要销毁的线程个数 */

int queue_front; /* task_queue队头下标 */

int queue_rear; /* task_queue队尾下标 */
int queue_size; /* task_queue队中实际任务数 */
int queue_max_size; /* task_queue队列可容纳任务数上限 */

int shutdown; /* 标志位,线程池使用状态,true或false */

};

============================================

查看这段代码的步骤及代码的相关逻辑步骤

1. 大结构体, threadpool_task_t;结构体

2. main

threadpool_create 创建线程池。

for产出任务

threadpool_add 添加任务。

销毁线程池。

3. threadpool_create()

4. threadpool_thread()

跟踪到pthread_cond_wait();阻塞

5. threadpool_add()

跟踪到pthread_cond_signal(); 会到4步中 pthread_cond_wait()继续向后。

6. adjust_thread()

添加10个线程

移除10个线程 ---pthread_exit();

7. threadpool_destroy()

销毁线程池。 ---pthread_exit();

 

转载于:https://www.cnblogs.com/yyx1-1/p/5907561.html

你可能感兴趣的文章
步步为营 .NET三层架构解析 七、UI的设计(登陆页面、注册页页和添加部门页面)...
查看>>
八种方式实现跨域请求
查看>>
中缀表达式转后缀表达式
查看>>
爬虫第三章 模拟登录
查看>>
POI不同浏览器导出名称处理
查看>>
Mac 终端命令连接mysql
查看>>
ASP.NET MVC 学习1、新增Controller,了解MVC运行机制
查看>>
Tesseract-OCR 字体库下载地址
查看>>
sz一般是结尾带有'\0'的字符串。 string zero
查看>>
如何从Apache官网下载windows版apache服务器
查看>>
node-sqlite3的事务执行方法
查看>>
NPOI之Excel——合并单元格、设置样式、输入公式
查看>>
选取不在另一张表中记录的方法及优化
查看>>
linux 学习笔记之文件与管理
查看>>
body元素对象的clientWidth、offsetWidth、scrollWidth、clientLeft、offsetLeft、scrollLeft
查看>>
监控系统Opserver的配置调试
查看>>
[转]用Excel制作甘特图并管理项目
查看>>
7、Android---网络技术
查看>>
LeetCode: Validata Binary Search Tree
查看>>
在windows系统下安装ubuntu系统
查看>>