00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
extern "C" {
00017
#include <signal.h>
00018 }
00019
00020
#include <qevent.h>
00021
#include <qapplication.h>
00022
00023
#include "weaver.h"
00024
00025
namespace KPIM {
00026
namespace ThreadWeaver {
00027
00028
/** Global diagnostics switch; presumably consulted by debug() — TODO confirm. */
bool Debug = true;

/**
 * Global diagnostics verbosity. This file calls debug() with levels 0..3;
 * presumably messages above this level are dropped — TODO confirm.
 */
int DebugLevel = 2;
00030
00031 Job::Job (QObject* parent,
const char* name)
00032 : QObject (parent, name),
00033 m_finished (false),
00034 m_mutex (new QMutex (true) ),
00035 m_thread (0)
00036 {
00037 }
00038
00039 Job::~Job()
00040 {
00041 }
00042
00043 void Job::lock()
00044 {
00045 m_mutex->lock();
00046 }
00047
00048 void Job::unlock()
00049 {
00050 m_mutex->unlock();
00051 }
00052
00053 void Job::execute(
Thread *th)
00054 {
00055 m_mutex->lock();
00056 m_thread = th;
00057 m_mutex->unlock();
00058
00059
run ();
00060
00061 m_mutex->lock();
00062
setFinished (
true);
00063 m_thread = 0;
00064 m_mutex->unlock();
00065 }
00066
00067 Thread *Job::thread ()
00068 {
00069 QMutexLocker l (m_mutex);
00070
return m_thread;
00071 }
00072
00073 bool Job::isFinished()
const
00074
{
00075 QMutexLocker l (m_mutex);
00076
return m_finished;
00077 }
00078
00079 void Job::setFinished(
bool status)
00080 {
00081 QMutexLocker l (m_mutex);
00082 m_finished = status;
00083 }
00084
00085 void Job::processEvent (
Event *e)
00086 {
00087
switch ( e->
action() )
00088 {
00089
case Event::JobStarted:
00090 emit (
started() );
00091
break;
00092
case Event::JobFinished:
00093 emit (
done() );
00094
break;
00095
case Event::JobSPR:
00096 emit (
SPR () );
00097 m_wc->wakeOne ();
00098
break;
00099
case Event::JobAPR:
00100 emit (
APR () );
00101
00102
break;
00103
default:
00104
break;
00105 }
00106 }
00107
00108 void Job::triggerSPR ()
00109 {
00110 m_mutex->lock ();
00111 m_wc =
new QWaitCondition;
00112 m_mutex->unlock ();
00113
00114
thread()->
post (KPIM::ThreadWeaver::Event::JobSPR,
this);
00115 m_wc->wait ();
00116
00117 m_mutex->lock ();
00118
delete m_wc;
00119 m_wc = 0;
00120 m_mutex->unlock ();
00121 }
00122
00123 void Job::triggerAPR ()
00124 {
00125 m_mutex->lock ();
00126 m_wc =
new QWaitCondition;
00127 m_mutex->unlock ();
00128
00129
thread()->
post (KPIM::ThreadWeaver::Event::JobAPR,
this);
00130 m_wc->wait ();
00131 }
00132
00133 void Job::wakeAPR ()
00134 {
00135 QMutexLocker l(m_mutex);
00136
if ( m_wc!=0 )
00137 {
00138 m_wc->wakeOne ();
00139
delete m_wc;
00140 m_wc = 0;
00141 }
00142 }
00143
00144
/** Qt event type id used for all weaver events (offset into the user range). */
const int Event::Type = QEvent::User + 1000;
00145
00146 Event::Event ( Action action,
Thread *thread,
Job *job)
00147 : QCustomEvent ( type () ),
00148 m_action (action),
00149 m_thread (thread),
00150 m_job (job)
00151 {
00152 }
00153
00154 const int Event::type ()
00155 {
00156
return Type;
00157 }
00158
00159 Thread* Event::thread ()
const
00160
{
00161
if ( m_thread != 0)
00162 {
00163
return m_thread;
00164 }
else {
00165
return 0;
00166 }
00167 }
00168
00169 Job* Event::job ()
const
00170
{
00171
return m_job;
00172 }
00173
00174 Event::Action Event::action ()
const
00175
{
00176
return m_action;
00177 }
00178
00179
/** Last thread id handed out by makeId(); starts at zero. */
unsigned int Thread::sm_Id = 0;
00180
00181 Thread::Thread (
Weaver *parent)
00182 : QThread (),
00183 m_parent ( parent ),
00184 m_id ( makeId() )
00185 {
00186 }
00187
00188 Thread::~Thread()
00189 {
00190 }
00191
00192
unsigned int Thread::makeId()
00193 {
00194
static QMutex mutex;
00195 QMutexLocker l (&mutex);
00196
00197
return ++sm_Id;
00198 }
00199
00200 const unsigned int Thread::id()
const
00201
{
00202
return m_id;
00203 }
00204
00205 void Thread::run()
00206 {
00207
Job *job = 0;
00208
00209
post ( Event::ThreadStarted );
00210
00211
while (
true)
00212 {
00213 debug ( 3,
"Thread::run [%u]: trying to execute the next job.\n",
id() );
00214
00215 job = m_parent->
applyForWork (
this, job );
00216
00217
if (job == 0)
00218 {
00219
break;
00220 }
else {
00221
post ( Event::JobStarted, job );
00222 job->
execute (
this);
00223
post ( Event::JobFinished, job );
00224 }
00225 }
00226
00227
post ( Event::ThreadExiting );
00228 }
00229
00230 void Thread::post (Event::Action a,
Job *j)
00231 {
00232 m_parent->
post ( a,
this, j);
00233 }
00234
00235
void Thread::msleep(
unsigned long msec)
00236 {
00237 QThread::msleep(msec);
00238 }
00239
00240 Weaver::Weaver(QObject* parent,
const char* name,
00241
int inventoryMin,
int inventoryMax)
00242 : QObject(parent, name),
00243 m_active(0),
00244 m_inventoryMin(inventoryMin),
00245 m_inventoryMax(inventoryMax),
00246 m_shuttingDown(false),
00247 m_running (false),
00248 m_suspend (false),
00249 m_mutex ( new QMutex(true) )
00250 {
00251 lock();
00252
00253
for (
int count = 0; count < m_inventoryMin; ++count)
00254 {
00255 Thread *th =
new Thread(
this);
00256 m_inventory.append(th);
00257
00258 th->start();
00259
00260 emit (threadCreated (th) );
00261 }
00262
00263 unlock();
00264 }
00265
00266 Weaver::~Weaver()
00267 {
00268 lock();
00269
00270 debug ( 1,
"Weaver dtor: destroying inventory.\n" );
00271
00272 m_shuttingDown =
true;
00273
00274 unlock();
00275
00276 m_jobAvailable.wakeAll();
00277
00278
00279
00280
00281
00282
00283
00284
00285
for (
Thread *th = m_inventory.first(); th; th = m_inventory.next() )
00286 {
00287
if ( !th->finished() )
00288 {
00289 m_jobAvailable.wakeAll();
00290 th->wait();
00291 }
00292
00293 emit (threadDestroyed (th) );
00294
delete th;
00295
00296 }
00297
00298 m_inventory.clear();
00299
00300
delete m_mutex;
00301
00302 debug ( 1,
"Weaver dtor: done\n" );
00303
00304 }
00305
00306 void Weaver::lock()
00307 {
00308 debug ( 3 ,
"Weaver::lock: lock (mutex is %s).\n",
00309 ( m_mutex->locked() ?
"locked" :
"not locked" ) );
00310 m_mutex->lock();
00311 }
00312
00313 void Weaver::unlock()
00314 {
00315 m_mutex->unlock();
00316
00317 debug ( 3 ,
"Weaver::unlock: unlock (mutex is %s).\n",
00318 ( m_mutex->locked() ?
"locked" :
"not locked" ) );
00319 }
00320
00321 int Weaver::threads ()
const
00322
{
00323 QMutexLocker l (m_mutex);
00324
return m_inventory.count ();
00325 }
00326
00327 void Weaver::enqueue(
Job* job)
00328 {
00329
lock();
00330
00331
m_assignments.append(job);
00332
m_running =
true;
00333
00334
unlock();
00335
00336
assignJobs();
00337 }
00338
00339 void Weaver::enqueue (QPtrList <Job> jobs)
00340 {
00341
lock();
00342
00343
for (
Job * job = jobs.first(); job; job = jobs.next() )
00344 {
00345
m_assignments.append (job);
00346 }
00347
00348
unlock();
00349
00350
assignJobs();
00351 }
00352
00353 bool Weaver::dequeue (
Job* job )
00354 {
00355 QMutexLocker l (m_mutex);
00356
return m_assignments.remove (job);
00357 }
00358
00359 void Weaver::dequeue ()
00360 {
00361 QMutexLocker l (m_mutex);
00362
m_assignments.clear();
00363 }
00364
00365 void Weaver::suspend (
bool state)
00366 {
00367
lock();
00368
00369
if (state)
00370 {
00371
00372
m_suspend =
true;
00373
if (
m_active == 0 &&
isEmpty() )
00374 {
00375
post (Event::Suspended);
00376 }
00377 }
else {
00378
m_suspend =
false;
00379
00380
assignJobs ();
00381 debug (2,
"Weaver::suspend: queueing resumed.\n" );
00382 }
00383
00384
unlock();
00385 }
00386
00387 void Weaver::assignJobs()
00388 {
00389
m_jobAvailable.wakeAll();
00390 }
00391
00392 bool Weaver::event (QEvent *e )
00393 {
00394
if ( e->type() >= QEvent::User )
00395 {
00396
00397
if ( e->type() == Event::type() )
00398 {
00399
Event *
event = (
Event*) e;
00400
00401
switch (event->
action() )
00402 {
00403
case Event::JobFinished:
00404
if ( event->
job() !=0 )
00405 {
00406 emit (
jobDone (event->
job() ) );
00407 }
00408
break;
00409
case Event::Finished:
00410 emit (
finished() );
00411
break;
00412
case Event::Suspended:
00413 emit (
suspended() );
00414
break;
00415
case Event::ThreadSuspended:
00416
if (!
m_shuttingDown )
00417 {
00418 emit (threadSuspended ( event->
thread() ) );
00419 }
00420
break;
00421
case Event::ThreadBusy:
00422
if (!
m_shuttingDown )
00423 {
00424 emit (threadBusy (event->
thread() ) );
00425 }
00426
break;
00427
default:
00428
break;
00429 }
00430
00431
if ( event->
job() !=0 )
00432 {
00433 event->
job()->
processEvent (event);
00434 }
00435 }
else {
00436 debug ( 0,
"Weaver::event: Strange: received unknown user event.\n" );
00437 }
00438
return true;
00439 }
else {
00440
00441
return QObject::event ( e );
00442 }
00443 }
00444
00445 void Weaver::post (Event::Action a,
Thread* t,
Job* j)
00446 {
00447
Event *e =
new Event ( a, t, j);
00448 QApplication::postEvent (
this, e);
00449 }
00450
00451 bool Weaver::isEmpty()
const
00452
{
00453 QMutexLocker l (m_mutex);
00454
return m_assignments.count()==0;
00455 }
00456
00457 Job* Weaver::applyForWork(
Thread *th,
Job* previous)
00458 {
00459
Job *rc = 0;
00460
bool lastjob =
false;
00461
bool suspended =
false;
00462
00463
while (
true)
00464 {
00465
lock();
00466
00467
if (previous != 0)
00468 {
00469 --
m_active;
00470
00471 debug ( 3,
"Weaver::applyForWork: job done, %i jobs left, "
00472
"%i active jobs left.\n",
00473
queueLength(),
m_active );
00474
00475
if (
m_active == 0 &&
isEmpty() )
00476 {
00477 lastjob =
true;
00478
m_running =
false;
00479
post (Event::Finished);
00480 debug ( 3,
"Weaver::applyForWork: last job.\n" );
00481 }
00482
00483
if (
m_active == 0 &&
m_suspend ==
true)
00484 {
00485 suspended =
true;
00486
post (Event::Suspended);
00487 debug ( 2,
"Weaver::applyForWork: queueing suspended.\n" );
00488 }
00489
00490
m_jobFinished.wakeOne();
00491 }
00492
00493 previous = 0;
00494
00495
if (
m_shuttingDown ==
true)
00496 {
00497
unlock();
00498
00499
return 0;
00500 }
else {
00501
if ( !
isEmpty() &&
m_suspend ==
false )
00502 {
00503 rc =
m_assignments.getFirst();
00504
m_assignments.removeFirst ();
00505 ++
m_active;
00506
00507 debug ( 3,
"Weaver::applyForWork: job assigned, "
00508
"%i jobs in queue (%i active).\n",
00509
m_assignments.count(),
m_active );
00510
unlock();
00511
00512
post (Event::ThreadBusy, th);
00513
00514
return rc;
00515 }
else {
00516
unlock();
00517
00518
post (Event::ThreadSuspended, th);
00519
m_jobAvailable.wait();
00520 }
00521 }
00522 }
00523 }
00524
00525 int Weaver::queueLength()
00526 {
00527 QMutexLocker l (m_mutex);
00528
return m_assignments.count();
00529 }
00530
00531 bool Weaver::isIdle ()
const
00532
{
00533 QMutexLocker l (m_mutex);
00534
return isEmpty() &&
m_active == 0;
00535 }
00536
00537 void Weaver::finish()
00538 {
00539
while ( !
isIdle() )
00540 {
00541 debug (2,
"Weaver::finish: not done, waiting.\n" );
00542
m_jobFinished.wait();
00543 }
00544 debug (1,
"Weaver::finish: done.\n\n\n" );
00545 }
00546
00547 }
00548 }
00549
00550
#include "weaver.moc"