pthread库实现一个简单的任务池
类关系图: 说明: 1:TaskManager类管理Task类,Task类是一个纯虚类; 2:ThreadManager类管理Thread类,Thread类封装pthread库的相关线程函数; 3:Thread类通过TaskManager类获取需要执行的任务; 4:ThreadManager类中包含一个TaskManager用于管理需要其管理线程执行的Task; 5:UserTask是用户继承Task类而来的用户自己的任务类,最后线程池中运行的任务就是UserTask。 该交代的都交代了,现在上代码。^-^ Task.h
/***********************
 * author: zhanghang
 * date:   2016.02.27
 * file:   Task.h
 * usage:  this is the pure virtual base class of the task in the task pool
 ***********************/
#ifndef _TASK_H
#define _TASK_H

// Number of empty loop iterations treated as "one second" by Delay().
// The value is the article's own calibration and is machine dependent.
#define TIMES_OF_SLEEP_ONE_SEC 1500000

/**
 * Abstract unit of work for the task pool.
 *
 * Users derive from Task and implement Process(); the pool's worker thread
 * calls Process() and then deletes the task object.
 */
class Task
{
public:
    Task() {}
    virtual ~Task() {}

    /** The work to perform; must be implemented by the concrete task. */
    virtual void Process() = 0;

    // all the delay operations should use this function
    /**
     * Busy-waits for roughly `sec` "seconds" (see TIMES_OF_SLEEP_ONE_SEC).
     *
     * NOTE(review): the body of this loop and the end of the class were lost
     * when the listing was extracted (the text breaks off at `for(int i=0; i`).
     * It is reconstructed here as a plain spin loop -- confirm against the
     * original article.
     *
     * The counter is 64-bit so `sec * TIMES_OF_SLEEP_ONE_SEC` cannot overflow
     * an int, and volatile so the otherwise empty loop is not optimised away.
     *
     * @param sec  delay length; zero or negative returns immediately.
     */
    void Delay(int sec)
    {
        const long long spins = static_cast<long long>(sec) * TIMES_OF_SLEEP_ONE_SEC;
        for (volatile long long i = 0; i < spins; i = i + 1)
        {
        }
    }
};

#endif
// The article's next listing is GuardLock.h, a small scope-lock class written by the author.
/*************************
 * author: zhanghang
 * date:   2016.02.27
 * file:   GuardLock.h
 * usage:  this class to lock a code scope
 ************************/
#ifndef _GUARD_LOCK_H
#define _GUARD_LOCK_H

#include <pthread.h>

/**
 * Scope guard for a pthread mutex: locks in the constructor and unlocks in
 * the destructor, so the mutex is released on every exit path of the scope.
 *
 * The guard does not own the mutex; the caller must pass a pointer to an
 * initialised mutex that outlives the guard.
 */
class GuardLock
{
public:
    /** Locks *pMutex. `explicit` prevents accidental pointer-to-guard conversions. */
    explicit GuardLock(pthread_mutex_t *pMutex) : m_pMutex(pMutex)
    {
        pthread_mutex_lock(m_pMutex);
    }

    /** Unlocks the mutex locked by the constructor. */
    ~GuardLock()
    {
        pthread_mutex_unlock(m_pMutex);
    }

private:
    // Not copyable: a copy would unlock the same mutex a second time.
    // Declared but intentionally left undefined.
    GuardLock(const GuardLock &);
    GuardLock &operator=(const GuardLock &);

    pthread_mutex_t *m_pMutex;
};

#endif
/*************************
 * author: zhanghang
 * date:   2016.02.27
 * file:   TaskManager.h
 * usage:  this class is to manage the tasks
 ***********************/
#ifndef _TASK_MANAGER_H
#define _TASK_MANAGER_H

#include <list>
#include <pthread.h>

class Task;

/**
 * Queue of pending Task pointers, guarded by a pthread mutex.
 *
 * The manager owns every task that is still queued: DeleteAllTask() and the
 * destructor delete them. A task handed out by GetATask() is removed from the
 * queue and becomes the caller's responsibility.
 */
class TaskManager
{
public:
    TaskManager();
    ~TaskManager();

    /** Appends pTask to the queue; a NULL pointer is ignored. */
    void AddTask(Task *pTask);

    /** Deletes every queued task and empties the queue. */
    void DeleteAllTask();

    /** Returns the number of queued tasks. */
    int GetTaskNum();

    /** Removes and returns the oldest queued task, or NULL if the queue is empty. */
    Task *GetATask();

private:
    // Not copyable: a copy would share the raw Task pointers (double delete)
    // and duplicate the mutex. Declared but intentionally left undefined.
    TaskManager(const TaskManager &);
    TaskManager &operator=(const TaskManager &);

    std::list<Task *> m_TaskList;          // pending tasks, oldest first
    pthread_mutex_t m_AccTaskListMutex;    // guards m_TaskList
};

#endif
/*************************
 * author: zhanghang
 * date:   2016.02.27
 * file:   TaskManager.cpp
 * usage:  this class is to manage the tasks
 ***********************/
#include "GuardLock.h"
#include "TaskManager.h"
#include "Task.h"
#include <iostream>

/**
 * Creates an empty queue.
 * The mutex must be initialised before its first lock; using an
 * uninitialised pthread_mutex_t is undefined behaviour.
 */
TaskManager::TaskManager()
{
    pthread_mutex_init(&m_AccTaskListMutex, NULL);
    std::cout << "TaskManager::TaskManager*" << static_cast<void *>(this) << std::endl;
}

/** Deletes any tasks still queued, then releases the mutex. */
TaskManager::~TaskManager()
{
    std::cout << "TaskManager::~TaskManager#" << static_cast<void *>(this) << std::endl;
    // Harmless on an empty list, so no unlocked size check is needed first.
    DeleteAllTask();
    // The guard inside DeleteAllTask() has already unlocked by now.
    pthread_mutex_destroy(&m_AccTaskListMutex);
}

/** Appends pTask to the back of the queue; NULL is ignored. */
void TaskManager::AddTask(Task *pTask)
{
    GuardLock lock(&m_AccTaskListMutex);
    if(NULL != pTask)
    {
        m_TaskList.push_back(pTask);
    }
}

/** Deletes every queued task object and clears the queue. */
void TaskManager::DeleteAllTask()
{
    GuardLock lock(&m_AccTaskListMutex);
    for(std::list<Task *>::iterator taskItr = m_TaskList.begin();
        m_TaskList.end() != taskItr;
        ++taskItr)
    {
        delete *taskItr;
    }
    m_TaskList.clear();
}

/**
 * Returns the number of queued tasks.
 * Takes the lock because other threads may be modifying the list concurrently.
 */
int TaskManager::GetTaskNum()
{
    GuardLock lock(&m_AccTaskListMutex);
    return static_cast<int>(m_TaskList.size());
}

/**
 * Removes the task at the front of the queue and returns it.
 * @return the task, now owned by the caller, or NULL when the queue is empty.
 */
Task *TaskManager::GetATask()
{
    GuardLock lock(&m_AccTaskListMutex);
    if(m_TaskList.empty())
    {
        return NULL;
    }
    Task *pTask = m_TaskList.front();
    m_TaskList.pop_front();
    return pTask;
}
/*************************
 * author: zhanghang
 * date:   2016.02.27
 * file:   Thread.h
 * usage:  this class is to new a thread
 ***********************/
#ifndef _THREAD_H
#define _THREAD_H

#include <pthread.h>

// Same value as the definition in Task.h; an identical redefinition is
// permitted when both headers are included.
#define TIMES_OF_SLEEP_ONE_SEC 1500000

class Task;
class TaskManager;

// the entrance of the new thread
void *ThreadMain(void *pArg);

/**
 * One worker thread of the pool, wrapping the pthread calls.
 * The worker pulls its tasks from the TaskManager passed at construction,
 * which must outlive the Thread (it is held by reference).
 */
class Thread
{
public:
    explicit Thread(TaskManager &taskManager);
    ~Thread();

    /** Starts the worker; returns the pthread_create() result (0 on success). */
    int Run();

    /** Requests exit, cancels and joins the worker; returns the pthread_join() result. */
    int Stop();

    /** Returns the worker's idle flag. */
    bool IsFree();

    // because there is one cancel point in the sleep func
    // so i write my own delay func
    void Delay(int sec);

    // ThreadMain runs on the new thread and works directly on the private state.
    friend void *ThreadMain(void *pArg);

private:
    // Not copyable: two objects would then cancel/join the same pthread.
    // Declared but intentionally left undefined.
    Thread(const Thread &);
    Thread &operator=(const Thread &);

    bool m_IsFree;            // idle flag reported by IsFree()
    bool m_ExitFlag;          // set by Stop() and the destructor
    Task *m_pTask;            // task currently being processed, or NULL
    TaskManager &m_TaskMgr;   // source of tasks
    pthread_t m_ThreadID;     // id written by pthread_create() in Run()
};

#endif
/*************************
 * author: zhanghang
 * date:   2016.02.27
 * file:   Thread.cpp
 * usage:  this class is to new a thread
 ***********************/
#include "Thread.h"
#include "Task.h"
#include "TaskManager.h"
#include <iostream>

/** Binds the worker to its task source; no thread is started until Run(). */
Thread::Thread(TaskManager &taskMgr):
    m_IsFree(true),
    m_ExitFlag(false),
    m_pTask(NULL),
    m_TaskMgr(taskMgr),
    m_ThreadID(0)
{
    std::cout << "Thread::Thread*" << static_cast<void *>(this) << std::endl;
}

/** Raises the exit flag; does not cancel or join the thread (see Stop()). */
Thread::~Thread()
{
    std::cout << "Thread::~Thread#" << static_cast<void *>(this) << std::endl;
    m_ExitFlag = true;
}

/** Creates the pthread running ThreadMain(this); returns pthread_create()'s result. */
int Thread::Run()
{
    return pthread_create(&m_ThreadID, NULL, ThreadMain, this);
}

/**
 * Asks the worker to exit, sends a cancellation request and waits for it.
 * @return the pthread_join() result (0 on success).
 */
int Thread::Stop()
{
    m_ExitFlag = true;
    void *pRet = NULL;
    pthread_cancel(m_ThreadID);
    return pthread_join(m_ThreadID, &pRet);
}

/** Returns the idle flag maintained by the worker. */
bool Thread::IsFree()
{
    return m_IsFree;
}

/**
 * Busy-waits for roughly `sec` "seconds" without calling sleep(), which
 * Thread.h notes is a cancellation point.
 *
 * NOTE(review): the loop tail was lost when the listing was extracted (the
 * text breaks off at `for(int i=0; i`); reconstructed as a spin loop over
 * sec * TIMES_OF_SLEEP_ONE_SEC -- confirm against the original article.
 * The counter is 64-bit to avoid int overflow and volatile so the empty
 * loop is not optimised away.
 */
void Thread::Delay(int sec)
{
    const long long spins = static_cast<long long>(sec) * TIMES_OF_SLEEP_ONE_SEC;
    for(volatile long long i = 0; i < spins; i = i + 1)
    {
    }
}

/**
 * Entry point of the worker thread.
 *
 * NOTE(review): everything from the function head down to the `Process()`
 * call was lost in extraction. The surviving tail (Process, delete, reset,
 * pthread_testcancel) is kept verbatim; the head is reconstructed from the
 * members declared in Thread.h and from the surviving name `rThread`.
 * In particular the idle branch (flag + Delay + cancel point) is a guess --
 * confirm against the original article.
 *
 * @param pArg  the Thread object passed by Thread::Run().
 * @return NULL once the exit flag is seen; the visible original had no return
 *         statement, which is undefined behaviour for a non-void function.
 */
void *ThreadMain(void *pArg)
{
    Thread &rThread = *static_cast<Thread *>(pArg);

    while(!rThread.m_ExitFlag)
    {
        rThread.m_pTask = rThread.m_TaskMgr.GetATask();
        if(NULL == rThread.m_pTask)
        {
            // nothing queued: report idle, back off briefly, and offer a
            // cancel point so Stop() can end an idle worker
            rThread.m_IsFree = true;
            rThread.Delay(1);
            pthread_testcancel();
            continue;
        }

        rThread.m_IsFree = false;
        rThread.m_pTask->Process();

        // clear the task room
        delete rThread.m_pTask;
        rThread.m_pTask = NULL;

        // set a cancel point
        pthread_testcancel();
    }

    return NULL;
}
/*************************
 * author: zhanghang
 * date:   2016.02.07
 * file:   ThreadManager.h
 * usage:  this class is to manage the threads
 *******************************/
#ifndef _THREAD_MANAGER_H
#define _THREAD_MANAGER_H

#include <list>

class Task;
class Thread;
class TaskManager;

/**
 * Owns the pool's worker threads and the TaskManager that feeds them.
 * The TaskManager is created on first use by CreateTaskManager().
 */
class ThreadManager
{
public:
    ThreadManager();
    ~ThreadManager();

    /** Starts up to `num` workers; returns the number of workers now managed. */
    int StartThreads(int num);

    /** Stops and deletes the workers; returns how many could not be stopped. */
    int StopThreads();

    /** Queues a task for the workers. */
    void AddTask(Task *);

    /** Returns the number of queued tasks. */
    int GetTaskNum();

    /** Returns the number of managed workers. */
    int GetThreadNum();

    /** Ensures the TaskManager exists; returns true when it is available. */
    bool CreateTaskManager();

private:
    // Not copyable: a copy would share the raw Thread/TaskManager pointers
    // and delete them twice. Declared but intentionally left undefined.
    ThreadManager(const ThreadManager &);
    ThreadManager &operator=(const ThreadManager &);

    std::list<Thread *> m_ThreadList;   // running workers, owned
    TaskManager *m_pTaskMgr;            // task queue, owned; NULL until created
};

#endif
/********************** author: zhanghang* date: 2016.02.27* file: ThreadManager.cpp* usage: this class is to manage the task pool*************************/#includemain.cpp#include "ThreadManager.h"#include "Thread.h"#include "Task.h"#include "TaskManager.h"ThreadManager::ThreadManager(): m_pTaskMgr(NULL){ std::cout << "ThreadManager::ThreadManager*" << (void *)this << std::endl;}ThreadManager::~ThreadManager(){ std::cout << "ThreadManager::~ThreadManager#" << (void *)this << std::endl; if(m_ThreadList.size() > 0) { StopThreads(); } if(NULL != m_pTaskMgr) { delete m_pTaskMgr; m_pTaskMgr = NULL; } }bool ThreadManager::CreateTaskManager(){ if(NULL == m_pTaskMgr) { m_pTaskMgr = new TaskManager(); if(NULL == m_pTaskMgr) { return false; } } return true;}// return the truely thread numint ThreadManager::StartThreads(int num){ if(!CreateTaskManager()) { return 0; } int result = 0; for(int i=0; i Run(); if(0 != result) { std::cout << "one thread started failed! error code: " << result << std::endl; continue; } m_ThreadList.push_back(pThread); } return m_ThreadList.size();}// return the num of threads that leftint ThreadManager::StopThreads(){ std::list ::iterator threadItr = m_ThreadList.begin(); int result = 0; while(m_ThreadList.end() != threadItr) { result = (*threadItr)->Stop(); if(0 != result) { std::cout << "one thread stopped failed! error code: " << result << std::endl; ++threadItr; continue; } // delete the thread delete *threadItr; m_ThreadList.erase(threadItr++); } // the left thread return m_ThreadList.size();}void ThreadManager::AddTask(Task *pTask){ if(!CreateTaskManager()) { return; } m_pTaskMgr->AddTask(pTask);}int ThreadManager::GetTaskNum(){ if(!CreateTaskManager()) { return 0; } return m_pTaskMgr->GetTaskNum();}int ThreadManager::GetThreadNum(){ return m_ThreadList.size();}
/**************
 * author: zhanghang
 * date:   2016.02.27
 * file:   main.cpp
 * usage:  this is a demo of taskpool
 ********************************/
#include "ThreadManager.h"
#include "Task.h"
#include <iostream>
#include <pthread.h>   // pthread_self
#include <unistd.h>    // sleep

#define THREAD_NUM 3

// this is the task
/** Demo task: prints the id of the thread running it, then delays briefly. */
class MyTask : public Task
{
public:
    MyTask()
    {
        std::cout << "MyTask::MyTask*" << static_cast<void *>(this) << std::endl;
    }

    ~MyTask()
    {
        std::cout << "MyTask::~MyTask#" << static_cast<void *>(this) << std::endl;
    }

    void Process()
    {
        std::cout << "thread id: " << pthread_self() << std::endl;
        Delay(1);
    }
};

/**
 * Starts THREAD_NUM workers, queues 20 tasks, gives the pool four seconds
 * to work, then stops the workers.
 * @return 0, the conventional success status (the original returned 1,
 *         which a shell or make treats as failure).
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    ThreadManager threadMgr;
    threadMgr.StartThreads(THREAD_NUM);

    for(int i = 0; i < 20; ++i)
    {
        // add task; the derived-to-base conversion is implicit, and the pool
        // takes ownership of the object
        Task *pTask = new MyTask();
        threadMgr.AddTask(pTask);
    }

    sleep(4);
    threadMgr.StopThreads();
    return 0;
}
main: ThreadMgr Thread TaskMgr
	g++ -o main main.cpp ThreadManager.o Thread.o TaskManager.o -lpthread -g
ThreadMgr:
	g++ -c ThreadManager.cpp ThreadManager.h Thread.h TaskManager.h Task.h -g
Thread:
	g++ -c Thread.cpp Thread.h Task.h TaskManager.h -g
TaskMgr:
	g++ -c TaskManager.cpp TaskManager.h GuardLock.h Task.h -g
clean:
	rm *.o

运行程序后,程序的输出如下:

ThreadManager::ThreadManager*0xbfdf52d4
TaskManager::TaskManager*0x8599008
Thread::Thread*0x8599030
Thread::Thread*0x85990f0
Thread::Thread*0x85991b0
MyTask::MyTask*0x8599270
MyTask::MyTask*0x8599290
MyTask::MyTask*0x85992b0
MyTask::MyTask*0x85992d0
MyTask::MyTask*0x85992f0
MyTask::MyTask*0x8599310
MyTask::MyTask*0x8599330
MyTask::MyTask*0x8599350
MyTask::MyTask*0x8599370
MyTask::MyTask*0x8599390
MyTask::MyTask*0x85993b0
MyTask::MyTask*0x85993d0
MyTask::MyTask*0x85993f0
MyTask::MyTask*0x8599410
MyTask::MyTask*0x8599430
MyTask::MyTask*0x8599450
MyTask::MyTask*0x8599470
MyTask::MyTask*0x8599490
MyTask::MyTask*0x85994b0
MyTask::MyTask*0x85994d0
thread id: 3058080576
thread id: 3074865984
thread id: 3066473280
MyTask::~MyTask#0x8599270
thread id: 3058080576
MyTask::~MyTask#0x8599290
thread id: 3074865984
MyTask::~MyTask#0x85992b0
thread id: 3066473280
MyTask::~MyTask#0x85992d0
thread id: 3058080576
MyTask::~MyTask#0x85992f0
thread id: 3074865984
MyTask::~MyTask#0x8599310
thread id: 3066473280
MyTask::~MyTask#0x8599330
thread id: 3058080576
MyTask::~MyTask#0x8599350
thread id: 3074865984
MyTask::~MyTask#0x8599370
thread id: 3066473280
MyTask::~MyTask#0x85993b0
thread id: 3074865984
MyTask::~MyTask#0x8599390
thread id: 3058080576
MyTask::~MyTask#0x85993f0
thread id: 3074865984
MyTask::~MyTask#0x85993d0
thread id: 3066473280
MyTask::~MyTask#0x8599410
thread id: 3058080576
MyTask::~MyTask#0x8599430
thread id: 3074865984
MyTask::~MyTask#0x8599450
thread id: 3066473280
MyTask::~MyTask#0x8599470
thread id: 3058080576
MyTask::~MyTask#0x8599490
MyTask::~MyTask#0x85994b0
MyTask::~MyTask#0x85994d0
Thread::~Thread#0x8599030
Thread::~Thread#0x85990f0
Thread::~Thread#0x85991b0
ThreadManager::~ThreadManager#0xbfdf52d4
TaskManager::~TaskManager#0x8599008

谢谢。