00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
#include "wvstream.h"
00012
#include "wvtask.h"
00013
#include "wvtimeutils.h"
00014
#include <time.h>
00015
#include <sys/types.h>
00016
#include <assert.h>
00017
00018
#ifdef _WIN32
00019
#define ENOBUFS WSAENOBUFS
00020
#undef errno
00021
#define errno GetLastError()
00022
#else
00023
#include <errno.h>
00024
#endif
00025
00026
00027
00028
// Debug tracing.  Change "#if 0" to "#if 1" to have TRACE() print its
// printf-style arguments to stderr and flush immediately.
#if 0
// Wrapped in do/while(0) so that "TRACE(...);" is exactly one statement and
// stays correct as the unbraced body of an if/else.
# define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
#else
#ifndef _MSC_VER
// Tracing disabled: TRACE(...) expands to nothing.
# define TRACE(x, y...)
#else
// Tracing disabled: TRACE is an empty object-like macro here, so
// "TRACE(a, b);" is left behind as the expression statement "(a, b);".
# define TRACE
#endif
#endif
00037
00038 WvStream *
WvStream::globalstream = NULL;
00039
00040
// XUUID map opened for IWvStream, with entries for IObject and IWvStream.
// NOTE(review): presumably the XPLC interface table used for interface
// lookup -- confirm against the XUUID_MAP_* macro definitions.
XUUID_MAP_BEGIN(IWvStream)
    XUUID_MAP_ENTRY(IObject)
    XUUID_MAP_ENTRY(IWvStream)
    XUUID_MAP_END
00044
00045 WvStream::
WvStream()
00046 {
00047
TRACE(
"Creating wvstream %p\n",
this);
00048
00049
#ifdef _WIN32
00050
WSAData wsaData;
00051
int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00052 assert(result == 0);
00053
#endif
00054
wvstream_execute_called =
false;
00055 userdata = closecb_data = NULL;
00056 errnum = 0;
00057 max_outbuf_size = 0;
00058 outbuf_delayed_flush =
false;
00059 want_to_flush =
true;
00060 is_flushing =
false;
00061 is_auto_flush =
true;
00062 alarm_was_ticking =
false;
00063 force.readable =
true;
00064 force.writable = force.isexception =
false;
00065 read_requires_writable = write_requires_readable = NULL;
00066 running_callback =
false;
00067 want_nowrite =
false;
00068 queue_min = 0;
00069 autoclose_time = 0;
00070 alarm_time = wvtime_zero;
00071 last_alarm_check = wvtime_zero;
00072 taskman = 0;
00073
00074
00075 uses_continue_select =
false;
00076 personal_stack_size = 65536;
00077 task = NULL;
00078 }
00079
00080
00081
00082 IWvStream::IWvStream()
00083 {
00084 }
00085
00086
00087 IWvStream::~IWvStream()
00088 {
00089 }
00090
00091
00092 WvStream::~WvStream()
00093 {
00094
TRACE(
"destroying %p\n",
this);
00095
if (
running_callback)
00096 {
00097
00098
TRACE(
"eek! destroying while running_callback!\n");
00099 assert(!
running_callback);
00100 }
00101
close();
00102
00103
if (
task)
00104 {
00105
while (
task->
isrunning())
00106
taskman->
run(*
task);
00107
task->
recycle();
00108
task = NULL;
00109 }
00110
TRACE(
"done destroying %p\n",
this);
00111
if (
taskman)
00112
taskman->
unlink();
00113 }
00114
00115
00116 void WvStream::close()
00117 {
00118
flush(2000);
00119
if (!!
closecb_func)
00120 {
00121
WvStreamCallback cb =
closecb_func;
00122 closecb_func = 0;
00123 cb(*
this,
closecb_data);
00124 }
00125 }
00126
00127
00128 void WvStream::autoforward(
WvStream &s)
00129 {
00130
setcallback(
autoforward_callback, &s);
00131
read_requires_writable = &s;
00132 }
00133
00134
00135 void WvStream::noautoforward()
00136 {
00137
setcallback(0, NULL);
00138
read_requires_writable = NULL;
00139 }
00140
00141
00142 void WvStream::autoforward_callback(
WvStream &s,
void *userdata)
00143 {
00144
WvStream &s2 = *(
WvStream *)userdata;
00145
char buf[1024];
00146 size_t len;
00147
00148 len = s.
read(buf,
sizeof(buf));
00149 s2.
write(buf, len);
00150 }
00151
00152
00153
00154
00155
00156 void WvStream::_callback(
void *stream)
00157 {
00158
WvStream *s = (
WvStream *)stream;
00159
00160 s->
running_callback =
true;
00161
00162 s->
wvstream_execute_called =
false;
00163 s->
execute();
00164
if (!! s->
callfunc)
00165 s->
callfunc(*s, s->
userdata);
00166
00167
00168
00169
00170
00171
00172 assert(s->
wvstream_execute_called);
00173
00174 s->
running_callback =
false;
00175 }
00176
00177
00178 void WvStream::callback()
00179 {
00180
TRACE(
"(?)");
00181
00182
00183
00184
if (
running_callback)
00185
return;
00186
00187
00188
if (
alarm_remaining() == 0)
00189 {
00190
alarm_time = wvtime_zero;
00191
alarm_was_ticking =
true;
00192 }
00193
else
00194
alarm_was_ticking =
false;
00195
00196 assert(!
uses_continue_select ||
personal_stack_size >= 1024);
00197
00198
00199
if (
uses_continue_select && personal_stack_size >= 1024)
00200 {
00201
if (!
taskman)
00202
taskman =
WvTaskMan::get();
00203
00204
if (!
task)
00205 {
00206
TRACE(
"(!)");
00207
task =
taskman->
start(
"streamexec",
_callback,
this,
00208 personal_stack_size);
00209 }
00210
else if (!
task->
isrunning())
00211 {
00212
TRACE(
"(.)");
00213
task->
start(
"streamexec2",
_callback,
this);
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
do
00235 {
00236
taskman->
run(*
task);
00237 }
while (
task &&
task->
isrunning() &&
running_callback);
00238 }
00239
else
00240
_callback(
this);
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256 }
00257
00258
00259 void WvStream::execute()
00260 {
00261
00262
wvstream_execute_called =
true;
00263 }
00264
00265
00266 bool WvStream::isok()
const
00267
{
00268
return WvError::isok();
00269 }
00270
00271
00272 void WvStream::seterr(
int _errnum)
00273 {
00274
if (!errnum)
00275 {
00276 WvError::seterr(_errnum);
00277
close();
00278 }
00279 }
00280
00281
00282 size_t
WvStream::read(
WvBuf &outbuf, size_t count)
00283 {
00284
00285 size_t free = outbuf.
free();
00286
if (count > free)
00287 count = free;
00288
00289
WvDynBuf tmp;
00290
unsigned char *buf = tmp.
alloc(count);
00291 size_t len =
read(buf, count);
00292 tmp.
unalloc(count - len);
00293 outbuf.
merge(tmp);
00294
return len;
00295 }
00296
00297
00298 size_t
WvStream::continue_read(time_t wait_msec,
WvBuf &outbuf, size_t count)
00299 {
00300
00301 size_t free = outbuf.
free();
00302
if (count > free)
00303 count = free;
00304
unsigned char *buf = outbuf.
alloc(count);
00305
00306
00307 size_t len =
continue_read(wait_msec, buf, count);
00308
00309 outbuf.
unalloc(count - len);
00310
return len;
00311 }
00312
00313
00314 size_t
WvStream::write(
WvBuf &inbuf, size_t count)
00315 {
00316
00317 size_t avail = inbuf.
used();
00318
if (count > avail)
00319 count = avail;
00320
const unsigned char *buf = inbuf.
get(count);
00321 size_t len =
write(buf, count);
00322 inbuf.
unget(count - len);
00323
return len;
00324 }
00325
00326
00327 size_t
WvStream::read(
void *buf, size_t count)
00328 {
00329 size_t bufu =
inbuf.
used(), i;
00330
unsigned char *newbuf;
00331
00332 bufu =
inbuf.
used();
00333
if (bufu <
queue_min)
00334 {
00335 newbuf =
inbuf.
alloc(
queue_min - bufu);
00336 i =
uread(newbuf,
queue_min - bufu);
00337
inbuf.
unalloc(
queue_min - bufu - i);
00338
00339 bufu =
inbuf.
used();
00340 }
00341
00342
if (bufu <
queue_min)
00343
return 0;
00344
00345
00346
if (!bufu)
00347 bufu =
uread(buf, count);
00348
else
00349 {
00350
00351
if (bufu > count)
00352 bufu = count;
00353
00354 memcpy(buf,
inbuf.
get(bufu), bufu);
00355 }
00356
00357
TRACE(
"read obj 0x%p, bytes %d/%d\n",
this, bufu, count);
00358
return bufu;
00359 }
00360
00361
00362 size_t
WvStream::continue_read(time_t wait_msec,
void *buf, size_t count)
00363 {
00364 assert(
uses_continue_select);
00365
00366
if (!count)
00367
return 0;
00368
00369
00370
if (wait_msec >= 0)
00371
alarm(wait_msec);
00372
00373
queuemin(count);
00374
00375
int got = 0;
00376
00377
while (
isok())
00378 {
00379
if (
continue_select(-1))
00380 {
00381
if ((got =
read(buf, count)) != 0)
00382
break;
00383
if (
alarm_was_ticking)
00384
break;
00385 }
00386 }
00387
00388
if (wait_msec >= 0)
00389
alarm(-1);
00390
00391
queuemin(0);
00392
00393
return got;
00394 }
00395
00396
00397 size_t
WvStream::write(
const void *buf, size_t count)
00398 {
00399
if (!
isok() || !buf || !count)
return 0;
00400
00401 size_t wrote = 0;
00402
if (!
outbuf_delayed_flush && !
outbuf.
used())
00403 {
00404 wrote =
uwrite(buf, count);
00405 count -= wrote;
00406 buf = (
const unsigned char*)buf + wrote;
00407 }
00408
if (
max_outbuf_size != 0)
00409 {
00410 size_t canbuffer =
max_outbuf_size -
outbuf.
used();
00411
if (count > canbuffer)
00412 count = canbuffer;
00413 }
00414
if (count != 0)
00415 {
00416
outbuf.
put(buf, count);
00417 wrote += count;
00418 }
00419
00420
if (
should_flush())
00421 {
00422
if (
is_auto_flush)
00423
flush(0);
00424
else
00425
flush_outbuf(0);
00426 }
00427
00428
return wrote;
00429 }
00430
00431
00432 void WvStream::noread()
00433 {
00434
00435
00436 }
00437
00438
00439 void WvStream::nowrite()
00440 {
00441
if (
getwfd() < 0)
00442
return;
00443
00444
want_nowrite =
true;
00445 }
00446
00447
00448 bool WvStream::isreadable()
00449 {
00450
return isok() &&
select(0,
true,
false,
false);
00451 }
00452
00453
00454 bool WvStream::iswritable()
00455 {
00456
return isok() &&
select(0,
false,
true,
false);
00457 }
00458
00459
00460 char *
WvStream::getline(time_t wait_msec,
char separator,
int readahead)
00461 {
00462
struct timeval timeout_time;
00463
if (wait_msec > 0)
00464 timeout_time =
msecadd(
wvtime(), wait_msec);
00465
00466
00467
00468
while (
isok())
00469 {
00470
queuemin(0);
00471
00472
00473 size_t i =
inbuf.strchr(separator);
00474
if (i > 0)
00475 {
00476
char *eol = (
char *)
inbuf.
mutablepeek(i - 1, 1);
00477 assert(eol);
00478 *eol = 0;
00479
return (
char *)
inbuf.
get(i);
00480 }
00481
else if (!
isok())
00482 {
00483
if (
inbuf.
used())
00484 {
00485
00486
00487
00488
inbuf.
alloc(1)[0] = 0;
00489
return const_cast<char*>(
00490 (
const char*)
inbuf.
get(
inbuf.
used()));
00491 }
00492
else
00493
break;
00494 }
00495
00496
00497 size_t needed =
inbuf.
used() + 1;
00498
queuemin(needed);
00499
00500
00501
if (wait_msec > 0)
00502 {
00503 wait_msec =
msecdiff(timeout_time,
wvtime());
00504
if (wait_msec < 0)
00505 wait_msec = 0;
00506 }
00507
00508
bool hasdata;
00509
if (
uses_continue_select)
00510 hasdata =
continue_select(wait_msec);
00511
else
00512 hasdata =
select(wait_msec,
true,
false);
00513
if (!
isok())
00514
break;
00515
00516
if (hasdata)
00517 {
00518
00519
unsigned char *buf =
inbuf.
alloc(readahead);
00520 size_t len =
uread(buf, readahead);
00521
inbuf.
unalloc(readahead - len);
00522 hasdata =
inbuf.
used() >= needed;
00523 }
00524
00525
if (!hasdata && wait_msec == 0)
00526
break;
00527 }
00528
00529
00530
if (!
isok() &&
inbuf.
used())
00531 {
00532
00533
inbuf.
put(
"", 1);
00534
return (
char *)
inbuf.
get(
inbuf.
used());
00535 }
00536
else
00537
return NULL;
00538 }
00539
00540
00541 void WvStream::drain()
00542 {
00543
char buf[1024];
00544
while (
isreadable())
00545
read(buf,
sizeof(buf));
00546 }
00547
00548
00549 bool WvStream::flush(time_t msec_timeout)
00550 {
00551
if (
is_flushing)
return false;
00552
00553
TRACE(
"%p flush starts\n",
this);
00554
00555
is_flushing =
true;
00556
want_to_flush =
true;
00557
bool done =
flush_internal(msec_timeout)
00558 &&
flush_outbuf(msec_timeout);
00559
is_flushing =
false;
00560
00561
TRACE(
"flush stops (%d)\n", done);
00562
return done;
00563 }
00564
00565
00566 bool WvStream::should_flush()
00567 {
00568
return want_to_flush;
00569 }
00570
00571
00572 bool WvStream::flush_outbuf(time_t msec_timeout)
00573 {
00574
TRACE(
"%p flush_outbuf starts (isok=%d)\n",
this,
isok());
00575
00576
00577
while (
isok() &&
outbuf.
used())
00578 {
00579
00580
00581
00582 size_t attempt =
outbuf.
used();
00583 size_t real =
uwrite(
outbuf.
get(attempt), attempt);
00584
00585
00586
00587
00588
if (
isok() && real < attempt)
00589 {
00590
TRACE(
"flush_outbuf: unget %d-%d\n", attempt, real);
00591 assert(
outbuf.
ungettable() >= attempt - real);
00592
outbuf.
unget(attempt - real);
00593 }
00594
00595
00596
00597
if (!msec_timeout || !
select(msec_timeout,
false,
true))
00598 {
00599
if (msec_timeout >= 0)
00600
break;
00601 }
00602 }
00603
00604
00605
if (
isok() &&
autoclose_time)
00606 {
00607 time_t now = time(NULL);
00608
TRACE(
"Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00609
this, now -
autoclose_time,
outbuf.
used());
00610
if ((
flush_internal(0) && !
outbuf.
used()) || now > autoclose_time)
00611 {
00612 autoclose_time = 0;
00613
close();
00614 }
00615 }
00616
00617
TRACE(
"flush_outbuf: after autoclose chunk\n");
00618
00619
if (!
outbuf.
used() &&
outbuf_delayed_flush)
00620
want_to_flush =
false;
00621
00622
TRACE(
"flush_outbuf: now isok=%d\n",
isok());
00623
00624
00625
if (!
isok())
00626
outbuf.
zap();
00627
00628
TRACE(
"flush_outbuf stops\n");
00629
00630
return !
outbuf.
used();
00631 }
00632
00633
00634 bool WvStream::flush_internal(time_t msec_timeout)
00635 {
00636
00637
return true;
00638 }
00639
00640
00641 int WvStream::getrfd()
const
00642
{
00643
return -1;
00644 }
00645
00646
00647 int WvStream::getwfd()
const
00648
{
00649
return -1;
00650 }
00651
00652
00653 void WvStream::flush_then_close(
int msec_timeout)
00654 {
00655 time_t now = time(NULL);
00656
autoclose_time = now + (msec_timeout + 999) / 1000;
00657
00658
TRACE(
"Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00659
this,
outbuf.
used(),
autoclose_time - now);
00660
00661
00662
00663
00664
00665
00666
flush(0);
00667 }
00668
00669
00670 bool WvStream::pre_select(SelectInfo &si)
00671 {
00672 time_t alarmleft =
alarm_remaining();
00673
00674
if (alarmleft == 0)
00675
return true;
00676
00677
if (!si.inherit_request)
00678 si.wants |=
force;
00679
00680
00681
if (si.wants.readable &&
inbuf.
used() &&
inbuf.
used() >=
queue_min)
00682
return true;
00683
if (alarmleft >= 0
00684 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00685 si.msec_timeout = alarmleft;
00686
return false;
00687 }
00688
00689
00690 bool WvStream::post_select(SelectInfo &si)
00691 {
00692
00693
00694
00695
return false;
00696 }
00697
00698
00699 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00700
bool readable,
bool writable,
bool isexcept,
bool forceable)
00701 {
00702 FD_ZERO(&si.read);
00703 FD_ZERO(&si.write);
00704 FD_ZERO(&si.except);
00705
00706
if (forceable)
00707 si.wants =
force;
00708
else
00709 {
00710 si.wants.readable = readable;
00711 si.wants.writable = writable;
00712 si.wants.isexception = isexcept;
00713 }
00714
00715 si.max_fd = -1;
00716 si.msec_timeout = msec_timeout;
00717 si.inherit_request = ! forceable;
00718 si.global_sure =
false;
00719
00720
if (!
isok())
return false;
00721
00722
bool sure =
pre_select(si);
00723
if (
globalstream && forceable && (
globalstream !=
this))
00724 {
00725
WvStream *s =
globalstream;
00726 globalstream = NULL;
00727 si.global_sure = s->
pre_select(si);
00728 globalstream = s;
00729 }
00730
if (sure || si.global_sure)
00731 si.msec_timeout = 0;
00732
return sure;
00733 }
00734
00735
00736 int WvStream::_do_select(SelectInfo &si)
00737 {
00738
00739 timeval tv;
00740 tv.tv_sec = si.msec_timeout / 1000;
00741 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00742
00743
00744
int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00745 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00746
00747
00748
00749
00750
00751
00752
if (sel < 0
00753 && errno != EAGAIN && errno != EINTR
00754 && errno != EBADF
00755 && errno != ENOBUFS
00756
#ifdef _WIN32
00757
&& errno != WSAEINVAL
00758
#endif
00759
)
00760 {
00761
seterr(errno);
00762 }
00763
return sel;
00764 }
00765
00766
00767 bool WvStream::_process_selectinfo(SelectInfo &si,
bool forceable)
00768 {
00769
if (!
isok())
return false;
00770
00771
bool sure =
post_select(si);
00772
if (
globalstream && forceable && (
globalstream !=
this))
00773 {
00774
WvStream *s =
globalstream;
00775 globalstream = NULL;
00776 si.global_sure = s->
post_select(si) || si.global_sure;
00777 globalstream = s;
00778 }
00779
return sure;
00780 }
00781
00782
00783
bool WvStream::_select(time_t msec_timeout,
00784
bool readable,
bool writable,
bool isexcept,
bool forceable)
00785 {
00786 SelectInfo si;
00787
bool sure = _build_selectinfo(si, msec_timeout,
00788 readable, writable, isexcept, forceable);
00789
00790
if (!
isok())
00791
return false;
00792
00793
00794
00795
00796
00797
00798
00799
00800
00801
int sel = _do_select(si);
00802
if (sel >= 0)
00803 sure = _process_selectinfo(si, forceable) || sure;
00804
if (si.global_sure && globalstream && forceable && (globalstream !=
this))
00805 globalstream->
callback();
00806
return sure;
00807 }
00808
00809
00810 void WvStream::force_select(
bool readable,
bool writable,
bool isexception)
00811 {
00812
force.readable |= readable;
00813
force.writable |= writable;
00814
force.isexception |= isexception;
00815 }
00816
00817
00818 void WvStream::undo_force_select(
bool readable,
bool writable,
bool isexception)
00819 {
00820
force.readable &= !readable;
00821
force.writable &= !writable;
00822
force.isexception &= !isexception;
00823 }
00824
00825
00826 void WvStream::alarm(time_t msec_timeout)
00827 {
00828
if (msec_timeout >= 0)
00829
alarm_time =
msecadd(
wvtime(), msec_timeout);
00830
else
00831
alarm_time = wvtime_zero;
00832 }
00833
00834
00835 time_t
WvStream::alarm_remaining()
00836 {
00837
if (
alarm_time.tv_sec && !
running_callback)
00838 {
00839
WvTime now =
wvtime();
00840
00841
00842
if (now <
last_alarm_check)
00843
alarm_time =
tvdiff(
alarm_time,
tvdiff(
last_alarm_check, now));
00844
00845
last_alarm_check = now;
00846
00847 time_t remaining =
msecdiff(
alarm_time, now);
00848
if (remaining < 0)
00849 remaining = 0;
00850
return remaining;
00851 }
00852
return -1;
00853 }
00854
00855
00856 bool WvStream::continue_select(time_t msec_timeout)
00857 {
00858 assert(
uses_continue_select);
00859 assert(
task);
00860 assert(
taskman);
00861 assert(
taskman->
whoami() ==
task);
00862
00863
if (msec_timeout >= 0)
00864
alarm(msec_timeout);
00865
00866
running_callback =
false;
00867
taskman->
yield();
00868
running_callback =
true;
00869
alarm(-1);
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
TRACE(
"hello-%p\n",
this);
00881
return !
alarm_was_ticking ||
select(0);
00882 }
00883
00884
00885 void WvStream::terminate_continue_select()
00886 {
00887
close();
00888
if (
task)
00889 {
00890
while (
task->
isrunning())
00891
taskman->
run(*
task);
00892
task->
recycle();
00893
task = NULL;
00894 }
00895 }
00896
00897
00898 const WvAddr *
WvStream::src()
const
00899
{
00900
return NULL;
00901 }
00902
00903
00904 void WvStream::unread(
WvBuf &unreadbuf, size_t count)
00905 {
00906
WvDynBuf tmp;
00907 tmp.
merge(unreadbuf, count);
00908 tmp.
merge(
inbuf);
00909
inbuf.
zap();
00910
inbuf.
merge(tmp);
00911 }