libkdepim Library API Documentation

weaver.cpp

00001 /* -*- C++ -*- 00002 00003 This file implements the Weaver, Job and Thread classes. 00004 00005 $ Author: Mirko Boehm $ 00006 $ Copyright: (C) 2004, Mirko Boehm $ 00007 $ Contact: mirko@kde.org 00008 http://www.kde.org 00009 http://www.hackerbuero.org $ 00010 $ License: LGPL with the following explicit clarification: 00011 This code may be linked against any version of the Qt toolkit 00012 from Troll Tech, Norway. $ 00013 00014 */ 00015 00016 extern "C" { 00017 #include <signal.h> 00018 } 00019 00020 #include <qevent.h> 00021 #include <qapplication.h> 00022 00023 #include "weaver.h" 00024 00025 namespace KPIM { 00026 namespace ThreadWeaver { 00027 00028 bool Debug = true; 00029 int DebugLevel = 2; 00030 00031 Job::Job (QObject* parent, const char* name) 00032 : QObject (parent, name), 00033 m_finished (false), 00034 m_mutex (new QMutex (true) ), 00035 m_thread (0) 00036 { 00037 } 00038 00039 Job::~Job() 00040 { 00041 } 00042 00043 void Job::lock() 00044 { 00045 m_mutex->lock(); 00046 } 00047 00048 void Job::unlock() 00049 { 00050 m_mutex->unlock(); 00051 } 00052 00053 void Job::execute(Thread *th) 00054 { 00055 m_mutex->lock(); 00056 m_thread = th; 00057 m_mutex->unlock(); 00058 00059 run (); 00060 00061 m_mutex->lock(); 00062 setFinished (true); 00063 m_thread = 0; 00064 m_mutex->unlock(); 00065 } 00066 00067 Thread *Job::thread () 00068 { 00069 QMutexLocker l (m_mutex); 00070 return m_thread; 00071 } 00072 00073 bool Job::isFinished() const 00074 { 00075 QMutexLocker l (m_mutex); 00076 return m_finished; 00077 } 00078 00079 void Job::setFinished(bool status) 00080 { 00081 QMutexLocker l (m_mutex); 00082 m_finished = status; 00083 } 00084 00085 void Job::processEvent (Event *e) 00086 { 00087 switch ( e->action() ) 00088 { 00089 case Event::JobStarted: 00090 emit ( started() ); 00091 break; 00092 case Event::JobFinished: 00093 emit ( done() ); 00094 break; 00095 case Event::JobSPR: 00096 emit ( SPR () ); 00097 m_wc->wakeOne (); 00098 break; 00099 case Event::JobAPR: 00100 emit ( APR () ); 00101 // no wake here ! 00102 break; 00103 default: 00104 break; 00105 } 00106 } 00107 00108 void Job::triggerSPR () 00109 { 00110 m_mutex->lock (); 00111 m_wc = new QWaitCondition; 00112 m_mutex->unlock (); 00113 00114 thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this); 00115 m_wc->wait (); 00116 00117 m_mutex->lock (); 00118 delete m_wc; 00119 m_wc = 0; 00120 m_mutex->unlock (); 00121 } 00122 00123 void Job::triggerAPR () 00124 { 00125 m_mutex->lock (); 00126 m_wc = new QWaitCondition; 00127 m_mutex->unlock (); 00128 00129 thread()->post (KPIM::ThreadWeaver::Event::JobAPR, this); 00130 m_wc->wait (); 00131 } 00132 00133 void Job::wakeAPR () 00134 { 00135 QMutexLocker l(m_mutex); 00136 if ( m_wc!=0 ) 00137 { 00138 m_wc->wakeOne (); 00139 delete m_wc; 00140 m_wc = 0; 00141 } 00142 } 00143 00144 const int Event::Type = QEvent::User + 1000; 00145 00146 Event::Event ( Action action, Thread *thread, Job *job) 00147 : QCustomEvent ( type () ), 00148 m_action (action), 00149 m_thread (thread), 00150 m_job (job) 00151 { 00152 } 00153 00154 const int Event::type () 00155 { 00156 return Type; 00157 } 00158 00159 Thread* Event::thread () const 00160 { 00161 if ( m_thread != 0) 00162 { 00163 return m_thread; 00164 } else { 00165 return 0; 00166 } 00167 } 00168 00169 Job* Event::job () const 00170 { 00171 return m_job; 00172 } 00173 00174 Event::Action Event::action () const 00175 { 00176 return m_action; 00177 } 00178 00179 unsigned int Thread::sm_Id; 00180 00181 Thread::Thread (Weaver *parent) 00182 : QThread (), 00183 m_parent ( parent ), 00184 m_id ( makeId() ) 00185 { 00186 } 00187 00188 Thread::~Thread() 00189 { 00190 } 00191 00192 unsigned int Thread::makeId() 00193 { 00194 static QMutex mutex; 00195 QMutexLocker l (&mutex); 00196 00197 return ++sm_Id; 00198 } 00199 00200 const unsigned int Thread::id() const 00201 { 00202 return m_id; 00203 } 00204 00205 void Thread::run() 00206 { 00207 Job *job = 0; 00208 00209 post ( Event::ThreadStarted ); 00210 00211 while (true) 00212 { 00213 debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() ); 00214 00215 job = m_parent->applyForWork ( this, job ); 00216 00217 if (job == 0) 00218 { 00219 break; 00220 } else { 00221 post ( Event::JobStarted, job ); 00222 job->execute (this); 00223 post ( Event::JobFinished, job ); 00224 } 00225 } 00226 00227 post ( Event::ThreadExiting ); 00228 } 00229 00230 void Thread::post (Event::Action a, Job *j) 00231 { 00232 m_parent->post ( a, this, j); 00233 } 00234 00235 void Thread::msleep(unsigned long msec) 00236 { 00237 QThread::msleep(msec); 00238 } 00239 00240 Weaver::Weaver(QObject* parent, const char* name, 00241 int inventoryMin, int inventoryMax) 00242 : QObject(parent, name), 00243 m_active(0), 00244 m_inventoryMin(inventoryMin), 00245 m_inventoryMax(inventoryMax), 00246 m_shuttingDown(false), 00247 m_running (false), 00248 m_suspend (false), 00249 m_mutex ( new QMutex(true) ) 00250 { 00251 lock(); 00252 00253 for ( int count = 0; count < m_inventoryMin; ++count) 00254 { 00255 Thread *th = new Thread(this); 00256 m_inventory.append(th); 00257 // this will idle the thread, waiting for a job 00258 th->start(); 00259 00260 emit (threadCreated (th) ); 00261 } 00262 00263 unlock(); 00264 } 00265 00266 Weaver::~Weaver() 00267 { 00268 lock(); 00269 00270 debug ( 1, "Weaver dtor: destroying inventory.\n" ); 00271 00272 m_shuttingDown = true; 00273 00274 unlock(); 00275 00276 m_jobAvailable.wakeAll(); 00277 00278 // problem: Some threads might not be asleep yet, just finding 00279 // out if a job is available. Those threads will suspend 00280 // waiting for their next job (a rare case, but not impossible). 00281 // Therefore, if we encounter a thread that has not exited, we 00282 // have to wake it again (which we do in the following for 00283 // loop). 00284 00285 for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() ) 00286 { 00287 if ( !th->finished() ) 00288 { 00289 m_jobAvailable.wakeAll(); 00290 th->wait(); 00291 } 00292 00293 emit (threadDestroyed (th) ); 00294 delete th; 00295 00296 } 00297 00298 m_inventory.clear(); 00299 00300 delete m_mutex; 00301 00302 debug ( 1, "Weaver dtor: done\n" ); 00303 00304 } 00305 00306 void Weaver::lock() 00307 { 00308 debug ( 3 , "Weaver::lock: lock (mutex is %s).\n", 00309 ( m_mutex->locked() ? "locked" : "not locked" ) ); 00310 m_mutex->lock(); 00311 } 00312 00313 void Weaver::unlock() 00314 { 00315 m_mutex->unlock(); 00316 00317 debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n", 00318 ( m_mutex->locked() ? "locked" : "not locked" ) ); 00319 } 00320 00321 int Weaver::threads () const 00322 { 00323 QMutexLocker l (m_mutex); 00324 return m_inventory.count (); 00325 } 00326 00327 void Weaver::enqueue(Job* job) 00328 { 00329 lock(); 00330 00331 m_assignments.append(job); 00332 m_running = true; 00333 00334 unlock(); 00335 00336 assignJobs(); 00337 } 00338 00339 void Weaver::enqueue (QPtrList <Job> jobs) 00340 { 00341 lock(); 00342 00343 for ( Job * job = jobs.first(); job; job = jobs.next() ) 00344 { 00345 m_assignments.append (job); 00346 } 00347 00348 unlock(); 00349 00350 assignJobs(); 00351 } 00352 00353 bool Weaver::dequeue ( Job* job ) 00354 { 00355 QMutexLocker l (m_mutex); 00356 return m_assignments.remove (job); 00357 } 00358 00359 void Weaver::dequeue () 00360 { 00361 QMutexLocker l (m_mutex); 00362 m_assignments.clear(); 00363 } 00364 00365 void Weaver::suspend (bool state) 00366 { 00367 lock(); 00368 00369 if (state) 00370 { 00371 // no need to wake any threads here 00372 m_suspend = true; 00373 if ( m_active == 0 && isEmpty() ) 00374 { // instead of waking up threads: 00375 post (Event::Suspended); 00376 } 00377 } else { 00378 m_suspend = false; 00379 // make sure we emit suspended () even if all threads are sleeping: 00380 assignJobs (); 00381 debug (2, "Weaver::suspend: queueing resumed.\n" ); 00382 } 00383 00384 unlock(); 00385 } 00386 00387 void Weaver::assignJobs() 00388 { 00389 m_jobAvailable.wakeAll(); 00390 } 00391 00392 bool Weaver::event (QEvent *e ) 00393 { 00394 if ( e->type() >= QEvent::User ) 00395 { 00396 00397 if ( e->type() == Event::type() ) 00398 { 00399 Event *event = (Event*) e; 00400 00401 switch (event->action() ) 00402 { 00403 case Event::JobFinished: 00404 if ( event->job() !=0 ) 00405 { 00406 emit (jobDone (event->job() ) ); 00407 } 00408 break; 00409 case Event::Finished: 00410 emit ( finished() ); 00411 break; 00412 case Event::Suspended: 00413 emit ( suspended() ); 00414 break; 00415 case Event::ThreadSuspended: 00416 if (!m_shuttingDown ) 00417 { 00418 emit (threadSuspended ( event->thread() ) ); 00419 } 00420 break; 00421 case Event::ThreadBusy: 00422 if (!m_shuttingDown ) 00423 { 00424 emit (threadBusy (event->thread() ) ); 00425 } 00426 break; 00427 default: 00428 break; 00429 } 00430 00431 if ( event->job() !=0 ) 00432 { 00433 event->job()->processEvent (event); 00434 } 00435 } else { 00436 debug ( 0, "Weaver::event: Strange: received unknown user event.\n" ); 00437 } 00438 return true; 00439 } else { 00440 // others - please make sure we are a QObject! 00441 return QObject::event ( e ); 00442 } 00443 } 00444 00445 void Weaver::post (Event::Action a, Thread* t, Job* j) 00446 { 00447 Event *e = new Event ( a, t, j); 00448 QApplication::postEvent (this, e); 00449 } 00450 00451 bool Weaver::isEmpty() const 00452 { 00453 QMutexLocker l (m_mutex); 00454 return m_assignments.count()==0; 00455 } 00456 00457 Job* Weaver::applyForWork(Thread *th, Job* previous) 00458 { 00459 Job *rc = 0; 00460 bool lastjob = false; 00461 bool suspended = false; 00462 00463 while (true) 00464 { 00465 lock(); 00466 00467 if (previous != 0) 00468 { // cleanup and send events: 00469 --m_active; 00470 00471 debug ( 3, "Weaver::applyForWork: job done, %i jobs left, " 00472 "%i active jobs left.\n", 00473 queueLength(), m_active ); 00474 00475 if ( m_active == 0 && isEmpty() ) 00476 { 00477 lastjob = true; 00478 m_running = false; 00479 post (Event::Finished); 00480 debug ( 3, "Weaver::applyForWork: last job.\n" ); 00481 } 00482 00483 if (m_active == 0 && m_suspend == true) 00484 { 00485 suspended = true; 00486 post (Event::Suspended); 00487 debug ( 2, "Weaver::applyForWork: queueing suspended.\n" ); 00488 } 00489 00490 m_jobFinished.wakeOne(); 00491 } 00492 00493 previous = 0; 00494 00495 if (m_shuttingDown == true) 00496 { 00497 unlock(); 00498 00499 return 0; 00500 } else { 00501 if ( !isEmpty() && m_suspend == false ) 00502 { 00503 rc = m_assignments.getFirst(); 00504 m_assignments.removeFirst (); 00505 ++m_active; 00506 00507 debug ( 3, "Weaver::applyForWork: job assigned, " 00508 "%i jobs in queue (%i active).\n", 00509 m_assignments.count(), m_active ); 00510 unlock(); 00511 00512 post (Event::ThreadBusy, th); 00513 00514 return rc; 00515 } else { 00516 unlock(); 00517 00518 post (Event::ThreadSuspended, th); 00519 m_jobAvailable.wait(); 00520 } 00521 } 00522 } 00523 } 00524 00525 int Weaver::queueLength() 00526 { 00527 QMutexLocker l (m_mutex); 00528 return m_assignments.count(); 00529 } 00530 00531 bool Weaver::isIdle () const 00532 { 00533 QMutexLocker l (m_mutex); 00534 return isEmpty() && m_active == 0; 00535 } 00536 00537 void Weaver::finish() 00538 { 00539 while ( !isIdle() ) 00540 { 00541 debug (2, "Weaver::finish: not done, waiting.\n" ); 00542 m_jobFinished.wait(); 00543 } 00544 debug (1, "Weaver::finish: done.\n\n\n" ); 00545 } 00546 00547 } 00548 } 00549 00550 #include "weaver.moc"
KDE Logo
This file is part of the documentation for libkdepim Library Version 3.2.2.
Documentation copyright © 1996-2004 the KDE developers.
Generated on Wed Jul 28 23:57:46 2004 by doxygen 1.3.7 written by Dimitri van Heesch, © 1997-2003