00001
00002
00003
00004
00005
00006
00007 #ifndef __WVSTREAM_H
00008 #define __WVSTREAM_H
00009
00010 #include "iwvstream.h"
00011 #include "wvtimeutils.h"
00012 #include "wvstreamsdebugger.h"
00013 #include <errno.h>
00014 #include <limits.h>
00015 #include "wvattrs.h"
00016
00024 class WvStream: public IWvStream
00025 {
00026 IMPLEMENT_IOBJECT(WvStream);
00027
00028 WvString my_wsname;
00029 WSID my_wsid;
00030 WvAttrs attrs;
00031 public:
00036 WvStream *read_requires_writable;
00037
00042 WvStream *write_requires_readable;
00043
00045 bool uses_continue_select;
00046
00048 size_t personal_stack_size;
00049
00054 bool alarm_was_ticking;
00055
00057 bool stop_read, stop_write, closed;
00058
00060 WvStream();
00061 virtual ~WvStream();
00062
00070 virtual void close();
00071
00073 virtual void seterr(int _errnum);
00074 void seterr(WvStringParm specialerr)
00075 { WvErrorBase::seterr(specialerr); }
00076 void seterr(WVSTRING_FORMAT_DECL)
00077 { seterr(WvString(WVSTRING_FORMAT_CALL)); }
00078
00080 virtual bool isok() const;
00081
00083 virtual size_t read(void *buf, size_t count);
00084
00094 virtual size_t read(WvBuf &outbuf, size_t count);
00095
00101 virtual void unread(WvBuf &outbuf, size_t count);
00102
00109 virtual size_t write(const void *buf, size_t count);
00110
00118 virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
00119
00129 void outbuf_limit(size_t size)
00130 { max_outbuf_size = size; }
00131
00132 virtual void noread();
00133 virtual void nowrite();
00134 virtual void maybe_autoclose();
00135
00136 virtual bool isreadable();
00137 virtual bool iswritable();
00138
00146 virtual size_t uread(void *buf, size_t count)
00147 { return 0; }
00148
00156 virtual size_t uwrite(const void *buf, size_t count)
00157 { return count; }
00158
00175 char *getline(time_t wait_msec = 0,
00176 char separator = '\n', int readahead = 1024)
00177 {
00178 return blocking_getline(wait_msec, separator, readahead);
00179 }
00180
00182 char *getline(int wait_msec,
00183 char separator = '\n', int readahead = 1024)
00184 {
00185 return getline(time_t(wait_msec), separator, readahead);
00186 }
00187
00189 char *getline(double wait_msec,
00190 char separator = '\n', int readahead = 1024)
00191 {
00192 return getline(time_t(wait_msec), separator, readahead);
00193 }
00194
00195 private:
00200 char *getline(char, int i = 0);
00201 char *getline(bool, int i = 0);
00202 public:
00203
00215 char *blocking_getline(time_t wait_msec, int separator = '\n',
00216 int readahead = 1024);
00217
00222 char *continue_getline(time_t wait_msec, int separator = '\n',
00223 int readahead = 1024);
00224
00232 void queuemin(size_t count)
00233 { queue_min = count; }
00234
00239 void drain();
00240
00246 void delay_output(bool is_delayed)
00247 {
00248 outbuf_delayed_flush = is_delayed;
00249 want_to_flush = !is_delayed;
00250 }
00251
00258 void auto_flush(bool is_automatic)
00259 { is_auto_flush = is_automatic; }
00260
00267 virtual bool flush(time_t msec_timeout);
00268
00269 virtual bool should_flush();
00270
00277 void flush_then_close(int msec_timeout);
00278
00300 virtual void pre_select(SelectInfo &si);
00301
00306 void pre_select(SelectInfo &si, const SelectRequest &r)
00307 {
00308 SelectRequest oldwant = si.wants;
00309 si.wants = r;
00310 pre_select(si);
00311 si.wants = oldwant;
00312 }
00313
00318 void xpre_select(SelectInfo &si, const SelectRequest &r)
00319 { pre_select(si, r); }
00320
00333 virtual bool post_select(SelectInfo &si);
00334
00339 bool xpost_select(SelectInfo &si, const SelectRequest &r)
00340 { return post_select(si, r); }
00341
00346 bool post_select(SelectInfo &si, const SelectRequest &r)
00347 {
00348 SelectRequest oldwant = si.wants;
00349 si.wants = r;
00350 bool val = post_select(si);
00351 si.wants = oldwant;
00352 return val;
00353 }
00354
00376 bool select(time_t msec_timeout)
00377 { return _select(msec_timeout, false, false, false, true); }
00378
00391 void runonce(time_t msec_timeout = -1)
00392 { if (select(msec_timeout)) callback(); }
00393
00415 bool select(time_t msec_timeout,
00416 bool readable, bool writable, bool isex = false)
00417 { return _select(msec_timeout, readable, writable, isex, false); }
00418
00424 IWvStream::SelectRequest get_select_request();
00425
00434 void force_select(bool readable, bool writable, bool isexception = false);
00435
00440 void undo_force_select(bool readable, bool writable,
00441 bool isexception = false);
00442
00460 bool continue_select(time_t msec_timeout);
00461
00467 void terminate_continue_select();
00468
00473 virtual const WvAddr *src() const;
00474
00479 void setcallback(IWvStreamCallback _callfunc);
00480
00482 IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
00483
00485 IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
00486
00489 IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
00490
00492 IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
00493
00499 void autoforward(WvStream &s);
00500
00502 void noautoforward();
00503 static void autoforward_callback(WvStream &input, WvStream &output);
00504
00508 void *_callwrap(void *);
00509
00513 void _callback();
00514
00519 virtual void callback();
00520
00525 void alarm(time_t msec_timeout);
00526
00532 time_t alarm_remaining();
00533
00538 size_t write(WvStringParm s)
00539 { return write(s.cstr(), s.len()); }
00540 size_t print(WvStringParm s)
00541 { return write(s); }
00542 size_t operator() (WvStringParm s)
00543 { return write(s); }
00544
00546 size_t print(WVSTRING_FORMAT_DECL)
00547 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00548 size_t operator() (WVSTRING_FORMAT_DECL)
00549 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00550
00551 const char *wsname() const
00552 { return my_wsname; }
00553 void set_wsname(WvStringParm wsname)
00554 { my_wsname = wsname; }
00555 void set_wsname(WVSTRING_FORMAT_DECL)
00556 { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
00557
00558 const char *wstype() const { return "WvStream"; }
00559
00560 WSID wsid() const { return my_wsid; }
00561 static IWvStream *find_by_wsid(WSID wsid);
00562
00563 virtual WvString getattr(WvStringParm name) const
00564 { return attrs.get(name); }
00565
00566
00567
00568
00569 #ifdef __WVSTREAM_UNIT_TEST
00570 public:
00571 size_t outbuf_used()
00572 { return outbuf.used(); }
00573 size_t inbuf_used()
00574 { return inbuf.used(); }
00575 void inbuf_putstr(WvStringParm t)
00576 { inbuf.putstr(t); }
00577 #endif
00578
00579 protected:
00580 void setattr(WvStringParm name, WvStringParm value)
00581 { attrs.set(name, value); }
00582
00583
00584
00585
00586
00587 void _build_selectinfo(SelectInfo &si, time_t msec_timeout,
00588 bool readable, bool writable, bool isexcept,
00589 bool forceable);
00590
00591
00592
00593
00594 int _do_select(SelectInfo &si);
00595
00596
00597
00598 bool _process_selectinfo(SelectInfo &si, bool forceable);
00599
00600
00601
00602
00603
00604 bool flush_outbuf(time_t msec_timeout);
00605
00606
00607
00608 virtual bool flush_internal(time_t msec_timeout);
00609
00610
00611
00612
00613 virtual int getrfd() const;
00614 virtual int getwfd() const;
00615
00616
00617
00618 friend class WvHTTPClientProxyStream;
00619
00620 WvDynBuf inbuf, outbuf;
00621
00622 IWvStreamCallback callfunc;
00623 wv::function<void*(void*)> call_ctx;
00624
00625 IWvStreamCallback readcb, writecb, exceptcb, closecb;
00626
00627 size_t max_outbuf_size;
00628 bool outbuf_delayed_flush;
00629 bool is_auto_flush;
00630
00631
00632 bool want_to_flush;
00633
00634
00635 bool is_flushing;
00636
00637 size_t queue_min;
00638 time_t autoclose_time;
00639 WvTime alarm_time;
00640 WvTime last_alarm_check;
00641
00652 virtual void execute()
00653 { }
00654
00655
00656 static WvStream *globalstream;
00657
00658 static void debugger_streams_display_header(WvStringParm cmd,
00659 WvStreamsDebugger::ResultCallback result_cb);
00660 static void debugger_streams_display_one_stream(WvStream *s,
00661 WvStringParm cmd,
00662 WvStreamsDebugger::ResultCallback result_cb);
00663 static void debugger_streams_maybe_display_one_stream(WvStream *s,
00664 WvStringParm cmd,
00665 const WvStringList &args,
00666 WvStreamsDebugger::ResultCallback result_cb);
00667
00668 private:
00670 bool _select(time_t msec_timeout,
00671 bool readable, bool writable, bool isexcept,
00672 bool forceable);
00673
00674 void legacy_callback();
00675
00677 WvStream(const WvStream &s);
00678 WvStream& operator= (const WvStream &s);
00679 static void add_debugger_commands();
00680
00681 static WvString debugger_streams_run_cb(WvStringParm cmd,
00682 WvStringList &args,
00683 WvStreamsDebugger::ResultCallback result_cb, void *);
00684 static WvString debugger_close_run_cb(WvStringParm cmd,
00685 WvStringList &args,
00686 WvStreamsDebugger::ResultCallback result_cb, void *);
00687 };
00688
00695 extern WvStream *wvcon;
00696 extern WvStream *wvin;
00697 extern WvStream *wvout;
00698 extern WvStream *wverr;
00699
00700 #endif // __WVSTREAM_H