Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

wvstream.cc

Go to the documentation of this file.
00001 /* 00002 * Worldvisions Weaver Software: 00003 * Copyright (C) 1997-2002 Net Integration Technologies, Inc. 00004 * 00005 * Unified support for streams, that is, sequences of bytes that may or 00006 * may not be ready for read/write at any given time. 00007 * 00008 * We provide typical read and write routines, as well as a select() function 00009 * for each stream. 00010 */ 00011 #include "wvstream.h" 00012 #include "wvtask.h" 00013 #include "wvtimeutils.h" 00014 #include <time.h> 00015 #include <sys/types.h> 00016 #include <assert.h> 00017 00018 #ifdef _WIN32 00019 #define ENOBUFS WSAENOBUFS 00020 #undef errno 00021 #define errno GetLastError() 00022 #else 00023 #include <errno.h> 00024 #endif 00025 00026 // enable this to add some read/write trace messages (this can be VERY 00027 // verbose) 00028 #if 0 00029 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr); 00030 #else 00031 #ifndef _MSC_VER 00032 # define TRACE(x, y...) 00033 #else 00034 # define TRACE 00035 #endif 00036 #endif 00037 00038 WvStream *WvStream::globalstream = NULL; 00039 00040 XUUID_MAP_BEGIN(IWvStream) 00041 XUUID_MAP_ENTRY(IObject) 00042 XUUID_MAP_ENTRY(IWvStream) 00043 XUUID_MAP_END 00044 00045 WvStream::WvStream() 00046 { 00047 TRACE("Creating wvstream %p\n", this); 00048 00049 #ifdef _WIN32 00050 WSAData wsaData; 00051 int result = WSAStartup(MAKEWORD(2,0), &wsaData); 00052 assert(result == 0); 00053 #endif 00054 wvstream_execute_called = false; 00055 userdata = closecb_data = NULL; 00056 errnum = 0; 00057 max_outbuf_size = 0; 00058 outbuf_delayed_flush = false; 00059 want_to_flush = true; 00060 is_flushing = false; 00061 is_auto_flush = true; 00062 alarm_was_ticking = false; 00063 force.readable = true; 00064 force.writable = force.isexception = false; 00065 read_requires_writable = write_requires_readable = NULL; 00066 running_callback = false; 00067 want_nowrite = false; 00068 queue_min = 0; 00069 autoclose_time = 0; 00070 alarm_time = wvtime_zero; 00071 last_alarm_check = wvtime_zero; 00072 taskman = 0; 00073 00074 // magic multitasking support 00075 uses_continue_select = false; 00076 personal_stack_size = 65536; 00077 task = NULL; 00078 } 00079 00080 00081 // FIXME: interfaces (IWvStream) shouldn't have implementations! 00082 IWvStream::IWvStream() 00083 { 00084 } 00085 00086 00087 IWvStream::~IWvStream() 00088 { 00089 } 00090 00091 00092 WvStream::~WvStream() 00093 { 00094 TRACE("destroying %p\n", this); 00095 if (running_callback) 00096 { 00097 // user should have called terminate_continue_select()... 00098 TRACE("eek! destroying while running_callback!\n"); 00099 assert(!running_callback); 00100 } 00101 close(); 00102 00103 if (task) 00104 { 00105 while (task->isrunning()) 00106 taskman->run(*task); 00107 task->recycle(); 00108 task = NULL; 00109 } 00110 TRACE("done destroying %p\n", this); 00111 if (taskman) 00112 taskman->unlink(); 00113 } 00114 00115 00116 void WvStream::close() 00117 { 00118 flush(2000); // fixme: should not hardcode this stuff 00119 if (!! closecb_func) 00120 { 00121 WvStreamCallback cb = closecb_func; 00122 closecb_func = 0; // ensure callback is only called once 00123 cb(*this, closecb_data); 00124 } 00125 } 00126 00127 00128 void WvStream::autoforward(WvStream &s) 00129 { 00130 setcallback(autoforward_callback, &s); 00131 read_requires_writable = &s; 00132 } 00133 00134 00135 void WvStream::noautoforward() 00136 { 00137 setcallback(0, NULL); 00138 read_requires_writable = NULL; 00139 } 00140 00141 00142 void WvStream::autoforward_callback(WvStream &s, void *userdata) 00143 { 00144 WvStream &s2 = *(WvStream *)userdata; 00145 char buf[1024]; 00146 size_t len; 00147 00148 len = s.read(buf, sizeof(buf)); 00149 s2.write(buf, len); 00150 } 00151 00152 00153 // this is run in the subtask owned by 'stream', if any; NOT necessarily 00154 // the task that runs WvStream::callback(). That's why this needs to be 00155 // a separate function. 00156 void WvStream::_callback(void *stream) 00157 { 00158 WvStream *s = (WvStream *)stream; 00159 00160 s->running_callback = true; 00161 00162 s->wvstream_execute_called = false; 00163 s->execute(); 00164 if (!! s->callfunc) 00165 s->callfunc(*s, s->userdata); 00166 00167 // if this assertion fails, a derived class's virtual execute() function 00168 // didn't call its parent's execute() function, and we didn't make it 00169 // all the way back up to WvStream::execute(). This doesn't always 00170 // matter right now, but it could lead to obscure bugs later, so we'll 00171 // enforce it. 00172 assert(s->wvstream_execute_called); 00173 00174 s->running_callback = false; 00175 } 00176 00177 00178 void WvStream::callback() 00179 { 00180 TRACE("(?)"); 00181 00182 // callback is already running -- don't try to start it again, or we 00183 // could end up in an infinite loop! 00184 if (running_callback) 00185 return; 00186 00187 // if the alarm has gone off and we're calling callback... good! 00188 if (alarm_remaining() == 0) 00189 { 00190 alarm_time = wvtime_zero; 00191 alarm_was_ticking = true; 00192 } 00193 else 00194 alarm_was_ticking = false; 00195 00196 assert(!uses_continue_select || personal_stack_size >= 1024); 00197 00198 // if (1) 00199 if (uses_continue_select && personal_stack_size >= 1024) 00200 { 00201 if (!taskman) 00202 taskman = WvTaskMan::get(); 00203 00204 if (!task) 00205 { 00206 TRACE("(!)"); 00207 task = taskman->start("streamexec", _callback, this, 00208 personal_stack_size); 00209 } 00210 else if (!task->isrunning()) 00211 { 00212 TRACE("(.)"); 00213 task->start("streamexec2", _callback, this); 00214 } 00215 00216 // This loop is much more subtle than it looks. 00217 // By implementing it this way, we provide something that works 00218 // like a typical callback() stack: that is, a child callback 00219 // must return before the parent's callback does. 00220 // 00221 // What _actually_ happens is a child will call yield() upon returning 00222 // from its callback function, which exits the taskman and returns to 00223 // the top level. The top level, though, is running this loop, which 00224 // re-executes taskman->run() since its child (which is eventually 00225 // the parent of the child that called yield()) hasn't finished yet. 00226 // We build our way all the way back up to the first-level parent of 00227 // the child calling yield(), which now notices its child has finished 00228 // and continues on in its execute() function. 00229 // 00230 // continue_select() will set running_callback to false, even though 00231 // it doesn't actually return from the callback function. That 00232 // causes this loop to terminate, and the callback will get resumed 00233 // later when select() returns true. 00234 do 00235 { 00236 taskman->run(*task); 00237 } while (task && task->isrunning() && running_callback); 00238 } 00239 else 00240 _callback(this); 00241 00242 /* DON'T PUT ANY CODE HERE! 00243 * 00244 * WvStreamList calls its child streams above via taskman->run(). 00245 * If a child is deleted, it waits for its callback task to finish the 00246 * current iteration, then recycles its WvTask object and allows the 00247 * "delete" call to finish, so the object no longer exists. 00248 * 00249 * The catch: the callback() function is actually running in 00250 * the WvStreamList's task (if any), which hasn't had a chance to 00251 * exit yet. Next time we jump into the WvStreamList, we will arrive 00252 * immediately after the taskman->run() line, ie. right here in the 00253 * code. In that case, the 'this' pointer could be pointing at an 00254 * invalid object, so we should just exit before we do something stupid. 00255 */ 00256 } 00257 00258 00259 void WvStream::execute() 00260 { 00261 // do nothing by default, but notice that we were here. 00262 wvstream_execute_called = true; 00263 } 00264 00265 00266 bool WvStream::isok() const 00267 { 00268 return WvError::isok(); 00269 } 00270 00271 00272 void WvStream::seterr(int _errnum) 00273 { 00274 if (!errnum) 00275 { 00276 WvError::seterr(_errnum); 00277 close(); 00278 } 00279 } 00280 00281 00282 size_t WvStream::read(WvBuf &outbuf, size_t count) 00283 { 00284 // for now, just wrap the older read function 00285 size_t free = outbuf.free(); 00286 if (count > free) 00287 count = free; 00288 00289 WvDynBuf tmp; 00290 unsigned char *buf = tmp.alloc(count); 00291 size_t len = read(buf, count); 00292 tmp.unalloc(count - len); 00293 outbuf.merge(tmp); 00294 return len; 00295 } 00296 00297 00298 size_t WvStream::continue_read(time_t wait_msec, WvBuf &outbuf, size_t count) 00299 { 00300 // for now, just wrap the older read function 00301 size_t free = outbuf.free(); 00302 if (count > free) 00303 count = free; 00304 unsigned char *buf = outbuf.alloc(count); 00305 00306 // call the non-WvBuf continue_read 00307 size_t len = continue_read(wait_msec, buf, count); 00308 00309 outbuf.unalloc(count - len); 00310 return len; 00311 } 00312 00313 00314 size_t WvStream::write(WvBuf &inbuf, size_t count) 00315 { 00316 // for now, just wrap the older write function 00317 size_t avail = inbuf.used(); 00318 if (count > avail) 00319 count = avail; 00320 const unsigned char *buf = inbuf.get(count); 00321 size_t len = write(buf, count); 00322 inbuf.unget(count - len); 00323 return len; 00324 } 00325 00326 00327 size_t WvStream::read(void *buf, size_t count) 00328 { 00329 size_t bufu = inbuf.used(), i; 00330 unsigned char *newbuf; 00331 00332 bufu = inbuf.used(); 00333 if (bufu < queue_min) 00334 { 00335 newbuf = inbuf.alloc(queue_min - bufu); 00336 i = uread(newbuf, queue_min - bufu); 00337 inbuf.unalloc(queue_min - bufu - i); 00338 00339 bufu = inbuf.used(); 00340 } 00341 00342 if (bufu < queue_min) 00343 return 0; 00344 00345 // if buffer is empty, do a hard read 00346 if (!bufu) 00347 bufu = uread(buf, count); 00348 else 00349 { 00350 // otherwise just read from the buffer 00351 if (bufu > count) 00352 bufu = count; 00353 00354 memcpy(buf, inbuf.get(bufu), bufu); 00355 } 00356 00357 TRACE("read obj 0x%p, bytes %d/%d\n", this, bufu, count); 00358 return bufu; 00359 } 00360 00361 00362 size_t WvStream::continue_read(time_t wait_msec, void *buf, size_t count) 00363 { 00364 assert(uses_continue_select); 00365 00366 if (!count) 00367 return 0; 00368 00369 // FIXME: continue_select also uses the alarm, so this doesn't work. 00370 if (wait_msec >= 0) 00371 alarm(wait_msec); 00372 00373 queuemin(count); 00374 00375 int got = 0; 00376 00377 while (isok()) 00378 { 00379 if (continue_select(-1)) 00380 { 00381 if ((got = read(buf, count)) != 0) 00382 break; 00383 if (alarm_was_ticking) 00384 break; 00385 } 00386 } 00387 00388 if (wait_msec >= 0) 00389 alarm(-1); 00390 00391 queuemin(0); 00392 00393 return got; 00394 } 00395 00396 00397 size_t WvStream::write(const void *buf, size_t count) 00398 { 00399 if (!isok() || !buf || !count) return 0; 00400 00401 size_t wrote = 0; 00402 if (!outbuf_delayed_flush && !outbuf.used()) 00403 { 00404 wrote = uwrite(buf, count); 00405 count -= wrote; 00406 buf = (const unsigned char*)buf + wrote; 00407 } 00408 if (max_outbuf_size != 0) 00409 { 00410 size_t canbuffer = max_outbuf_size - outbuf.used(); 00411 if (count > canbuffer) 00412 count = canbuffer; // can't write the whole amount 00413 } 00414 if (count != 0) 00415 { 00416 outbuf.put(buf, count); 00417 wrote += count; 00418 } 00419 00420 if (should_flush()) 00421 { 00422 if (is_auto_flush) 00423 flush(0); 00424 else 00425 flush_outbuf(0); 00426 } 00427 00428 return wrote; 00429 } 00430 00431 00432 void WvStream::noread() 00433 { 00434 // FIXME: this really ought to be symmetrical with nowrite(), but instead 00435 // it's empty for some reason. 00436 } 00437 00438 00439 void WvStream::nowrite() 00440 { 00441 if (getwfd() < 0) 00442 return; 00443 00444 want_nowrite = true; 00445 } 00446 00447 00448 bool WvStream::isreadable() 00449 { 00450 return isok() && select(0, true, false, false); 00451 } 00452 00453 00454 bool WvStream::iswritable() 00455 { 00456 return isok() && select(0, false, true, false); 00457 } 00458 00459 00460 char *WvStream::getline(time_t wait_msec, char separator, int readahead) 00461 { 00462 struct timeval timeout_time; 00463 if (wait_msec > 0) 00464 timeout_time = msecadd(wvtime(), wait_msec); 00465 00466 // if we get here, we either want to wait a bit or there is data 00467 // available. 00468 while (isok()) 00469 { 00470 queuemin(0); 00471 00472 // if there is a newline already, return its string. 00473 size_t i = inbuf.strchr(separator); 00474 if (i > 0) 00475 { 00476 char *eol = (char *)inbuf.mutablepeek(i - 1, 1); 00477 assert(eol); 00478 *eol = 0; 00479 return (char *)inbuf.get(i); 00480 } 00481 else if (!isok()) // uh oh, stream is in trouble. 00482 { 00483 if (inbuf.used()) 00484 { 00485 // handle "EOF without newline" condition 00486 // FIXME: it's very silly that buffers can't return editable 00487 // char* arrays. 00488 inbuf.alloc(1)[0] = 0; // null-terminate it 00489 return const_cast<char*>( 00490 (const char*)inbuf.get(inbuf.used())); 00491 } 00492 else 00493 break; // nothing else to do! 00494 } 00495 00496 // make select not return true until more data is available 00497 size_t needed = inbuf.used() + 1; 00498 queuemin(needed); 00499 00500 // compute remaining timeout 00501 if (wait_msec > 0) 00502 { 00503 wait_msec = msecdiff(timeout_time, wvtime()); 00504 if (wait_msec < 0) 00505 wait_msec = 0; 00506 } 00507 00508 bool hasdata; 00509 if (uses_continue_select) 00510 hasdata = continue_select(wait_msec); 00511 else 00512 hasdata = select(wait_msec, true, false); 00513 if (!isok()) 00514 break; 00515 00516 if (hasdata) 00517 { 00518 // read a few bytes 00519 unsigned char *buf = inbuf.alloc(readahead); 00520 size_t len = uread(buf, readahead); 00521 inbuf.unalloc(readahead - len); 00522 hasdata = inbuf.used() >= needed; // enough? 00523 } 00524 00525 if (!hasdata && wait_msec == 0) 00526 break; // handle timeout 00527 } 00528 00529 // we timed out or had a socket error 00530 if (!isok() && inbuf.used()) 00531 { 00532 // if the stream has closed, dump the entire buffer as the last line 00533 inbuf.put("", 1); 00534 return (char *)inbuf.get(inbuf.used()); 00535 } 00536 else 00537 return NULL; 00538 } 00539 00540 00541 void WvStream::drain() 00542 { 00543 char buf[1024]; 00544 while (isreadable()) 00545 read(buf, sizeof(buf)); 00546 } 00547 00548 00549 bool WvStream::flush(time_t msec_timeout) 00550 { 00551 if (is_flushing) return false; 00552 00553 TRACE("%p flush starts\n", this); 00554 00555 is_flushing = true; 00556 want_to_flush = true; 00557 bool done = flush_internal(msec_timeout) // any other internal buffers 00558 && flush_outbuf(msec_timeout); // our own outbuf 00559 is_flushing = false; 00560 00561 TRACE("flush stops (%d)\n", done); 00562 return done; 00563 } 00564 00565 00566 bool WvStream::should_flush() 00567 { 00568 return want_to_flush; 00569 } 00570 00571 00572 bool WvStream::flush_outbuf(time_t msec_timeout) 00573 { 00574 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok()); 00575 00576 // flush outbuf 00577 while (isok() && outbuf.used()) 00578 { 00579 // fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 00580 // this, getrfd(), getwfd(), outbuf.used()); 00581 00582 size_t attempt = outbuf.used(); 00583 size_t real = uwrite(outbuf.get(attempt), attempt); 00584 00585 // WARNING: uwrite() may have messed up our outbuf! 00586 // This probably only happens if uwrite() closed the stream because 00587 // of an error, so we'll check isok(). 00588 if (isok() && real < attempt) 00589 { 00590 TRACE("flush_outbuf: unget %d-%d\n", attempt, real); 00591 assert(outbuf.ungettable() >= attempt - real); 00592 outbuf.unget(attempt - real); 00593 } 00594 00595 // since post_select() can call us, and select() calls post_select(), 00596 // we need to be careful not to call select() if we don't need to! 00597 if (!msec_timeout || !select(msec_timeout, false, true)) 00598 { 00599 if (msec_timeout >= 0) 00600 break; 00601 } 00602 } 00603 00604 // handle autoclose 00605 if (isok() && autoclose_time) 00606 { 00607 time_t now = time(NULL); 00608 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 00609 this, now - autoclose_time, outbuf.used()); 00610 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time) 00611 { 00612 autoclose_time = 0; // avoid infinite recursion! 00613 close(); 00614 } 00615 } 00616 00617 TRACE("flush_outbuf: after autoclose chunk\n"); 00618 00619 if (!outbuf.used() && outbuf_delayed_flush) 00620 want_to_flush = false; 00621 00622 TRACE("flush_outbuf: now isok=%d\n", isok()); 00623 00624 // if we can't flush the outbuf, at least empty it! 00625 if (!isok()) 00626 outbuf.zap(); 00627 00628 TRACE("flush_outbuf stops\n"); 00629 00630 return !outbuf.used(); 00631 } 00632 00633 00634 bool WvStream::flush_internal(time_t msec_timeout) 00635 { 00636 // once outbuf emptied, that's it for most streams 00637 return true; 00638 } 00639 00640 00641 int WvStream::getrfd() const 00642 { 00643 return -1; 00644 } 00645 00646 00647 int WvStream::getwfd() const 00648 { 00649 return -1; 00650 } 00651 00652 00653 void WvStream::flush_then_close(int msec_timeout) 00654 { 00655 time_t now = time(NULL); 00656 autoclose_time = now + (msec_timeout + 999) / 1000; 00657 00658 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 00659 this, outbuf.used(), autoclose_time - now); 00660 00661 // as a fast track, we _could_ close here: but that's not a good idea, 00662 // since flush_then_close() deals with obscure situations, and we don't 00663 // want the caller to use it incorrectly. So we make things _always_ 00664 // break when the caller forgets to call select() later. 00665 00666 flush(0); 00667 } 00668 00669 00670 bool WvStream::pre_select(SelectInfo &si) 00671 { 00672 time_t alarmleft = alarm_remaining(); 00673 00674 if (alarmleft == 0) 00675 return true; // alarm has rung 00676 00677 if (!si.inherit_request) 00678 si.wants |= force; 00679 00680 // handle read-ahead buffering 00681 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min) 00682 return true; // already ready 00683 if (alarmleft >= 0 00684 && (alarmleft < si.msec_timeout || si.msec_timeout < 0)) 00685 si.msec_timeout = alarmleft; 00686 return false; 00687 } 00688 00689 00690 bool WvStream::post_select(SelectInfo &si) 00691 { 00692 // FIXME: need output buffer flush support for non FD-based streams 00693 // FIXME: need read_requires_writable and write_requires_readable 00694 // support for non FD-based streams 00695 return false; 00696 } 00697 00698 00699 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout, 00700 bool readable, bool writable, bool isexcept, bool forceable) 00701 { 00702 FD_ZERO(&si.read); 00703 FD_ZERO(&si.write); 00704 FD_ZERO(&si.except); 00705 00706 if (forceable) 00707 si.wants = force; 00708 else 00709 { 00710 si.wants.readable = readable; 00711 si.wants.writable = writable; 00712 si.wants.isexception = isexcept; 00713 } 00714 00715 si.max_fd = -1; 00716 si.msec_timeout = msec_timeout; 00717 si.inherit_request = ! forceable; 00718 si.global_sure = false; 00719 00720 if (!isok()) return false; 00721 00722 bool sure = pre_select(si); 00723 if (globalstream && forceable && (globalstream != this)) 00724 { 00725 WvStream *s = globalstream; 00726 globalstream = NULL; // prevent recursion 00727 si.global_sure = s->pre_select(si); 00728 globalstream = s; 00729 } 00730 if (sure || si.global_sure) 00731 si.msec_timeout = 0; 00732 return sure; 00733 } 00734 00735 00736 int WvStream::_do_select(SelectInfo &si) 00737 { 00738 // prepare timeout 00739 timeval tv; 00740 tv.tv_sec = si.msec_timeout / 1000; 00741 tv.tv_usec = (si.msec_timeout % 1000) * 1000; 00742 00743 // block 00744 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except, 00745 si.msec_timeout >= 0 ? &tv : (timeval*)NULL); 00746 00747 // handle errors. 00748 // EAGAIN and EINTR don't matter because they're totally normal. 00749 // ENOBUFS is hopefully transient. 00750 // EBADF is kind of gross and might imply that something is wrong, 00751 // but it happens sometimes... 00752 if (sel < 0 00753 && errno != EAGAIN && errno != EINTR 00754 && errno != EBADF 00755 && errno != ENOBUFS 00756 #ifdef _WIN32 00757 && errno != WSAEINVAL // the sets might be empty 00758 #endif 00759 ) 00760 { 00761 seterr(errno); 00762 } 00763 return sel; 00764 } 00765 00766 00767 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable) 00768 { 00769 if (!isok()) return false; 00770 00771 bool sure = post_select(si); 00772 if (globalstream && forceable && (globalstream != this)) 00773 { 00774 WvStream *s = globalstream; 00775 globalstream = NULL; // prevent recursion 00776 si.global_sure = s->post_select(si) || si.global_sure; 00777 globalstream = s; 00778 } 00779 return sure; 00780 } 00781 00782 00783 bool WvStream::_select(time_t msec_timeout, 00784 bool readable, bool writable, bool isexcept, bool forceable) 00785 { 00786 SelectInfo si; 00787 bool sure = _build_selectinfo(si, msec_timeout, 00788 readable, writable, isexcept, forceable); 00789 00790 if (!isok()) 00791 return false; 00792 00793 // the eternal question: if 'sure' is true already, do we need to do the 00794 // rest of this stuff? If we do, it might increase fairness a bit, but 00795 // it encourages select()ing when we know something fishy has happened - 00796 // when a stream is !isok() in a list, for example, pre_select() returns 00797 // true. If that's the case, our SelectInfo structure might not be 00798 // quite right (eg. it might be selecting on invalid fds). That doesn't 00799 // sound *too* bad, so let's go for the fairness. 00800 00801 int sel = _do_select(si); 00802 if (sel >= 0) 00803 sure = _process_selectinfo(si, forceable) || sure; // note the order 00804 if (si.global_sure && globalstream && forceable && (globalstream != this)) 00805 globalstream->callback(); 00806 return sure; 00807 } 00808 00809 00810 void WvStream::force_select(bool readable, bool writable, bool isexception) 00811 { 00812 force.readable |= readable; 00813 force.writable |= writable; 00814 force.isexception |= isexception; 00815 } 00816 00817 00818 void WvStream::undo_force_select(bool readable, bool writable, bool isexception) 00819 { 00820 force.readable &= !readable; 00821 force.writable &= !writable; 00822 force.isexception &= !isexception; 00823 } 00824 00825 00826 void WvStream::alarm(time_t msec_timeout) 00827 { 00828 if (msec_timeout >= 0) 00829 alarm_time = msecadd(wvtime(), msec_timeout); 00830 else 00831 alarm_time = wvtime_zero; 00832 } 00833 00834 00835 time_t WvStream::alarm_remaining() 00836 { 00837 if (alarm_time.tv_sec && !running_callback) 00838 { 00839 WvTime now = wvtime(); 00840 00841 /* Time is going backward! */ 00842 if (now < last_alarm_check) 00843 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now)); 00844 00845 last_alarm_check = now; 00846 00847 time_t remaining = msecdiff(alarm_time, now); 00848 if (remaining < 0) 00849 remaining = 0; 00850 return remaining; 00851 } 00852 return -1; 00853 } 00854 00855 00856 bool WvStream::continue_select(time_t msec_timeout) 00857 { 00858 assert(uses_continue_select); 00859 assert(task); 00860 assert(taskman); 00861 assert(taskman->whoami() == task); 00862 00863 if (msec_timeout >= 0) 00864 alarm(msec_timeout); 00865 00866 running_callback = false; 00867 taskman->yield(); 00868 running_callback = true; // and we're back! 00869 alarm(-1); 00870 00871 // when we get here, someone has jumped back into our task. 00872 // We have to select(0) here because it's possible that the alarm was 00873 // ticking _and_ data was available. This is aggravated especially if 00874 // msec_delay was zero. Note that running select() here isn't 00875 // inefficient, because if the alarm was expired then pre_select() 00876 // returned true anyway and short-circuited the previous select(). 00877 // 00878 // FIXME: we should probably be using select(t,r,w,x) here instead, but 00879 // I'm not sure. 00880 TRACE("hello-%p\n", this); 00881 return !alarm_was_ticking || select(0); 00882 } 00883 00884 00885 void WvStream::terminate_continue_select() 00886 { 00887 close(); 00888 if (task) 00889 { 00890 while (task->isrunning()) 00891 taskman->run(*task); 00892 task->recycle(); 00893 task = NULL; 00894 } 00895 } 00896 00897 00898 const WvAddr *WvStream::src() const 00899 { 00900 return NULL; 00901 } 00902 00903 00904 void WvStream::unread(WvBuf &unreadbuf, size_t count) 00905 { 00906 WvDynBuf tmp; 00907 tmp.merge(unreadbuf, count); 00908 tmp.merge(inbuf); 00909 inbuf.zap(); 00910 inbuf.merge(tmp); 00911 }

Generated on Tue Oct 5 01:09:21 2004 for WvStreams by doxygen 1.3.7