PTLib
Version 2.10.4
|
00001 /* 00002 * threadpool.h 00003 * 00004 * Generalised Thread Pooling functions 00005 * 00006 * Portable Tools Library 00007 * 00008 * Copyright (C) 2009 Post Increment 00009 * 00010 * The contents of this file are subject to the Mozilla Public License 00011 * Version 1.0 (the "License"); you may not use this file except in 00012 * compliance with the License. You may obtain a copy of the License at 00013 * http://www.mozilla.org/MPL/ 00014 * 00015 * Software distributed under the License is distributed on an "AS IS" 00016 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 00017 * the License for the specific language governing rights and limitations 00018 * under the License. 00019 * 00020 * The Original Code is Portable Windows Library. 00021 * 00022 * The Initial Developer of the Original Code is Post Increment 00023 * 00024 * Portions of this code were written with the financial assistance of 00025 * Metreos Corporation (http://www.metros.com). 00026 * 00027 * Contributor(s): ______________________________________. 00028 * 00029 * $Revision: 25362 $ 00030 * $Author: rjongbloed $ 00031 * $Date: 2011-03-20 18:27:31 -0500 (Sun, 20 Mar 2011) $ 00032 */ 00033 00034 00035 #ifndef PTLIB_THREADPOOL_H 00036 #define PTLIB_THREADPOOL_H 00037 00038 #ifdef P_USE_PRAGMA 00039 #pragma interface 00040 #endif 00041 00042 #include <map> 00043 #include <queue> 00044 00045 00150 class PThreadPoolBase : public PObject 00151 { 00152 public: 00153 class WorkerThreadBase : public PThread 00154 { 00155 public: 00156 WorkerThreadBase(Priority priority = NormalPriority) 00157 : PThread(100, NoAutoDeleteThread, priority, "Pool") 00158 , m_shutdown(false) 00159 { } 00160 00161 virtual void Shutdown() = 0; 00162 virtual unsigned GetWorkSize() const = 0; 00163 00164 bool m_shutdown; 00165 PMutex m_workerMutex; 00166 }; 00167 00168 class InternalWorkBase 00169 { 00170 public: 00171 InternalWorkBase(const char * group) 00172 { 00173 if (group != NULL) 00174 m_group = group; 00175 } 00176 std::string m_group; 00177 }; 00178 00179 ~PThreadPoolBase(); 00180 00181 virtual WorkerThreadBase * CreateWorkerThread() = 0; 00182 virtual WorkerThreadBase * AllocateWorker(); 00183 virtual WorkerThreadBase * NewWorker(); 00184 00185 unsigned GetMaxWorkers() const { return m_maxWorkerCount; } 00186 00187 void SetMaxWorkers( 00188 unsigned count 00189 ) { m_maxWorkerCount = count; } 00190 00191 unsigned GetMaxUnits() const { return m_maxWorkUnitCount; } 00192 00193 void SetMaxUnits( 00194 unsigned count 00195 ) { m_maxWorkUnitCount = count; } 00196 00197 protected: 00198 PThreadPoolBase(unsigned maxWorkerCount = 10, unsigned maxWorkUnitCount = 0); 00199 00200 virtual bool CheckWorker(WorkerThreadBase * worker); 00201 void StopWorker(WorkerThreadBase * worker); 00202 PMutex m_listMutex; 00203 00204 typedef std::vector<WorkerThreadBase *> WorkerList_t; 00205 WorkerList_t m_workers; 00206 00207 unsigned m_maxWorkerCount; 00208 unsigned m_maxWorkUnitCount; 00209 }; 00210 00211 00214 template <class Work_T> 00215 class PThreadPool : public PThreadPoolBase 00216 { 00217 PCLASSINFO(PThreadPool, PThreadPoolBase); 00218 public: 00219 // 00220 // constructor 00221 // 00222 PThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0) 00223 : PThreadPoolBase(maxWorkers, maxWorkUnits) 00224 { } 00225 00226 // 00227 // define the ancestor of the worker thread 00228 // 00229 class WorkerThread : public WorkerThreadBase 00230 { 00231 public: 00232 WorkerThread(PThreadPool & pool, Priority priority = NormalPriority) 00233 : WorkerThreadBase(priority) 00234 , m_pool(pool) 00235 { 00236 } 00237 00238 virtual void AddWork(Work_T * work) = 0; 00239 virtual void RemoveWork(Work_T * work) = 0; 00240 virtual void Main() = 0; 00241 00242 protected: 00243 PThreadPool & m_pool; 00244 }; 00245 00246 // 00247 // define internal worker wrapper class 00248 // 00249 class InternalWork : public InternalWorkBase 00250 { 00251 public: 00252 InternalWork(WorkerThread * worker, Work_T * work, const char * group) 00253 : InternalWorkBase(group) 00254 , m_worker(worker) 00255 , m_work(work) 00256 { 00257 } 00258 00259 WorkerThread * m_worker; 00260 Work_T * m_work; 00261 }; 00262 00263 // 00264 // define map for external work units to internal work 00265 // 00266 typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T; 00267 ExternalToInternalWorkMap_T m_externalToInternalWorkMap; 00268 00269 00270 // 00271 // define class for storing group informationm 00272 // 00273 struct GroupInfo { 00274 unsigned m_count; 00275 WorkerThread * m_worker; 00276 }; 00277 00278 00279 // 00280 // define map for group ID to group information 00281 // 00282 typedef std::map<std::string, GroupInfo> GroupInfoMap_t; 00283 GroupInfoMap_t m_groupInfoMap; 00284 00285 00286 // 00287 // add a new unit of work to a worker thread 00288 // 00289 bool AddWork(Work_T * work, const char * group = NULL) 00290 { 00291 PWaitAndSignal m(m_listMutex); 00292 00293 // allocate by group if specified 00294 // else allocate to least busy 00295 WorkerThread * worker; 00296 if ((group == NULL) || (strlen(group) == 0)) { 00297 worker = (WorkerThread *)AllocateWorker(); 00298 } 00299 else { 00300 00301 // find the worker thread with the matching group ID 00302 // if no matching Id, then create a new thread 00303 typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group); 00304 if (g == m_groupInfoMap.end()) 00305 worker = (WorkerThread *)AllocateWorker(); 00306 else { 00307 worker = g->second.m_worker; 00308 PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group); 00309 } 00310 } 00311 00312 // if cannot allocate worker, return 00313 if (worker == NULL) 00314 return false; 00315 00316 // create internal work structure 00317 InternalWork internalWork(worker, work, group); 00318 00319 // add work to external to internal map 00320 m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork)); 00321 00322 // add group ID to map 00323 if (!internalWork.m_group.empty()) { 00324 typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group); 00325 if (r != m_groupInfoMap.end()) 00326 ++r->second.m_count; 00327 else { 00328 GroupInfo info; 00329 info.m_count = 1; 00330 info.m_worker = worker; 00331 m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info)); 00332 } 00333 } 00334 00335 // give the work to the worker 00336 worker->AddWork(work); 00337 00338 return true; 00339 } 00340 00341 // 00342 // remove a unit of work from a worker thread 00343 // 00344 bool RemoveWork(Work_T * work, bool removeFromWorker = true) 00345 { 00346 PWaitAndSignal m(m_listMutex); 00347 00348 // find worker with work unit to remove 00349 typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work); 00350 if (iterWork == m_externalToInternalWorkMap.end()) 00351 return false; 00352 00353 InternalWork & internalWork = iterWork->second; 00354 00355 // tell worker to stop processing work 00356 if (removeFromWorker) 00357 internalWork.m_worker->RemoveWork(work); 00358 00359 // update group information 00360 if (!internalWork.m_group.empty()) { 00361 typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group); 00362 PAssert(iterGroup != m_groupInfoMap.end(), "Attempt to find thread from unknown work group"); 00363 if (iterGroup != m_groupInfoMap.end()) { 00364 if (--iterGroup->second.m_count == 0) 00365 m_groupInfoMap.erase(iterGroup); 00366 } 00367 } 00368 00369 // see if workers need reorganising 00370 CheckWorker(internalWork.m_worker); 00371 00372 // remove element from work unit map 00373 m_externalToInternalWorkMap.erase(iterWork); 00374 00375 return true; 00376 } 00377 }; 00378 00379 00382 template <class Work_T> 00383 class PQueuedThreadPool : public PThreadPool<Work_T> 00384 { 00385 public: 00386 // 00387 // constructor 00388 // 00389 PQueuedThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0) 00390 : PThreadPool<Work_T>(maxWorkers, maxWorkUnits) 00391 { } 00392 00393 class QueuedWorkerThread : public PThreadPool<Work_T>::WorkerThread 00394 { 00395 public: 00396 QueuedWorkerThread(PThreadPool<Work_T> & pool, PThread::Priority priority = PThread::NormalPriority) 00397 : PThreadPool<Work_T>::WorkerThread(pool, priority) 00398 , m_available(0, INT_MAX) 00399 { 00400 } 00401 00402 void AddWork(Work_T * work) 00403 { 00404 m_mutex.Wait(); 00405 m_queue.push(work); 00406 m_available.Signal(); 00407 m_mutex.Signal(); 00408 } 00409 00410 void RemoveWork(Work_T * ) 00411 { 00412 m_mutex.Wait(); 00413 Work_T * work = m_queue.front(); 00414 m_queue.pop(); 00415 m_mutex.Signal(); 00416 delete work; 00417 } 00418 00419 unsigned GetWorkSize() const 00420 { 00421 return (unsigned)m_queue.size(); 00422 } 00423 00424 void Main() 00425 { 00426 for (;;) { 00427 m_available.Wait(); 00428 if (PThreadPool<Work_T>::WorkerThread::m_shutdown) 00429 break; 00430 00431 m_mutex.Wait(); 00432 Work_T * work = m_queue.empty() ? NULL : m_queue.front(); 00433 m_mutex.Signal(); 00434 00435 if (work != NULL) { 00436 work->Work(); 00437 PThreadPool<Work_T>::WorkerThread::m_pool.RemoveWork(work); 00438 } 00439 } 00440 } 00441 00442 void Shutdown() 00443 { 00444 PThreadPool<Work_T>::WorkerThread::m_shutdown = true; 00445 m_available.Signal(); 00446 } 00447 00448 protected: 00449 typedef std::queue<Work_T *> Queue; 00450 Queue m_queue; 00451 PMutex m_mutex; 00452 PSemaphore m_available; 00453 }; 00454 00455 00456 virtual PThreadPoolBase::WorkerThreadBase * CreateWorkerThread() 00457 { 00458 return new QueuedWorkerThread(*this); 00459 } 00460 }; 00461 00462 00463 #endif // PTLIB_THREADPOOL_H 00464 00465 00466 // End Of File ///////////////////////////////////////////////////////////////