C++线程池实现
- 格式:doc
- 大小:95.00 KB
- 文档页数:17
c 多线程实现的四种方式C语言是一种非常流行的编程语言,它可以用来实现多线程编程。
多线程编程可以让你的程序更高效、更快速地运行,因为它可以同时执行多个任务。
在这篇文章中,我们将介绍 C 多线程实现的四种方式。
1. 使用 pthread 库pthread 是一个 POSIX 标准定义的多线程库,它提供了一套API 接口,可以用来实现多线程编程。
使用 pthread,你可以创建多个线程并且控制它们的行为。
这种方式是 C 语言实现多线程的最常用方式之一。
2. 使用 OpenMP 库OpenMP 是一个开源的多线程库,它可以用来在 C 语言中实现多线程编程。
OpenMP 提供了一套 API 接口,可以让你更方便地编写并行程序。
使用 OpenMP,你可以使用 #pragma 指令来控制并行执行的代码块。
3. 使用 POSIX 线程POSIX 线程是一种 POSIX 标准定义的多线程接口,它可以用来实现多线程编程。
与 pthread 类似,POSIX 线程提供了一套 API 接口,可以让你更方便地编写多线程程序。
4. 使用 Windows 线程如果你在 Windows 操作系统上编写 C 语言程序,你可以使用Windows 线程来实现多线程编程。
Windows 线程提供了一套 API 接口,可以让你在 Windows 平台上创建多个线程并且控制它们的行为。
总结以上是 C 多线程实现的四种方式。
在选择使用哪种方式时,你应该考虑自己的需求和使用的操作系统。
不同的方式会有不同的 API 接口、性能和可移植性。
如果你需要了解更多关于 C 多线程编程的知识,可以参考相关的书籍和教程。
在C++中,线程池(Thread Pool)是一个优化多线程处理任务的常用模式。
通过线程池,你可以管理一组线程,并将任务分配给这些线程执行,从而避免频繁地创建和销毁线程,从而提高程序的性能。
下面是一个简单的C++线程池实现示例:cpp复制代码#include<iostream>#include<vector>#include<queue>#include<thread>#include<mutex>#include<condition_variable>#include<functional>#include<future>class ThreadPool {private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;public:ThreadPool(size_t threads): stop(false) {for (size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {for (;;) {std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty()) {return;}task = std::move(this->tasks.front());this->tasks.pop();}task();}});}template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);if (stop) {throw std::runtime_error("enqueue on stopped ThreadPool");}tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;}~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers) {worker.join();}}};// 示例任务void task(int n) {std::cout << "Task " << n << " is running on thread " << std::this_thread::get_id() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "Task "<< n << " has finished on thread "<< std::this_thread::get_id() << std::endl;}int main() {const size_t num_threads = 5;ThreadPool pool(num_threads);std::vector<std::future<void>> futures;for (int i = 0; i < 10; ++i) {futures.emplace_back(pool.enqueue(task, i));}for (auto &future : futures) {future.get();}return0;}这个示例中,ThreadPool类管理了一个线程池,你可以通过调用enqueue方法来向线程池中添加任务。
C语言实现线程池https:///p/6afdffe94d96什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。
如果线程创建T1和销毁时间T3相比任务执行时间T2可以忽略不计,则没有必要使用线程池了。
反之如果T1+T3>T2,那就很有必要使用线程池。
•下面是Linux系统下用C语言创建的一个线程池。
线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。
•pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。
该函数中while (pool->cur_queue_size == 0){pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));}表示如果任务链表中没有任务,则该线程处于阻塞等待状态。
否则从队列中取出任务并执行。
pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(如果有的话)。
pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。
下面贴出完整代码#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <sys/types.h>#include <pthread.h>#include <assert.h>/**线程池里所有运行和等待的任务都是一个CThread_worker*由于所有任务都在链表里,所以是一个链表结构*/typedef struct worker{/*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/void *(*process) (void *arg);void *arg;/*回调函数的参数*/struct worker *next;} CThread_worker;/*线程池结构*/typedef struct{pthread_mutex_t queue_lock;pthread_cond_t queue_ready;/*链表结构,线程池中所有等待任务*/CThread_worker *queue_head;/*是否销毁线程池*/int shutdown;pthread_t *threadid;/*线程池中允许的活动线程数目*/int max_thread_num;/*当前等待队列的任务数目*/int cur_queue_size;} CThread_pool;int pool_add_worker (void *(*process) (void *arg), void *arg); void *thread_routine (void *arg);static CThread_pool *pool = NULL;void pool_init (int max_thread_num){pool = (CThread_pool *) malloc (sizeof (CThread_pool)); pthread_mutex_init (&(pool->queue_lock), NULL); pthread_cond_init (&(pool->queue_ready), NULL);pool->queue_head = NULL;pool->max_thread_num = max_thread_num;pool->cur_queue_size = 0;pool->shutdown = 0;pool->threadid =(pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); int i = 0;for (i = 0; i < max_thread_num; i++){pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);}}/*向线程池中加入任务*/int pool_add_worker (void *(*process) (void *arg), void *arg) {/*构造一个新任务*/CThread_worker *newworker =(CThread_worker *) malloc (sizeof (CThread_worker));newworker->process = process;newworker->arg = arg;newworker->next = NULL;/*别忘置空*/pthread_mutex_lock (&(pool->queue_lock));/*将任务加入到等待队列中*/CThread_worker *member = pool->queue_head;if (member != NULL){while (member->next != NULL)member = member->next;member->next = newworker;}else{pool->queue_head = newworker;}assert (pool->queue_head != NULL);pool->cur_queue_size++;pthread_mutex_unlock (&(pool->queue_lock));/*好了,等待队列中有任务了,唤醒一个等待线程;注意如果所有线程都在忙碌,这句没有任何作用*/pthread_cond_signal (&(pool->queue_ready));return 0;}/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出*/int pool_destroy (){if (pool->shutdown)return -1;/*防止两次调用*/pool->shutdown = 1;/*唤醒所有等待线程,线程池要销毁了*/pthread_cond_broadcast (&(pool->queue_ready)); /*阻塞等待线程退出,否则就成僵尸了*/int i;for (i = 0; i < pool->max_thread_num; i++) pthread_join (pool->threadid[i], NULL);free (pool->threadid);/*销毁等待队列*/CThread_worker *head = NULL;while (pool->queue_head != NULL){head = pool->queue_head;pool->queue_head = pool->queue_head->next; free (head);}/*条件变量和互斥量也别忘了销毁*/pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready));free (pool);/*销毁后指针置空是个好习惯*/pool=NULL;return 0;}//非常重要的任务接口函数,各子线程统一调用这个函数,而这个函数内部检查调用任务队列中的实际任务函数指针。
c 线程池的例子线程池是一种用于管理线程的机制,可以避免频繁地创建和销毁线程,从而提高应用程序的性能。
下面是一个使用C语言实现线程池的简单示例:```cinclude <>include <>include <>include <>define MAX_THREADS 5define MAX_TASKS 10typedef struct task {int id;int data;} task;typedef struct thread_pool {pthread_t threads[MAX_THREADS];task tasks[MAX_TASKS];int task_count;int thread_count;bool shutdown;} thread_pool;void worker(void arg) {thread_pool pool = (thread_pool )arg;while (1) {pthread_mutex_lock(&pool->mutex);while (pool->task_count == 0 && !pool->shutdown) { pthread_cond_wait(&pool->cond, &pool->mutex); }if (pool->shutdown && pool->task_count == 0) {pthread_mutex_unlock(&pool->mutex);pthread_exit(NULL);}task task = &pool->tasks[pool->task_count - 1];pool->task_count--;pthread_mutex_unlock(&pool->mutex);printf("Thread %ld is processing task %d with data %d\n", pthread_self(), task->id, task->data);free(task);}}void init_thread_pool(thread_pool pool) {pthread_mutex_init(&pool->mutex, NULL);pthread_cond_init(&pool->cond, NULL);pool->task_count = 0;pool->thread_count = 0;pool->shutdown = false;}void destroy_thread_pool(thread_pool pool) {pthread_mutex_destroy(&pool->mutex);pthread_cond_destroy(&pool->cond);}void add_task(thread_pool pool, int id, int data) {pthread_mutex_lock(&pool->mutex);if (pool->task_count < MAX_TASKS) {task task = (task )malloc(sizeof(task));task->id = id;task->data = data;pool->tasks[pool->task_count++] = task;pthread_cond_signal(&pool->cond); // notify any waiting threads that a new task is available} else {printf("Thread pool is full\n");}pthread_mutex_unlock(&pool->mutex);}void start_threads(thread_pool pool) {for (int i = 0; i < MAX_THREADS; i++) {pthread_create(&pool->threads[i], NULL, worker, pool); pool->thread_count++;}}void stop_threads(thread_pool pool) {pthread_mutex_lock(&pool->mutex);pool->shutdown = true; // signal threads to stop if they are waiting for new tasks to become availablepthread_cond_broadcast(&pool->cond); // notify all threads to stop if they are waiting for new tasks to become availablepthread_mutex_unlock(&pool->mutex);for (int i = 0; i < pool->thread_count; i++) {pthread_join(pool->threads[i], NULL); // wait for each thread to complete its current task and exit gracefully before moving on to the next thread to join with it.This ensures that all threads have completed their work before the program exits.It also ensures that the program does not exit before all threads have completed their work.If a thread is waiting on a condition variable,pthread_join will block until the condition variable is signaled, ensuring that all threads have completed their work before the program exits. Note that pthread_join is a blocking call that will not return until the specified thread has completed its execution and exited. It is important to ensure that allthreads have completed their work before the program exits. Otherwise。
线程池c语言
线程池是一种多线程实现方式,用于提高程序的并发性能。
在C
语言中,我们可以通过使用pthread库来实现线程池。
线程池的主要思想是预先创建一定数量的线程,并将任务放入队
列中。
当需要处理任务时,线程池中的线程会自动从队列中获取任务
并执行。
这样可以避免频繁地创建和销毁线程,节省内存和CPU资源。
C语言中实现线程池的步骤如下:
1. 定义任务结构体,用于保存任务的函数指针和参数信息。
2. 定义线程池结构体,包括线程池的大小、任务队列、互斥锁、条件变量等信息。
3. 在初始化函数中创建一定数量的线程,并将其添加到线程池中。
4. 实现任务队列的操作函数,包括向队列中添加任务和获取任务。
5. 实现线程函数,用于从任务队列中获取任务并执行。
6. 实现主函数,创建任务并将其添加到任务队列中,等待线程
池的处理。
在使用线程池的过程中,需要注意线程安全的问题。
我们需要使
用互斥锁来保证多个线程操作共享资源的安全性。
同时,条件变量的
使用可以让线程在某些条件满足时才进行等待和唤醒。
总之,线程池是一种非常实用的并发编程技术,可以大大提高程
序的性能和可靠性。
在C语言中,通过使用pthread库来实现线程池,可以让我们更加便捷地进行多线程编程。
C语⾔实现线程池以前写过⼀篇关于如何使⽤多线程推升推送速度(),能够到达5000qps,其实已经可以满⾜现在的业务,不过在看nginx的说明⽂档时,⼜提到nginx⽀持线程池来提升响应速度,⼀直对如何实现线程池很感兴趣,利⽤周末的时间参考别⼈的代码,⾃⼰写了⼀个初级版,并且调通了,还没在实际开发中应⽤,不知道效果如何代码如下:pd_log.h#ifndef __pd_log_#define __pd_log_#define LOG_DEBUG_PATH "debug.log"#define LOG_ERROR_PATH "error.log"/*** define log level*/enum log_level {DEBUG = 0,ERROR = 1};#define error(...) \logger(ERROR, __LINE__, __VA_ARGS__)#define debug(...) \logger(DEBUG, __LINE__, __VA_ARGS__)#define assert(expr, rc) \if(!(expr)){ \error(#expr"is null or 0"); \return rc; \}#endifpd_log.c#include <stdio.h>#include <stdlib.h>#include <stdarg.h>#include <time.h>#include "pd_log.h"/*** get now timestr*/static void get_time(char *time_str, size_t len) {time_t tt;struct tm local_time;time(&tt);localtime_r(&tt, &local_time);strftime(time_str, len, "%m-%d %H:%M:%S", &local_time);}/*** log*/static void logger(int flag, int line, const char *fmt, ...) {FILE *fp = NULL;char time_str[20 + 1];va_list args;get_time(time_str, sizeof(time_str));switch (flag) {case DEBUG:fp = fopen(LOG_DEBUG_PATH, "a");if (!fp) {return;}fprintf(fp, "%s DEBUG (%d:%d) ", time_str, getpid(), line);break;case ERROR:fp = fopen(LOG_ERROR_PATH, "a");if (!fp) {return;}fprintf(fp, "%s ERROR (%d:%d) ", time_str, getpid(), line);break;default:return;}va_start(args, fmt);vfprintf(fp, fmt, args);va_end(args);fprintf(fp, "\n");fclose(fp);return;}pd_pool.h/*** 线程池头⽂件* @author jimmy* @date 2016-5-14*/#ifndef __PD_POOL_#define __PD_POOL_/*任务链表*/typedef struct task_s{void (*routine)(void *);void *argv;struct task_s *next;} pd_task_t;/*任务队列*/typedef struct queue_s{pd_task_t *head;pd_task_t **tail;size_t max_task_num;size_t cur_task_num;}pd_queue_t;/*线程池*/typedef struct pool_s{pthread_mutex_t mutex;pthread_cond_t cond;pd_queue_t queue;size_t thread_num;//size_t thread_stack_size;}pd_pool_t;/*初始化线程池*///pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_stack_size, size_t thread_max_num); #endifpd_poo.c/*** 线程池* @author jimmy* @date 2016-5-14*/#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <pthread.h>#include "pd_log.h"#include "pd_log.c"#include "pd_pool.h"/*tsd*/pthread_key_t key;void *pd_worker_dispatch(void *argv){ushort exit_flag = 0;pd_task_t *a_task;pd_pool_t *a_pool = (pd_pool_t *)argv;if(pthread_setspecific(key, (void *)&exit_flag) != 0){return NULL;}/*动态从任务列表中获取任务执⾏*/while(!exit_flag){pthread_mutex_lock(&a_pool->mutex);/*如果此时任务链表为空,则需要等待条件变量为真*/while(a_pool->queue.head == NULL){pthread_cond_wait(&a_pool->cond, &a_pool->mutex); }/*从任务链表中任务开⽀执⾏*/a_task = a_pool->queue.head;a_pool->queue.head = a_task->next;a_pool->queue.cur_task_num--;if(a_pool->queue.head == NULL){a_pool->queue.tail = &a_pool->queue.head;}/*解锁*/pthread_mutex_unlock(&a_pool->mutex);/*执⾏任务*/a_task->routine(a_task->argv);//corefree(a_task);a_task = NULL;}pthread_exit(0);}/*** 根据线程数创建所有的线程*/static int pd_pool_create(pd_pool_t *a_pool){int i;pthread_t tid;for(i = 0; i < a_pool->thread_num; i++){pthread_create(&tid, NULL, pd_worker_dispatch, a_pool); }return0;}/*** 线程退出函数*/void pd_pool_exit_cb(void *argv){unsigned int *lock = argv;ushort *exit_flag_ptr = pthread_getspecific(key);*exit_flag_ptr = 1;pthread_setspecific(key, (void *)exit_flag_ptr);*lock = 0;}/*** 线程池初始化*/pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num){ pd_pool_t *a_pool = NULL;a_pool = calloc(1, sizeof(pd_pool_t));if(!a_pool){error("pool_init calloc fail: %s", strerror(errno));return NULL;}a_pool->thread_num = thread_num;//初始化队列参数a_pool->queue.max_task_num = thread_max_num;a_pool->queue.cur_task_num = 0;a_pool->queue.head = NULL;a_pool->queue.tail = &a_pool->queue.head;//初始化tsdif(pthread_key_create(&key, NULL) != 0){error("pthread_key_create fail: %s", strerror(errno));goto err;}//初始化互斥锁if(pthread_mutex_init(&a_pool->mutex, NULL) != 0){error("pthread_mutex_init fail: %s", strerror(errno));pthread_key_delete(key);goto err;}//初始化条件变量if(pthread_cond_init(&a_pool->cond, NULL) != 0){error("pthread_cond_init fail: %s", strerror(errno));pthread_mutex_destroy(&a_pool->mutex);goto err;}//创建线程池if(pd_pool_create(a_pool) != 0){error("pd_pool_create fail: %s", strerror(errno));pthread_mutex_unlock(&a_pool->mutex);pthread_cond_destroy(&a_pool->cond);goto err;}return a_pool;err:free(a_pool);return NULL;}/*** 向线程池中添加任务..*/int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv){pd_task_t *a_task = NULL;a_task = (pd_task_t *)calloc(1, sizeof(pd_task_t));if(!a_task){error("add task calloc faile: %s", strerror(errno));return -1;}a_task->routine = routine;a_task->argv = argv;a_task->next = NULL;/*加锁*/pthread_mutex_lock(&a_pool->mutex);if(a_pool->queue.cur_task_num >= a_pool->queue.max_task_num){error("cur_task_num >= max_task_num");goto err;}/*将任务放到末尾*/*(a_pool->queue.tail) = a_task;a_pool->queue.tail = &a_task->next;a_pool->queue.cur_task_num++;/*通知堵塞的线程*/pthread_cond_signal(&a_pool->cond);/*解锁*/pthread_mutex_unlock(&a_pool->mutex);return0;err:pthread_mutex_unlock(&a_pool->mutex);free(a_task);return -1;}void pd_pool_destroy(pd_pool_t *a_pool){unsigned int n;unsigned int lock;for(n = 0; n < a_pool->thread_num; n++){lock = 1;if(pd_pool_add_task(a_pool, pd_pool_exit_cb, &lock) != 0){error("pd_pool_destroy fail: add_task fail");return;}while(lock){usleep(1);}}pthread_mutex_destroy(&a_pool->mutex);pthread_cond_destroy(&a_pool->cond);pthread_key_delete(key);free(a_pool);}/******************************************************************************************/ void testfun(void *argv){printf("testfun\n");sleep(1);}int main(){pd_pool_t *a_pool = pd_pool_init(9, 5);pd_pool_add_task(a_pool, testfun, NULL); pd_pool_add_task(a_pool, testfun, NULL); pd_pool_add_task(a_pool, testfun, NULL); pd_pool_destroy(a_pool);}。
c++11线程池实现c++11 加入了线程库,从此告别了标准库不支持并发的历史。
然而c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池、信号量等。
线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:“管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复。
” 貌似没有问题吧。
但是写起程序来的时候就出问题了。
废话不多说,先上实现,然后再啰嗦。
(dont talk, show me ur code !)#ifndef ILOVERS_THREAD_POOL_H#defineILOVERS_THREAD_POOL_H #include<iostream>#include <functional>#include<thread>#include <condition_variable>#include <future>#include <atomic>#include<vector>#include <queue> // 命名空间namespace ilovers { class TaskExecutor;} class ilovers::TaskExecutor{ using Task =std::function<void()>;private: // 线程池std::vector<std::thread> pool; // 任务队列std::queue<Task> tasks; // 同步std::mutexm_task; std::condition_variable cv_task; // 是否关闭提交std::atomic<bool> stop; public: // 构造TaskExecutor(size_t size = 4): stop{false}{ size = size < 1 ? 1 : size;for(size_t i = 0; i< size;++i){ pool.emplace_back(&TaskExecutor ::schedual, this); //push_back(std::thread{...}) } } // 析构~TaskExecutor(){ for(std::thread& thread : pool){ thread.detach(); // 让线程“自生自灭” //thread.join(); // 等待任务结束,前提:线程一定会执行完} } // 停止任务提交voidshutdown(){ this->stop.store(true); }// 重启任务提交voidrestart(){ this->stop.store(false); } // 提交一个任务template<class F, class... Args> auto commit(F&& f, Args&&... args)->std::future<decltype(f(args...))>{ if(stop.load()){ // stop == true ??throw std::runtime_error("task executor have closed commit."); } using ResType =decltype(f(args...)); // typenamestd::result_of<F(Args...)>::type, 函数f 的返回值类型auto task =std::make_shared<std::packaged_task<ResType()> >( std::bind(std::forward<F >(f), std::forward<Args>(args)...) ); // wtf ! { // 添加任务到队列std::lock_guard<std::mutex> lock {m_task};tasks.emplace([task](){ // push(Task{...})(*task)(); }); }cv_task.notify_all(); // 唤醒线程执行std::future<ResType> future = task->get_future(); return future; } private: // 获取一个待执行的task Taskget_one_task(){ std::unique_lock<std::mutex> lock {m_task}; cv_task.wait(lock,[this](){ return !tasks.empty(); }); // wait 直到有task Task task {std::move(tasks.front())}; // 取一个task tasks.pop(); return task; } // 任务调度void schedual(){ while(true){ if(Task task = get_one_task()){ task();// }else{ // return; //done } } }}; #endif voidf(){ std::cout << "hello, f !" << std::endl;} struct G{ int operator()(){ std::cout << "hello, g !" << std::endl; return 42; }}; intmain()try{ ilovers::TaskExecutor executor {10};std::future<void> ff = mit(f);std::future<int> fg = mit(G{});std::future<std::string> fh =mit([]()->std::string { std::cout << "hello, h !" << std::endl; return "hello,fh !";});executor.shutdown(); ff.get(); std::cout << fg.get() << " " << fh.get() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(5)); executor.restart(); // 重启任务mit(f).get(); // std::cout << "end..." << std::endl; return0;}catch(std::exception& e){ std::cout << "some unhappy happened... " << e.what() << std::endl;}为了避嫌,先进行一下版权说明:代码是me “写”的,但是思路来自Internet,特别是这个线程池实现(窝的实现,基本copy 了这个实现,好东西值得copy !)。
线程池 c语言
线程池是一种常见的多线程编程技术,可以有效地管理线程资源,提高程序的并发性能。
在C语言中,实现线程池需要使用多线程库和数据结构库,比如pthread和libuv。
线程池的基本原理是预先创建一定数量的线程,并将任务加入任务队列中。
每个线程从队列中获取任务并执行,当队列为空时,线程会进入休眠状态,等待新的任务加入队列。
实现线程池需要注意以下几点:
1. 线程数量的选择:线程数量不宜过多,否则会占用过多的系
统资源;也不宜过少,否则任务处理效率会降低。
2. 任务队列的设计:需要考虑任务队列的并发访问,以及任务
的优先级等问题。
3. 线程的创建和销毁:需要在程序启动时创建线程池,并在程
序结束时销毁线程池中的线程。
4. 线程的同步与通信:需要使用锁和条件变量等同步原语,实
现线程之间的通信和协作。
线程池是一种常见的编程技术,可以提高程序的并发性能,同时也需要注意线程间同步和通信的问题。
在C语言中,我们可以使用多线程库和数据结构库实现线程池。
- 1 -。
C++ 线程池实现使用多线程编程可以显著提高程序的运行速度,由于现在的操作系统都是多核的,所以一个多线程的程序,由于系统内核是基于时间片轮询的,所以多线程程序再用系统内核的时间大大增多,所完成的任务就更快。
线程池头文件://---------------------------------------------------------------------------#ifndef CworkQueueH#define CworkQueueH//---------------------------------------------------------------------------#include <queue>#include<vcl.h>class CWorkQueue;/**用法原理:通过派生类WorkItemBase的dowork方法来实现,线程处理任务通过create任务创建线程,并且这些线程一直在for循环里等待事件监听一旦任务栈里有数据了触发线程执行任务。
**//*------------------------------------------------------------------------WorkItemBasethis is the basic WorkItem that the Work Queue Use its interface This class should be inherited and these virtual abstract functions implemented.DoWork()virtual abstract function is the function that is called when thework item turn has came to be poped out of the queue and be processed.Abort ()This function is called, when the Destroy function is called, for each of the WorkItemsThat are left in the queue.------------------------------------------------------------------------*/class WorkItemBase{virtual void DoWork(void* pThreadContext) = 0;virtual void Abort () = 0;friend CWorkQueue;};typedef std::queue<WorkItemBase*> WorkItemQueue,*PWorkItemQueue;/*------------------------------------------------------------------------CWorkQueueThis is the WorkOueue class also known as thread pool,the basic idea of this class is creating thread that are waiting on a queueof work item when the queue is inserted with items the threads wake up andperform the requered work and go to sleep again.------------------------------------------------------------------------*/class CWorkQueue{public:virtual~CWorkQueue(){};bool Create(const unsigned int nNumberOfThreads,void* *pThreadDataArray = NULL);bool InsertWorkItem(WorkItemBase* pWorkItem);void Destroy(int iWairSecond);int GetThreadTotalNum();private:static unsigned long__stdcall ThreadFunc( void* pParam ); WorkItemBase* RemoveWorkItem();int GetWorekQueueSize();enum{ABORT_EVENT_INDEX = 0,SEMAPHORE_INDEX,NUMBER_OF_SYNC_OBJ,};//申请到的线程PHANDLE m_phThreads;unsigned int m_nNumberOfThreads;void* m_pThreadDataArray;HANDLE m_phSincObjectsArray[NUMBER_OF_SYNC_OBJ]; CRITICAL_SECTION m_CriticalSection;PWorkItemQueue m_pWorkItemQueue;};#endifCPP实现#pragma hdrstop#include "CworkQueue.h"//---------------------------------------------------------------------------#include <assert.h>typedef struct_THREAD_CONTEXT{CWorkQueue* pWorkQueue;void* pThreadData;} THREAD_CONTEXT,*PTHREAD_CONTEXT;/*------------------------------------------------------------------------建立多线程 nNumberOfThreads多线程数目 ThreadData线程函数执行的参数------------------------------------------------------------------------*/bool CWorkQueue::Create(const unsigned int nNumberOfThreads,void* *ThreadData /*=NULL*/){//创建任务队列,存放后续将要执行的任务m_pWorkItemQueue = new WorkItemQueue();if(NULL == m_pWorkItemQueue ){return false;}//m_phSincObjectsArray保存了线程池的信号量和事件//m_phSincObjectsArray[ABORT_EVENT_INDEX]保存的是事件,当用户设置退出事件时使用//m_phSincObjectsArray[SEMAPHORE_INDEX]保存信号量,当用户设置执行任务时使用//创建信号量(多线程同步使用)/*在信号量上我们定义两种操作: Wait(等待)和 Release(释放)。
当一个线程调用Wait操作时,它要么得到资源然后将信号量减一,要么一直等下去(指放入阻塞队列),直到信号量大于等于一时。
Release(释放)实际上是在信号量上执行加操作*/m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL);if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL){delete m_pWorkItemQueue;m_pWorkItemQueue = NULL;return false;}//创建事件为手动置位,一次只能进入一个,False为初始不是运行状态(多线程同步使用)m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL);if(m_phSincObjectsArray[ABORT_EVENT_INDEX] == NULL){delete m_pWorkItemQueue;m_pWorkItemQueue = NULL;CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]); return false;}//创建并初始化临界区(多线程互斥访问使用)InitializeCriticalSection(&m_CriticalSection);//创建线程数组m_phThreads = new HANDLE[nNumberOfThreads];if(m_phThreads == NULL){delete m_pWorkItemQueue;m_pWorkItemQueue = NULL;CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]); CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]); DeleteCriticalSection(&m_CriticalSection);return false;}unsigned int i;m_nNumberOfThreads = nNumberOfThreads;DWORD dwThreadId;PTHREAD_CONTEXT pThreadsContext;//创建所有的线程for(i = 0 ; i < nNumberOfThreads ; i++ ){//初始化线程函数运行时传入的参数pThreadsContext = new THREAD_CONTEXT;pThreadsContext->pWorkQueue = this;pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i];//创建线程m_phThreads[i] = CreateThread(NULL,0,CWorkQueue::ThreadFunc,pThreadsContext,0,&dwThreadId);if(m_phThreads[i] == NULL){delete pThreadsContext;m_nNumberOfThreads = i;Destroy(5);return false;}}return true;}/*------------------------------------------------------------------------向任务队列添加任务任务执行类通过继承基类WorkItemBase之后使用多态函数DoWork来完成真实任务------------------------------------------------------------------------*/bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem){assert(pWorkItem != NULL);//多线程互斥访问,进入临界区EnterCriticalSection(&m_CriticalSection);//将任务插入队列m_pWorkItemQueue->push(pWorkItem);//离开临界区LeaveCriticalSection(&m_CriticalSection);//释放信号量,使信号量加1,促使后面的Wailt操作执行if(!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL)){assert(false);return false;}return true;}/*------------------------------------------------------------------------从工作队列中移除对象,并返回移除的对象------------------------------------------------------------------------*/WorkItemBase* CWorkQueue::RemoveWorkItem(){WorkItemBase* pWorkItem;//多线程间访问互斥,进入临界区EnterCriticalSection(&m_CriticalSection);//移除对象pWorkItem = m_pWorkItemQueue->front();m_pWorkItemQueue->pop();//离开临界区,其他等待线程可以进入临界区LeaveCriticalSection(&m_CriticalSection);assert(pWorkItem != NULL);return pWorkItem;}/*------------------------------------------------------------------------线程执行的函数,实际执行的是任务队列中的DoWork()------------------------------------------------------------------------*/unsigned long__stdcall CWorkQueue::ThreadFunc( void* pParam ){//创建线程时传入的参数PTHREAD_CONTEXT pThreadContext = (PTHREAD_CONTEXT)pParam;WorkItemBase* pWorkItem = NULL;CWorkQueue* pWorkQueue = pThreadContext->pWorkQueue;void* pThreadData = pThreadContext->pThreadData;DWORD dwWaitResult;for(;;){//WaitForMultipleObjects等待pWorkQueue->m_phSincObjectsArray信号量数组两件事//一个是执行任务的释放信号量,一个是异常的释放信号量//当WaitForMultipleObjects等到多个内核对象的时候,如果它的bWaitAll 参数设置为false。