wvistreamlist.cc

00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * WvIStreamList holds a list of IWvStream objects -- and its select() and
00006  * callback() functions know how to handle multiple simultaneous streams.
00007  */
00008 #include "wvistreamlist.h"
00009 #include "wvstringlist.h"
00010 #include "wvstreamsdebugger.h"
00011 #include "wvstrutils.h"
00012 
00013 #include "wvassert.h"
00014 #include "wvstrutils.h"
00015 
00016 #ifndef _WIN32
00017 #include "wvfork.h"
00018 #endif
00019 
00020 #ifdef HAVE_VALGRIND_MEMCHECK_H
00021 #include <valgrind/memcheck.h>
00022 #else
00023 #define RUNNING_ON_VALGRIND false
00024 #endif
00025 
// enable this to add some read/write trace messages (this can be VERY
// verbose)
#define STREAMTRACE 0

// TRACE(fmt, ...) prints a printf-style message to stderr when STREAMTRACE
// is nonzero, and expands to nothing otherwise.  Standard variadic-macro
// syntax ("..." / __VA_ARGS__) is used instead of the GNU-only named form,
// so a single definition serves every compiler and the arguments are never
// evaluated while tracing is disabled.
#if STREAMTRACE
# define TRACE(...) fprintf(stderr, __VA_ARGS__)
#else
# define TRACE(...)
#endif
00038 
00039 WvIStreamList WvIStreamList::globallist;
00040 
00041 
00042 WvIStreamList::WvIStreamList():
00043     in_select(false), dead_stream(false)
00044 {
00045     readcb = writecb = exceptcb = 0;
00046     auto_prune = true;
00047     if (this == &globallist)
00048     {
00049         globalstream = this;
00050 #ifndef _WIN32
00051         add_wvfork_callback(WvIStreamList::onfork);
00052 #endif
00053         set_wsname("globallist");
00054         add_debugger_commands();
00055     }
00056 }
00057 
00058 
00059 WvIStreamList::~WvIStreamList()
00060 {
00061     // nothing to do
00062 }
00063 
00064 
00065 bool WvIStreamList::isok() const
00066 {
00067     return true;  // "error" condition on a list is undefined
00068 }
00069 
00070 
// Scoped flag setter: sets the referenced bool to true for the lifetime of
// the guard and resets it to false on destruction.  The assert in the
// constructor traps an attempt to guard a flag that is already set.
class BoolGuard
{
public:
    // 'explicit' so that a bool lvalue never silently converts into a guard.
    explicit BoolGuard(bool &_guard_bool):
        guard_bool(_guard_bool)
    {
        assert(!guard_bool);
        guard_bool = true;
    }
    ~BoolGuard()
    {
        guard_bool = false;
    }
private:
    // Not copyable: a copy's destructor would clear the flag while the
    // original guard is still in scope.  Declared but intentionally never
    // defined.
    BoolGuard(const BoolGuard &);
    BoolGuard &operator=(const BoolGuard &);

    bool &guard_bool;
};
00087 
00088 
00089 void WvIStreamList::pre_select(SelectInfo &si)
00090 {
00091     //BoolGuard guard(in_select);
00092     bool already_sure = false;
00093     SelectRequest oldwant = si.wants;
00094     
00095     dead_stream = false;
00096     sure_thing.zap();
00097     
00098     time_t alarmleft = alarm_remaining();
00099     if (alarmleft == 0)
00100         already_sure = true;
00101 
00102     IWvStream *old_in_stream = WvCrashInfo::in_stream;
00103     const char *old_in_stream_id = WvCrashInfo::in_stream_id;
00104     WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
00105     WvCrashInfo::in_stream_state = WvCrashInfo::PRE_SELECT;
00106 
00107     Iter i(*this);
00108     for (i.rewind(); i.next(); )
00109     {
00110         IWvStream &s(*i);
00111 #if I_ENJOY_FORMATTING_STRINGS
00112         WvCrashWill will("doing pre_select for \"%s\" (%s)\n%s",
00113                          i.link->id, ptr2str(&s), wvcrash_read_will());
00114 #else
00115         WvCrashInfo::in_stream = &s;
00116         WvCrashInfo::in_stream_id = i.link->id;
00117 #endif
00118         si.wants = oldwant;
00119 
00120         if (!s.isok())
00121         {
00122             dead_stream = true;
00123             already_sure = true;
00124             if (auto_prune)
00125                 i.xunlink();
00126             continue;
00127         }
00128         else 
00129             s.pre_select(si);
00130         
00131         TRACE("after pre_select(%s): msec_timeout is %ld\n",
00132               i.link->id, (long)si.msec_timeout);
00133     }
00134 
00135     WvCrashInfo::in_stream = old_in_stream;
00136     WvCrashInfo::in_stream_id = old_in_stream_id;
00137     WvCrashInfo::in_stream_state = old_in_stream_state;
00138 
00139     if (alarmleft >= 0 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00140         si.msec_timeout = alarmleft;
00141     
00142     si.wants = oldwant;
00143 
00144     if (already_sure)
00145         si.msec_timeout = 0;
00146 }
00147 
00148 
00149 bool WvIStreamList::post_select(SelectInfo &si)
00150 {
00151     //BoolGuard guard(in_select);
00152     bool already_sure = dead_stream;
00153     SelectRequest oldwant = si.wants;
00154     
00155     dead_stream = false;
00156 
00157     time_t alarmleft = alarm_remaining();
00158     if (alarmleft == 0)
00159         already_sure = true;
00160 
00161     IWvStream *old_in_stream = WvCrashInfo::in_stream;
00162     const char *old_in_stream_id = WvCrashInfo::in_stream_id;
00163     WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
00164     WvCrashInfo::in_stream_state = WvCrashInfo::POST_SELECT;
00165 
00166     Iter i(*this);
00167     for (i.rewind(); i.cur() && i.next(); )
00168     {
00169         IWvStream &s(*i);
00170 #if I_ENJOY_FORMATTING_STRINGS
00171         WvCrashWill will("doing post_select for \"%s\" (%s)\n%s",
00172                          i.link->id, ptr2str(&s), wvcrash_read_will());
00173 #else
00174         WvCrashInfo::in_stream = &s;
00175         WvCrashInfo::in_stream_id = i.link->id;
00176 #endif
00177 
00178         if (s.isok())
00179         {
00180             si.wants = oldwant;
00181             if (s.post_select(si))
00182             {
00183                 TRACE("post_select(%s) was true\n", i.link->id);
00184                 sure_thing.unlink(&s); // don't add it twice!
00185                 sure_thing.append(&s, false, i.link->id);
00186             }
00187             else
00188             {
00189                 TRACE("post_select(%s) was false\n", i.link->id);
00190                 WvIStreamListBase::Iter j(sure_thing);
00191                 WvLink* link = j.find(&s);
00192 
00193                 wvassert(!link, "stream \"%s\" (%s) was ready in "
00194                          "pre_select, but not in post_select",
00195                          link->id, ptr2str(link->data));
00196             }
00197         }
00198         else
00199             already_sure = true;
00200     }
00201     
00202     WvCrashInfo::in_stream = old_in_stream;
00203     WvCrashInfo::in_stream_id = old_in_stream_id;
00204     WvCrashInfo::in_stream_state = old_in_stream_state;
00205 
00206     si.wants = oldwant;
00207     return already_sure || !sure_thing.isempty();
00208 }
00209 
00210 
00211 // distribute the callback() request to all children that select 'true'
00212 void WvIStreamList::execute()
00213 {
00214     static int level = 0;
00215     const char *id;
00216     level++;
00217     
00218     WvStream::execute();
00219     
00220     TRACE("\n%*sList@%p: (%d sure) ", level, "", this, sure_thing.count());
00221     
00222     IWvStream *old_in_stream = WvCrashInfo::in_stream;
00223     const char *old_in_stream_id = WvCrashInfo::in_stream_id;
00224     WvCrashInfo::InStreamState old_in_stream_state = WvCrashInfo::in_stream_state;
00225     WvCrashInfo::in_stream_state = WvCrashInfo::EXECUTE;
00226 
00227     WvIStreamListBase::Iter i(sure_thing);
00228     for (i.rewind(); i.next(); )
00229     {
00230 #if STREAMTRACE
00231         WvIStreamListBase::Iter x(*this);
00232         if (!x.find(&i()))
00233             TRACE("Yikes! %p in sure_thing, but not in main list!\n",
00234                   i.cur());
00235 #endif
00236         IWvStream &s(*i);
00237         
00238         id = i.link->id;
00239 
00240         TRACE("[%p:%s]", &s, id);
00241         
00242         i.xunlink();
00243         
00244         if (s.isok())
00245         {
00246 #if DEBUG
00247             if (!RUNNING_ON_VALGRIND)
00248             {
00249                 WvString strace_node("%s: %s", s.wstype(), s.wsname());
00250                 ::write(-1, strace_node, strace_node.len()); 
00251             }
00252 #endif
00253 #if I_ENJOY_FORMATTING_STRINGS
00254             WvCrashWill my_will("executing stream: %s\n%s",
00255                                 id ? id : "unknown stream",
00256                                 wvcrash_read_will());
00257 #else
00258             WvCrashInfo::in_stream = &s;
00259             WvCrashInfo::in_stream_id = id;
00260 #endif
00261 
00262             s.callback();
00263         }
00264         
00265         // list might have changed!
00266         i.rewind();
00267     }
00268     
00269     WvCrashInfo::in_stream = old_in_stream;
00270     WvCrashInfo::in_stream_id = old_in_stream_id;
00271     WvCrashInfo::in_stream_state = old_in_stream_state;
00272 
00273     sure_thing.zap();
00274 
00275     level--;
00276     TRACE("[DONE %p]\n", this);
00277 }
00278 
00279 #ifndef _WIN32
00280 void WvIStreamList::onfork(pid_t p)
00281 {
00282     if (p == 0)
00283     {
00284         // this is a child process: don't inherit the global streamlist
00285         globallist.zap(false);
00286     }
00287 }
00288 #endif
00289 
00290 
00291 void WvIStreamList::add_debugger_commands()
00292 {
00293     WvStreamsDebugger::add_command("globallist", 0, debugger_globallist_run_cb, 0);
00294 }
00295 
00296 
00297 WvString WvIStreamList::debugger_globallist_run_cb(WvStringParm cmd,
00298     WvStringList &args,
00299     WvStreamsDebugger::ResultCallback result_cb, void *)
00300 {
00301     debugger_streams_display_header(cmd, result_cb);
00302     WvIStreamList::Iter i(globallist);
00303     for (i.rewind(); i.next(); )
00304         debugger_streams_maybe_display_one_stream(static_cast<WvStream *>(i.ptr()),
00305                 cmd, args, result_cb);
00306     
00307     return WvString::null;
00308 }
00309 

Generated on Thu Jan 24 16:50:56 2008 for WvStreams by  doxygen 1.5.4