#include <Reactor.h>
Public Member Functions | |
Reactor () | |
Constructor. | |
~Reactor () | |
Destructor. | |
TimerId | registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>") |
Register Timer Event handler with Reactor. | |
bool | registerIOHandler (EventHandler *eh_, int fd_, EventType et_=RWE_EVENTS) |
Register I/O Event handler with Reactor. | |
bool | removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS) |
Remove Event handler from reactor for either all I/O events or timeout event or both. | |
bool | removeTimerHandler (TimerId id_) |
Remove Timer event from the queue. | |
bool | removeIOHandler (int fd_) |
Remove IO Event handler from reactor. | |
void | waitForEvents (void) |
Main waiting loop that blocks indefinitely processing events. | |
void | waitForEvents (TimeVal *tv_) |
Wait for events for time specified. | |
void | stopReactor (void) |
Stop Reactor's activity. | |
void | deactivate (void) |
Deactivate Reactor. | |
Private Member Functions | |
Reactor (const Reactor &) | |
Reactor & | operator= (const Reactor &) |
no cloning | |
bool | handleError (void) |
Handle error in select(2) loop appropriately. | |
bool | dispatch (int minimum_) |
Notify all EventHandlers registered for the respective events that have occurred. | |
int | isAnyReady (void) |
Return number of file descriptors ready across all sets. | |
bool | checkFDs (void) |
Check mask for bad file descriptors. | |
void | dispatchHandler (FdSet &mask_, EventHandler **fdSet_, EH_IO_Callback callback_) |
Call handler's callback and, if callback returns negative value, remove it from the Reactor. | |
void | calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_) |
Calculate closest timeout. | |
Private Attributes | |
int | m_noFiles |
Max number of open files per process. | |
int | m_maxfd |
Max file descriptor number plus 1. | |
bool | m_active |
Flag that indicates whether Reactor is active or has been stopped. | |
EventHandler ** | m_readSet |
Event handlers awaiting on READ_EVENT. | |
EventHandler ** | m_writeSet |
Event handlers awaiting on WRITE_EVENT. | |
EventHandler ** | m_exceptSet |
Event handlers awaiting on EXCEPT_EVENT. | |
MaskSet | m_waitSet |
Handlers to wait for event on. | |
MaskSet | m_readySet |
Handlers that are ready for processing. | |
TimerQueue | m_tqueue |
The queue of Timers. |
Definition at line 53 of file Reactor.h.
|
Constructor.
Definition at line 23 of file Reactor.cpp. References m_exceptSet, m_noFiles, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask. 00023 : 00024 m_noFiles (1024), m_maxfd (0), m_active (true), 00025 m_readSet ((EventHandler**) NULL), 00026 m_writeSet ((EventHandler**) NULL), 00027 m_exceptSet ((EventHandler**) NULL) 00028 { 00029 trace_with_mask("Reactor::Reactor",REACTTRACE); 00030 00031 struct rlimit rlim; 00032 rlim.rlim_max = 0; 00033 00034 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) { 00035 m_noFiles = rlim.rlim_cur; 00036 } 00037 00038 m_readSet = new EventHandler* [m_noFiles]; 00039 m_writeSet = new EventHandler* [m_noFiles]; 00040 m_exceptSet = new EventHandler* [m_noFiles]; 00041 00042 for (int i = 0; i < m_noFiles; i++) { 00043 m_readSet[i] = NULL; 00044 m_writeSet[i] = NULL; 00045 m_exceptSet[i] = NULL; 00046 } 00047 }
|
|
Destructor.
Definition at line 50 of file Reactor.cpp. References m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask. 00051 { 00052 trace_with_mask("Reactor::~Reactor",REACTTRACE); 00053 00054 delete [] m_readSet; 00055 delete [] m_writeSet; 00056 delete [] m_exceptSet; 00057 }
|
|
|
|
Calculate the closest timeout. If the TimerQueue is not empty, return the smaller of maxtimeout and the time remaining until the first timer in the queue expires. Otherwise, return maxtimeout.
Definition at line 329 of file Reactor.cpp. References ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00330 { 00331 trace_with_mask("Reactor::calculateTimeout",REACTTRACE); 00332 00333 TimeVal now; 00334 TimeVal tv; 00335 00336 if (m_tqueue.isEmpty () ) { 00337 howlong_ = maxwait_; 00338 goto done; 00339 } 00340 now = TimeVal::gettimeofday (); 00341 tv = m_tqueue.top (); 00342 00343 if (tv < now) { 00344 /*--- 00345 It took too long to get here (fraction of a millisecond), 00346 and top timer had already expired. In this case, 00347 perform non-blocking select in order to drain the timer queue. 00348 ---*/ 00349 *howlong_ = 0; 00350 } 00351 else { 00352 DL((REACT,"--------- Timer Queue ----------\n")); 00353 m_tqueue.dump(); 00354 DL((REACT,"--------------------------------\n")); 00355 00356 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) { 00357 *howlong_ = tv - now; 00358 } 00359 else { 00360 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now; 00361 } 00362 } 00363 00364 done: 00365 if (howlong_ != NULL) { 00366 DL((REACT,"delay (%f)\n", double (*howlong_) )); 00367 } 00368 else { 00369 DL((REACT,"delay (forever)\n")); 00370 } 00371 }
|
|
Check mask for bad file descriptors.
Definition at line 230 of file Reactor.cpp. References ASSA::FdSet::clear(), DL, m_noFiles, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask. Referenced by handleError(). 00231 { 00232 trace_with_mask("Reactor::checkFDs",REACTTRACE); 00233 00234 bool num_removed = false; 00235 FdSet mask; 00236 timeval poll = { 0, 0 }; 00237 00238 for (int fd = 0; fd < m_noFiles; fd++) { 00239 if ( m_readSet[fd] != NULL ) { 00240 mask.setFd (fd); 00241 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) { 00242 removeIOHandler (fd); 00243 num_removed = true; 00244 DL((REACT,"Detected BAD FD: %d\n", fd )); 00245 } 00246 mask.clear (fd); 00247 } 00248 } 00249 return (num_removed); 00250 }
|
|
Deactivate Reactor. This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation. Definition at line 219 of file Reactor.h. References m_active. Referenced by ASSA::GenServer::handle_signal(), and ASSA::GenServer::stop_service(). 00219 { m_active = false; }
|
|
Notify all EventHandlers registered for the respective events that have occurred.
Definition at line 494 of file Reactor.cpp. References dispatchHandler(), DL, EL, ASSA::ERROR, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00495 { 00496 /*--- 00497 Many UNIX systems will count a particular file descriptor in the 00498 ready_ only ONCE, even if it was flagged by ::select(2) in, say, 00499 both read and write masks. 00500 ---*/ 00501 trace_with_mask("Reactor::dispatch", REACTTRACE); 00502 00503 m_tqueue.expire (TimeVal::gettimeofday ()); 00504 00505 if ( ready_ < 0 ) { 00506 EL((ERROR,"::select(3) error\n")); 00507 return (false); 00508 } 00509 if ( ready_ == 0 ) { 00510 return (true); 00511 } 00512 DL((REACT,"Dispatching %d FDs\n",ready_)); 00513 00514 /*--- Writes first ---*/ 00515 dispatchHandler (m_readySet.m_wset, 00516 m_writeSet, 00517 &EventHandler::handle_write); 00518 00519 /*--- Exceptions next ---*/ 00520 dispatchHandler (m_readySet.m_eset, 00521 m_exceptSet, 00522 &EventHandler::handle_except); 00523 00524 /*--- Finally, the Reads ---*/ 00525 dispatchHandler (m_readySet.m_rset, 00526 m_readSet, 00527 &EventHandler::handle_read); 00528 return (true); 00529 }
|
|
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
Definition at line 456 of file Reactor.cpp. References ASSA::FdSet::clear(), DL, ASSA::FdSet::isSet(), m_active, m_maxfd, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask. Referenced by dispatch(). 00457 { 00458 trace_with_mask("Reactor::dispatchHandler",REACTTRACE); 00459 00460 register int fd; 00461 register int ret; 00462 00463 /*--- 00464 This spot needs re-thinking. When you have several high data-rate 00465 connections sending data at the same time, the one that had 00466 connected first would get lower FD number and would get data 00467 transfer preference over everybody else who has connected later on. 00468 ---*/ 00469 00470 for (fd = 0; m_active && fd < m_maxfd; fd++) { 00471 if (mask_.isSet (fd) && fdSet_[fd] != NULL) { 00472 00473 DL((REACT,"Data detected on connection %s (FD=%d)\n", 00474 fdSet_[fd]->get_id ().c_str (), fd)); 00475 00476 if ((ret = (fdSet_[fd]->*callback_) (fd)) == -1) { 00477 removeIOHandler (fd); 00478 } 00479 else if (ret > 0) { 00480 DL((REACT,"More data (%d bytes) are pending on FD=%d\n", 00481 ret,fd)); 00482 //return; <-- would starve other connections 00483 } 00484 else { 00485 DL((REACT,"All data are consumed from FD=%d\n", fd)); 00486 mask_.clear (fd); 00487 } 00488 } 00489 } 00490 }
|
|
Handle error in select(2) loop appropriately.
Definition at line 254 of file Reactor.cpp. References checkFDs(), DL, EL, ASSA::ERROR, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00255 { 00256 trace_with_mask("Reactor::handleError",REACTTRACE); 00257 00258 /*--- If commanded to stop, do so --*/ 00259 if ( !m_active ) { 00260 DL((REACT,"Received cmd to stop Reactor\n")); 00261 return (false); 00262 } 00263 00264 /*--- 00265 TODO: If select(2) returns before time expires, with 00266 a descriptor ready or with EINTR, timeval is not 00267 going to be updated with number of seconds remaining. 00268 This is true for all systems except Linux, which will 00269 do so. Therefore, to restart correctly in case of 00270 EINTR, we ought to take time measurement before and 00271 after select, and try to select() for remaining time. 00272 00273 For now, we restart with the initial timing value. 00274 ---*/ 00275 /*--- 00276 BSD kernel never restarts select(2). SVR4 will restart if 00277 the SA_RESTART flag is specified when the signal handler 00278 for the signal delivered is installed. This means taht for 00279 portability, we must handle signal interrupts. 00280 ---*/ 00281 00282 if ( errno == EINTR ) { 00283 EL((REACT,"EINTR: interrupted select(2)\n")); 00284 /* 00285 If I was sitting in select(2) and received SIGTERM, 00286 the signal handler would have set m_active to 'false', 00287 and this function would have returned 'false' as above. 00288 For any other non-critical signals (USR1,...), 00289 we retry select. 00290 */ 00291 return (true); 00292 } 00293 /* 00294 EBADF - bad file number. One of the file descriptors does 00295 not reference an open file to open(), close(), ioctl(). 00296 This can happen if user closed fd and forgot to remove 00297 handler from Reactor. 
00298 */ 00299 if ( errno == EBADF ) { 00300 DL((REACT,"EBADF: bad file descriptor\n")); 00301 return (checkFDs ()); 00302 } 00303 /* 00304 Any other error from select 00305 */ 00306 EL((ERROR,"select(3) error\n")); 00307 return (false); 00308 }
|
|
Return number of file descriptors ready across all sets.
Definition at line 312 of file Reactor.cpp. References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask. Referenced by waitForEvents(). 00313 { 00314 trace_with_mask("Reactor::isAnyReady",REACTTRACE); 00315 00316 int n = m_readySet.m_rset.numSet () + 00317 m_readySet.m_wset.numSet () + 00318 m_readySet.m_eset.numSet (); 00319 00320 if ( n > 0 ) { 00321 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n)); 00322 m_readySet.dump (); 00323 } 00324 return (n); 00325 }
|
|
no cloning
|
|
Register I/O Event handler with Reactor. Reactor will dispatch appropriate callback when event of EventType is received.
Definition at line 86 of file Reactor.cpp. References Assure_return, DL, ASSA::ERROR, ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), m_readSet, ASSA::MaskSet::m_rset, m_waitSet, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), and ASSA::RemoteLogger::log_open(). 00087 { 00088 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE); 00089 00090 std::ostringstream msg; 00091 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_)); 00092 00093 if ( isReadEvent (et_) ) { 00094 00095 if ( !m_waitSet.m_rset.setFd (fd_) ) { 00096 DL((ERROR,"readset: fd %d out of range\n", fd_)); 00097 return (false); 00098 } 00099 m_readSet[fd_] = eh_; 00100 msg << "READ_EVENT"; 00101 } 00102 00103 if ( isWriteEvent (et_) ) { 00104 00105 if ( !m_waitSet.m_wset.setFd (fd_) ) { 00106 DL((ERROR,"writeset: fd %d out of range\n", fd_)); 00107 return (false); 00108 } 00109 m_writeSet[fd_] = eh_; 00110 msg << " WRITE_EVENT"; 00111 } 00112 if ( isExceptEvent (et_) ) { 00113 if ( !m_waitSet.m_eset.setFd (fd_) ) { 00114 DL((ERROR,"exceptset: fd %d out of range\n", fd_)); 00115 return (false); 00116 } 00117 m_exceptSet[fd_] = eh_; 00118 msg << " EXCEPT_EVENT"; 00119 } 00120 msg << std::ends; 00121 DL((REACT,"Registered EventHandler 0x%x FD (%d) for event(s) %s\n", 00122 (u_long)eh_, fd_, msg.str ().c_str () )); 00123 00124 if ( m_maxfd < fd_+1 ) { 00125 m_maxfd = fd_+1; 00126 DL((REACT,"maxfd+1 adjusted to %d\n",m_maxfd)); 00127 } 00128 DL((REACT,"Modified waitSet:\n")); 00129 m_waitSet.dump (); 00130 00131 return (true); 00132 }
|
|
Register Timer Event handler with Reactor. Reactor will dispatch appropriate callback when event of EventType is received.
Definition at line 61 of file Reactor.cpp. References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(). 00064 { 00065 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE); 00066 Assure_return (eh_); 00067 00068 TimeVal now (TimeVal::gettimeofday()); 00069 TimeVal t (now + timeout_); 00070 00071 DL((REACT,"TIMEOUT_EVENT: (%d,%d)\n", timeout_.sec(),timeout_.msec())); 00072 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() )); 00073 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() )); 00074 00075 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_); 00076 00077 DL((REACT,"---Modified Timer Queue----\n")); 00078 m_tqueue.dump(); 00079 DL((REACT,"---------------------------\n")); 00080 00081 return (tid); 00082 }
|
|
Remove Event handler from reactor for either all I/O events or timeout event or both. If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.
Definition at line 154 of file Reactor.cpp. References ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), m_exceptSet, m_maxfd, m_readSet, m_tqueue, m_writeSet, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), removeIOHandler(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), and ASSA::RemoteLogger::log_close(). 00155 { 00156 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE); 00157 00158 if (eh_ == NULL) { 00159 return false; 00160 } 00161 bool ret = false; 00162 register int fd; 00163 00164 if (isTimeoutEvent (et_)) { 00165 ret = m_tqueue.remove (eh_); 00166 } 00167 00168 if (isReadEvent (et_) || isWriteEvent (et_) || isExceptEvent (et_)) { 00169 for (fd = 0; fd < m_maxfd; fd++) { 00170 if (m_readSet[fd] == eh_ || m_writeSet[fd] == eh_ || 00171 m_exceptSet[fd] == eh_) { 00172 ret = removeIOHandler (fd); 00173 } 00174 } 00175 } 00176 return (ret); 00177 }
|
|
Remove IO Event handler from reactor. This will remove handler from receiving all I/O events.
Definition at line 181 of file Reactor.cpp. References Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd, m_noFiles, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask. Referenced by checkFDs(), dispatchHandler(), removeHandler(), and stopReactor(). 00182 { 00183 trace_with_mask("Reactor::removeIOHandler",REACTTRACE); 00184 00185 Assure_return (fd_ >= 0 && fd_ < m_noFiles); 00186 00187 DL((REACT,"Removing Handler fd = %d\n",fd_)); 00188 00189 EventHandler* eh = NULL; 00190 00191 if ( m_readSet[fd_] ) eh = m_readSet[fd_]; 00192 else if ( m_writeSet[fd_] ) eh = m_writeSet[fd_]; 00193 else if ( m_exceptSet[fd_] ) eh = m_exceptSet[fd_]; 00194 00195 if (eh) { 00196 DL((REACT,"Found EvtHandler 0x%x\n",(u_long)eh)); 00197 eh->handle_close (fd_); 00198 } 00199 00200 m_readSet[fd_] = NULL; 00201 m_writeSet[fd_] = NULL; 00202 m_exceptSet[fd_] = NULL; 00203 00204 m_waitSet.m_rset.clear (fd_); 00205 m_waitSet.m_wset.clear (fd_); 00206 m_waitSet.m_eset.clear (fd_); 00207 00208 m_readySet.m_rset.clear (fd_); 00209 m_readySet.m_wset.clear (fd_); 00210 m_readySet.m_eset.clear (fd_); 00211 00212 if ( m_maxfd == fd_+1 ) { 00213 while ( m_maxfd > 0 && 00214 m_readSet [m_maxfd-1] == NULL && 00215 m_writeSet [m_maxfd-1] == NULL && 00216 m_exceptSet[m_maxfd-1] == NULL ) 00217 { 00218 m_maxfd--; 00219 } 00220 } 00221 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd)); 00222 DL((REACT,"Modifies waitSet:\n")); 00223 m_waitSet.dump (); 00224 00225 return (true); 00226 }
|
|
Remove Timer event from the queue. This removes particular event.
Definition at line 136 of file Reactor.cpp. References DL, ASSA::TimerQueue::dump(), EL, ASSA::ERROR, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(). 00137 { 00138 trace_with_mask("Reactor::removeTimer",REACTTRACE); 00139 bool ret; 00140 00141 if ((ret = m_tqueue.remove (tid_))) { 00142 DL((REACT,"---Modified Timer Queue----\n")); 00143 m_tqueue.dump(); 00144 DL((REACT,"---------------------------\n")); 00145 } 00146 else { 00147 EL((ERROR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ )); 00148 } 00149 return (ret); 00150 }
|
|
Stop Reactor's activity. This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from a method other than EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls. Definition at line 533 of file Reactor.cpp. References m_active, m_maxfd, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask. 00534 { 00535 trace_with_mask("Reactor::stopReactor", REACTTRACE); 00536 00537 m_active = false; 00538 register int i; 00539 00540 for (i = 0; i < m_maxfd; i++) { 00541 removeIOHandler (i); 00542 } 00543 }
|
|
Wait for events for the time specified. Passing NULL replicates the behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks for up to the tv_ time interval while processing events. If an event occurs, it will process the event(s) and return. tv_ time is adjusted by subtracting the time spent in event processing.
Definition at line 400 of file Reactor.cpp. References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), and trace_with_mask. 00401 { 00402 trace_with_mask("Reactor::waitForEvents",REACTTRACE); 00403 00404 TimerCountdown traceTime (tv_); 00405 DL((REACT,"======================================\n")); 00406 00407 /*--- Expire all stale Timers ---*/ 00408 m_tqueue.expire (TimeVal::gettimeofday ()); 00409 00410 /* Test to see if Reactor has been deactivated as a result 00411 * of processing done by any TimerHandlers. 00412 */ 00413 if (!m_active) { 00414 return; 00415 } 00416 00417 int nReady; 00418 TimeVal delay; 00419 TimeVal* dlp = &delay; 00420 00421 /*--- 00422 In case if not all data are processed by the EventHandler, 00423 and EventHandler stated so in its callback's return value 00424 to dispatcher (), it will be called again. This way 00425 underlying file/socket stream can efficiently utilize its 00426 buffering mechaninsm. 00427 ---*/ 00428 if ((nReady = isAnyReady ())) { 00429 DL((REACT,"isAnyReady returned: %d\n",nReady)); 00430 dispatch (nReady); 00431 return; 00432 } 00433 00434 DL((REACT,"=== m_waitSet ===\n")); 00435 m_waitSet.dump (); 00436 00437 do { 00438 m_readySet.reset (); 00439 m_readySet = m_waitSet; 00440 calculateTimeout (dlp, tv_); 00441 00442 nReady = ::select (m_maxfd, 00443 &m_readySet.m_rset, 00444 &m_readySet.m_wset, 00445 &m_readySet.m_eset, 00446 dlp); 00447 DL((REACT,"::select() returned: %d\n",nReady)); 00448 } 00449 while (nReady < 0 && handleError ()); 00450 00451 dispatch (nReady); 00452 }
|
|
Main waiting loop that blocks indefinitely processing events.
Definition at line 375 of file Reactor.cpp. References m_active. Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(). 00376 { 00377 while ( m_active ) { 00378 waitForEvents ((TimeVal*) NULL); 00379 } 00380 }
|
|
Flag that indicates whether Reactor is active or has been stopped.
Definition at line 195 of file Reactor.h. Referenced by deactivate(), dispatchHandler(), handleError(), stopReactor(), and waitForEvents(). |
|
Event handlers awaiting on EXCEPT_EVENT.
Definition at line 204 of file Reactor.h. Referenced by dispatch(), Reactor(), removeHandler(), removeIOHandler(), and ~Reactor(). |
|
Max file descriptor number plus 1.
Definition at line 192 of file Reactor.h. Referenced by dispatchHandler(), removeHandler(), removeIOHandler(), stopReactor(), and waitForEvents(). |
|
Max number of open files per process. This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit. Definition at line 189 of file Reactor.h. Referenced by checkFDs(), Reactor(), and removeIOHandler(). |
|
Event handlers awaiting on READ_EVENT.
Definition at line 198 of file Reactor.h. Referenced by checkFDs(), dispatch(), Reactor(), registerIOHandler(), removeHandler(), removeIOHandler(), and ~Reactor(). |
|
Handlers that are ready for processing.
Definition at line 210 of file Reactor.h. Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents(). |
|
The queue of Timers.
Definition at line 213 of file Reactor.h. Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents(). |
|
Handlers to wait for event on.
Definition at line 207 of file Reactor.h. Referenced by registerIOHandler(), removeIOHandler(), and waitForEvents(). |
|
Event handlers awaiting on WRITE_EVENT.
Definition at line 201 of file Reactor.h. Referenced by dispatch(), Reactor(), removeHandler(), removeIOHandler(), and ~Reactor(). |