00001
00002
00003
00004
00005
00006
00007 #ifndef __WVSTREAM_H
00008 #define __WVSTREAM_H
00009
00010 #include "iwvstream.h"
00011 #include "wvtimeutils.h"
00012 #include <errno.h>
00013 #include <limits.h>
00014
00015 #ifdef _WIN32
00016 #include <time.h>
00017 #include <winsock2.h>
00018 #include <ws2tcpip.h>
00019 #include "wvwin32-sanitize.h"
00020 #else
00021 #include <unistd.h>
00022 #include <sys/time.h>
00023 #endif
00024
00025
00026
00027 typedef WvCallback<void, WvStream&, void*> WvStreamCallback;
00028
00036 class WvStream: public IWvStream
00037 {
00038 IMPLEMENT_IOBJECT(WvStream);
00039 public:
00044 WvStream *read_requires_writable;
00045
00050 WvStream *write_requires_readable;
00051
00053 bool uses_continue_select;
00054
00056 size_t personal_stack_size;
00057
00062 bool alarm_was_ticking;
00063
00065 bool stop_read, stop_write, closed;
00066
00068 WvStream();
00069 virtual ~WvStream();
00070
00078 virtual void close();
00079
00081 virtual void seterr(int _errnum);
00082 void seterr(WvStringParm specialerr)
00083 { WvErrorBase::seterr(specialerr); }
00084 void seterr(WVSTRING_FORMAT_DECL)
00085 { seterr(WvString(WVSTRING_FORMAT_CALL)); }
00086
00088 virtual bool isok() const;
00089
00091 virtual size_t read(void *buf, size_t count);
00092
00102 virtual size_t read(WvBuf &outbuf, size_t count);
00103
00109 virtual void unread(WvBuf &outbuf, size_t count);
00110
00117 virtual size_t write(const void *buf, size_t count);
00118
00126 virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
00127
00137 void outbuf_limit(size_t size)
00138 { max_outbuf_size = size; }
00139
00140 virtual void noread();
00141 virtual void nowrite();
00142 virtual void maybe_autoclose();
00143
00144 virtual bool isreadable();
00145 virtual bool iswritable();
00146
00154 virtual size_t uread(void *buf, size_t count)
00155 { return 0; }
00156
00164 virtual size_t uwrite(const void *buf, size_t count)
00165 { return count; }
00166
00183 char *getline(time_t wait_msec = 0,
00184 char separator = '\n', int readahead = 1024)
00185 {
00186 return blocking_getline(wait_msec, separator, readahead);
00187 }
00188
00190 char *getline(int wait_msec,
00191 char separator = '\n', int readahead = 1024)
00192 {
00193 return getline(time_t(wait_msec), separator, readahead);
00194 }
00195
00197 char *getline(double wait_msec,
00198 char separator = '\n', int readahead = 1024)
00199 {
00200 return getline(time_t(wait_msec), separator, readahead);
00201 }
00202
00203 private:
00208 char *getline(char, int i = 0);
00209 char *getline(bool, int i = 0);
00210 public:
00211
00223 char *blocking_getline(time_t wait_msec, int separator = '\n',
00224 int readahead = 1024);
00225
00230 char *continue_getline(time_t wait_msec, int separator = '\n',
00231 int readahead = 1024);
00232
00240 void queuemin(size_t count)
00241 { queue_min = count; }
00242
00247 void drain();
00248
00254 void delay_output(bool is_delayed)
00255 {
00256 outbuf_delayed_flush = is_delayed;
00257 want_to_flush = !is_delayed;
00258 }
00259
00266 void auto_flush(bool is_automatic)
00267 { is_auto_flush = is_automatic; }
00268
00275 virtual bool flush(time_t msec_timeout);
00276
00277 virtual bool should_flush();
00278
00285 void flush_then_close(int msec_timeout);
00286
00308 virtual bool pre_select(SelectInfo &si);
00309
00314 bool pre_select(SelectInfo &si, const SelectRequest &r)
00315 {
00316 SelectRequest oldwant = si.wants;
00317 si.wants = r;
00318 bool val = pre_select(si);
00319 si.wants = oldwant;
00320 return val;
00321 }
00322
00327 bool xpre_select(SelectInfo &si, const SelectRequest &r)
00328 { return pre_select(si, r); }
00329
00342 virtual bool post_select(SelectInfo &si);
00343
00348 bool xpost_select(SelectInfo &si, const SelectRequest &r)
00349 { return post_select(si, r); }
00350
00355 bool post_select(SelectInfo &si, const SelectRequest &r)
00356 {
00357 SelectRequest oldwant = si.wants;
00358 si.wants = r;
00359 bool val = post_select(si);
00360 si.wants = oldwant;
00361 return val;
00362 }
00363
00385 bool select(time_t msec_timeout)
00386 { return _select(msec_timeout, false, false, false, true); }
00387
00400 void runonce(time_t msec_timeout = -1)
00401 { if (select(msec_timeout)) callback(); }
00402
00424 bool select(time_t msec_timeout,
00425 bool readable, bool writable, bool isex = false)
00426 { return _select(msec_timeout, readable, writable, isex, false); }
00427
00433 IWvStream::SelectRequest get_select_request();
00434
00443 void force_select(bool readable, bool writable, bool isexception = false);
00444
00449 void undo_force_select(bool readable, bool writable,
00450 bool isexception = false);
00451
00469 bool continue_select(time_t msec_timeout);
00470
00476 void terminate_continue_select();
00477
00482 virtual const WvAddr *src() const;
00483
00488 void setcallback(WvStreamCallback _callfunc, void *_userdata);
00489
00491 IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
00492
00494 IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
00495
00498 IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
00499
00501 IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
00502
00508 void autoforward(WvStream &s);
00509
00511 void noautoforward();
00512 static void autoforward_callback(WvStream &s, void *userdata);
00513
00517 void *_callwrap(void *);
00518
00522 void _callback();
00523
00528 virtual void callback();
00529
00534 void alarm(time_t msec_timeout);
00535
00541 time_t alarm_remaining();
00542
00547 size_t write(WvStringParm s)
00548 { return write(s.cstr(), s.len()); }
00549 size_t print(WvStringParm s)
00550 { return write(s); }
00551 size_t operator() (WvStringParm s)
00552 { return write(s); }
00553
00555 size_t print(WVSTRING_FORMAT_DECL)
00556 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00557 size_t operator() (WVSTRING_FORMAT_DECL)
00558 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00559
00560 protected:
00561
00562
00563
00564
00565
00566 bool _build_selectinfo(SelectInfo &si, time_t msec_timeout,
00567 bool readable, bool writable, bool isexcept,
00568 bool forceable);
00569
00570
00571
00572
00573 int _do_select(SelectInfo &si);
00574
00575
00576
00577 bool _process_selectinfo(SelectInfo &si, bool forceable);
00578
00579
00580
00581
00582
00583 bool flush_outbuf(time_t msec_timeout);
00584
00585
00586
00587 virtual bool flush_internal(time_t msec_timeout);
00588
00589
00590
00591
00592 virtual int getrfd() const;
00593 virtual int getwfd() const;
00594
00595 private:
00597 bool _select(time_t msec_timeout,
00598 bool readable, bool writable, bool isexcept,
00599 bool forceable);
00600
00601 void legacy_callback(IWvStream& s);
00602
00603 protected:
00604
00605
00606 friend class WvHTTPClientProxyStream;
00607
00608 WvDynBuf inbuf, outbuf;
00609
00610 WvStreamCallback callfunc;
00611 void *userdata;
00612 WvCallback<void*,void*> call_ctx;
00613
00614 IWvStreamCallback readcb, writecb, exceptcb, closecb;
00615
00616 size_t max_outbuf_size;
00617 bool outbuf_delayed_flush;
00618 bool is_auto_flush;
00619
00620
00621 bool want_to_flush;
00622
00623
00624 bool is_flushing;
00625
00626 size_t queue_min;
00627 time_t autoclose_time;
00628 WvTime alarm_time;
00629 WvTime last_alarm_check;
00630
00641 virtual void execute()
00642 { }
00643
00644
00645 static WvStream *globalstream;
00646
00647
00648
00649
00650 #ifdef __WVSTREAM_UNIT_TEST
00651 public:
00652 size_t outbuf_used()
00653 { return outbuf.used(); }
00654 size_t inbuf_used()
00655 { return inbuf.used(); }
00656 void inbuf_putstr(WvStringParm t)
00657 { inbuf.putstr(t); }
00658 #endif
00659
00660 private:
00662 WvStream(const WvStream &s);
00663 WvStream& operator= (const WvStream &s);
00664 };
00665
00672 extern WvStream *wvcon;
00673 extern WvStream *wvin;
00674 extern WvStream *wvout;
00675 extern WvStream *wverr;
00676
00677 #endif // __WVSTREAM_H