wvhttpstream.cc

00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
00006  * 
00007  * See wvhttppool.h.
00008  */
00009 #include "wvhttppool.h"
00010 #include "wvtcp.h"
00011 #include "wvsslstream.h"
00012 #include "wvbuf.h"
00013 #include "wvbase64.h"
00014 #include "strutils.h"
00015 #ifndef _WIN32
00016 #include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps
00017 #endif
00018 
00019 #ifdef _WIN32
00020 #define ETIMEDOUT WSAETIMEDOUT
00021 #endif
00022 
00023 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
00024                 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
00025     : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
00026       pipeline_incompatible(_pipeline_incompatible), in_doneurl(false)
00027 {
00028     log("Opening server connection.\n");
00029     http_response = "";
00030     encoding = Unknown;
00031     bytes_remaining = 0;
00032     in_chunk_trailer = false;
00033     pipeline_test_count = 0;
00034     last_was_pipeline_test = false;
00035 
00036     enable_pipelining = global_enable_pipelining 
00037         && !pipeline_incompatible[target.remaddr];
00038     ssl = _ssl;
00039 
00040     if (ssl)
00041         cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
00042 
00043     sent_url_request = false;
00044 
00045     alarm(60000); // timeout if no connection, or something goes wrong
00046 }
00047 
00048 
00049 WvHttpStream::~WvHttpStream()
00050 {
00051     log(WvLog::Debug2, "Deleting.\n");
00052 
00053 #ifndef _WIN32
00054     void* trace[10];
00055     int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00056     char** tracedump = backtrace_symbols(trace, count);
00057     log(WvLog::Debug, "TRACE");
00058     for (int i = 0; i < count; ++i)
00059         log(WvLog::Debug, ":%s", tracedump[i]);
00060     log(WvLog::Debug, "\n");
00061     free(tracedump);
00062 #endif
00063 
00064     if (geterr())
00065         log("Error was: %s\n", errstr());
00066     close();
00067 }
00068 
00069 
00070 void WvHttpStream::close()
00071 {
00072     log("close called\n");
00073     
00074 #ifndef _WIN32
00075     void *trace[10];
00076     int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00077     char** tracedump = backtrace_symbols(trace, count);
00078     log(WvLog::Debug, "TRACE");
00079     for (int i = 0; i < count; ++i)
00080         log(WvLog::Debug, ":%s", tracedump[i]);
00081     log(WvLog::Debug, "\n");
00082     free(tracedump);
00083 #endif
00084 
00085     // assume pipelining is broken if we're closing without doing at least
00086     // one successful pipelining test and a following non-test request.
00087     if (enable_pipelining && max_requests > 1
00088             && (pipeline_test_count < 1
00089             || (pipeline_test_count == 1 && last_was_pipeline_test)))
00090         pipelining_is_broken(2);
00091 
00092     if (isok())
00093         log("Closing.\n");
00094     WvStreamClone::close();
00095 
00096     if (geterr())
00097     {
00098         // if there was an error, count the first URL as done.  This prevents
00099         // retrying indefinitely.
00100         WvUrlRequest *msgurl = curl;
00101         if (!msgurl && !urls.isempty())
00102             msgurl = urls.first();
00103         if (!msgurl && !waiting_urls.isempty())
00104             msgurl = waiting_urls.first();
00105         if (msgurl)
00106             log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
00107                 errstr());
00108     }
00109     waiting_urls.zap();
00110     if (curl)
00111     {
00112         log("curl is %s\n", curl->url);
00113         doneurl();
00114     }
00115     log("close done\n");
00116 }
00117 
00118 
00119 void WvHttpStream::doneurl()
00120 {
00121     // There is a slight chance that we might receive an error during or just before
00122     // this function is called, which means that the write occuring during
00123     // start_pipeline_test() would be called, which would call close() because of the
00124     // error, which would call doneurl() again.  We don't want to execute doneurl()
00125     // a second time when we're already in the middle.
00126     if (in_doneurl)
00127         return;
00128     in_doneurl = true;
00129 
00130     assert(curl != NULL);
00131     WvString last_response(http_response);
00132     log("Done URL: %s\n", curl->url);
00133 
00134     http_response = "";
00135     encoding = Unknown;
00136     in_chunk_trailer = false;
00137     bytes_remaining = 0;
00138 
00139     last_was_pipeline_test = curl->pipeline_test;
00140     bool broken = false;
00141     if (last_was_pipeline_test)
00142     {
00143         pipeline_test_count++;
00144         if (pipeline_test_count == 1)
00145             start_pipeline_test(&curl->url);
00146         else if (pipeline_test_response != last_response)
00147         {
00148             // getting a bit late in the game to be detecting brokenness :(
00149             // However, if the response code isn't the same for both tests,
00150             // something's definitely screwy.
00151             pipelining_is_broken(4);
00152             broken = true;
00153         }
00154         pipeline_test_response = last_response;
00155     }
00156 
00157     assert(curl == urls.first());
00158     curl->done();
00159     curl = NULL;
00160     sent_url_request = false;
00161     urls.unlink_first();
00162 
00163     if (broken)
00164         close();
00165 
00166     request_next();
00167     in_doneurl = false;
00168 }
00169 
00170 
00171 static WvString encode64(WvStringParm user, WvStringParm password)
00172 {
00173     WvBase64Encoder encoder;
00174     WvString ret;
00175     encoder.flushstrstr(WvString("%s:%s", user, password), ret);
00176     return ret;
00177 }
00178 
00179 
00180 static WvString fixnl(WvStringParm nonl)
00181 {
00182     WvDynBuf b;
00183     const char *cptr;
00184 
00185     for (cptr = nonl; cptr && *cptr; cptr++)
00186     {
00187         if (*cptr == '\r')
00188             continue;
00189         else if (*cptr == '\n')
00190             b.put("\r", 1); // put BOTH \r and \n
00191         b.put(cptr, 1);
00192     }
00193 
00194     return b.getstr();
00195 }
00196 
00197 
00198 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
00199 {
00200     WvString request;
00201     WvString auth(""), content = putstream_data.getstr();
00202     if(!!url->url.getuser() && !!url->url.getpassword())
00203         auth = WvString("Authorization: Basic %s\n",
00204                     encode64(url->url.getuser(), url->url.getpassword()));
00205 
00206     request = fixnl(WvString("%s %s HTTP/1.1\n"
00207                 "Host: %s:%s\n"
00208                 "Connection: %s\n"
00209                 "%s"
00210                 "%s"
00211                 "%s%s"
00212                 "\n"
00213                 "%s",
00214                 url->method,
00215                 url->url.getfile(),
00216                 url->url.gethost(), url->url.getport(),
00217                 keepalive ? "keep-alive" : "close",
00218                 auth,
00219                 (content.len() > 0 ? WvString("Content-Length: %s\n", content.len()).cstr() : ""),
00220                 trim_string(url->headers.edit()),
00221                 !!url->headers ? "\n" : "",
00222                 (content.len() > 0 ? content.cstr() : ""))
00223             );
00224     return request;
00225 }
00226 
00227 
00228 void WvHttpStream::send_request(WvUrlRequest *url)
00229 {
00230     request_count++;
00231     log("Request #%s: %s\n", request_count, url->url);
00232     write(request_str(url, url->pipeline_test
00233                 || request_count < max_requests));
00234     sent_url_request = true;
00235     alarm(60000);
00236 }
00237 
00238 
00239 void WvHttpStream::start_pipeline_test(WvUrl *url)
00240 {
00241     WvUrl location(WvString(
00242                 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
00243                 url->getproto(), url->gethost(), url->getport()));
00244     WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
00245                                              false, true);
00246     testurl->instream = this;
00247     send_request(testurl);
00248     urls.append(testurl, true, "sent_running_url");
00249 }
00250 
00251 
00252 void WvHttpStream::request_next()
00253 {
00254     // Clear the putstream buffer before we start any new requests
00255     putstream_data.zap();
00256 
00257     // don't do a request if we've done too many already or we have none
00258     // waiting.
00259     if (request_count >= max_requests || waiting_urls.isempty())
00260         return;
00261 
00262     // don't do more than one request at a time if we're not pipelining.
00263     if (!enable_pipelining && !urls.isempty())
00264         return;
00265 
00266     // okay then, we really do want to send a new request.
00267     WvUrlRequest *url = waiting_urls.first();
00268 
00269     waiting_urls.unlink_first();
00270     if (!url->putstream)
00271     {
00272         if (enable_pipelining && !request_count && max_requests > 1)
00273             start_pipeline_test(&url->url);
00274         send_request(url);
00275     }
00276     urls.append(url, false, "sent_running_url");
00277 }
00278 
00279 
00280 void WvHttpStream::pipelining_is_broken(int why)
00281 {
00282     if (!pipeline_incompatible[target.remaddr])
00283     {
00284         pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
00285         log("Pipelining is broken on this server (%s)!  Disabling.\n", why);
00286     }
00287 }
00288 
00289 
00290 bool WvHttpStream::pre_select(SelectInfo &si)
00291 {
00292     SelectRequest oldwant = si.wants;
00293     WvUrlRequest *url;
00294 
00295     if (WvUrlStream::pre_select(si))
00296         return true;
00297 
00298     if (!urls.isempty())
00299     {
00300         url = urls.first();
00301         if(url && url->putstream && url->putstream->pre_select(si))
00302             return true;
00303     }
00304    
00305     si.wants = oldwant;
00306     return false;
00307 }
00308 
00309 
00310 bool WvHttpStream::post_select(SelectInfo &si)
00311 {
00312     SelectRequest oldwant = si.wants;
00313     WvUrlRequest *url;
00314 
00315     if (WvUrlStream::post_select(si))
00316         return true;
00317 
00318     if (!urls.isempty())
00319     {
00320         url = urls.first();
00321         if(url && url->putstream && url->putstream->post_select(si))
00322             return true;
00323     }
00324 
00325     si.wants = oldwant;
00326     return false;
00327 }
00328 
00329 
00330 void WvHttpStream::execute()
00331 {
00332     char buf[1024], *line;
00333     size_t len;
00334 
00335     WvStreamClone::execute();
00336 
00337     // make connections timeout after some idleness
00338     if (alarm_was_ticking)
00339     {
00340         log(WvLog::Debug4, "urls count: %s\n", urls.count());
00341         if (!urls.isempty())
00342         {
00343             seterr(ETIMEDOUT);
00344 
00345             // Must check again here since seterr()
00346             // will close our stream and if we only 
00347             // had one url then it'll be gone.
00348             if (!urls.isempty())
00349             {
00350                 WvUrlRequest *url = urls.first();
00351                 if (url->outstream)
00352                     url->outstream->seterr(ETIMEDOUT);
00353             }
00354         }
00355         else
00356             close(); // timed out, but not really an error
00357         return;
00358     }
00359 
00360     // Die if somebody closed our outstream.  This is so that if we were
00361     // downloading a really big file, they can stop it in the middle and
00362     // our next url request can start downloading immediately.
00363     if (curl && !curl->outstream)
00364     {
00365         if (!(encoding == PostHeadInfinity
00366               || encoding == PostHeadChunked
00367               || encoding == PostHeadStream))
00368         {
00369             // don't complain about pipelining failures
00370             pipeline_test_count++;
00371             last_was_pipeline_test = false;
00372             close();
00373         }
00374 
00375         if (curl)
00376             doneurl();
00377         return;
00378     }
00379     else if (curl)
00380         curl->inuse = true;
00381 
00382     if(!sent_url_request && !urls.isempty())
00383     {
00384         WvUrlRequest *url = urls.first();
00385         if(url)
00386         {
00387             if(url->putstream)
00388             {
00389                 int len = 0;
00390                 if(url->putstream->isok())
00391                     len = url->putstream->read(putstream_data, 1024);
00392 
00393                 if(!url->putstream->isok() || len == 0)
00394                 {
00395                     url->putstream = NULL;
00396                     send_request(url);
00397                 }
00398             }
00399         }
00400     }
00401 
00402     if (!curl)
00403     {
00404         // in the header section
00405         line = getline();
00406         if (line)
00407         {
00408             line = trim_string(line);
00409             log(WvLog::Debug4, "Header: '%s'\n", line);
00410             if (!http_response)
00411             {
00412                 http_response = line;
00413 
00414                 // there are never two pipeline test requests in a row, so
00415                 // a second response string exactly like the pipeline test
00416                 // response implies that everything between the first and
00417                 // second test requests was lost: bad!
00418                 if (last_was_pipeline_test
00419                         && http_response == pipeline_test_response)
00420                 {
00421                     pipelining_is_broken(1);
00422                     close();
00423                     return;
00424                 }
00425 
00426                 // http response #400 is "invalid request", which we
00427                 // shouldn't be sending. If we get one of these right after
00428                 // a test, it probably means the stuff that came after it
00429                 // was mangled in some way during transmission ...and we
00430                 // should throw it away.
00431                 if (last_was_pipeline_test && !!http_response)
00432                 {
00433                     const char *cptr = strchr(http_response, ' ');
00434                     if (cptr && atoi(cptr+1) == 400)
00435                     {
00436                         pipelining_is_broken(3);
00437                         close();
00438                         return;
00439                     }
00440                 }
00441             }
00442 
00443             if (urls.isempty())
00444             {
00445                 log("got unsolicited data.\n");
00446                 seterr("unsolicited data from server!");
00447                 return;
00448             }
00449 
00450             if (!strncasecmp(line, "Content-length: ", 16))
00451             {
00452                 bytes_remaining = atoi(line+16);
00453                 encoding = ContentLength;
00454             }
00455             else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
00456                     && strstr(line+19, "chunked"))
00457             {
00458                 encoding = Chunked;
00459             }
00460 
00461             if (line[0])
00462             {
00463                 char *p;
00464                 WvBufUrlStream *outstream = urls.first()->outstream;
00465 
00466                 if ((p = strchr(line, ':')) != NULL)
00467                 {
00468                     *p = 0;
00469                     p = trim_string(p+1);
00470                     if (outstream) {
00471                         struct WvHTTPHeader *h;
00472                         h = new struct WvHTTPHeader(line, p);
00473                         outstream->headers.add(h, true);
00474                     }
00475                 }
00476                 else if (strncasecmp(line, "HTTP/", 5) == 0)
00477                 {
00478                     char *p = strchr(line, ' ');
00479                     if (p)
00480                     {
00481                         *p = 0;
00482                         if (outstream)
00483                         {
00484                             outstream->version = line+5;
00485                             outstream->status = atoi(p+1);
00486                         }
00487                     }
00488                 }
00489             }
00490             else
00491             {
00492                 // blank line is the beginning of data section
00493                 curl = urls.first();
00494                 in_chunk_trailer = false;
00495                 log(WvLog::Debug4,
00496                         "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
00497 
00498                 if (encoding == Unknown)
00499                     encoding = Infinity; // go until connection closes itself
00500 
00501                 if (curl->method == "HEAD")
00502                 {
00503                     log("Got all headers.\n");
00504                     if (!enable_pipelining)
00505                         doneurl();
00506 
00507                     if (encoding == Infinity)
00508                         encoding = PostHeadInfinity;
00509                     else if (encoding == Chunked)
00510                         encoding = PostHeadChunked;
00511                     else
00512                         encoding = PostHeadStream;
00513                 }
00514             }
00515         }
00516     }
00517     else if (encoding == PostHeadInfinity
00518              || encoding == PostHeadChunked
00519              || encoding == PostHeadStream)
00520     {
00521         WvDynBuf chkbuf;
00522         len = read(chkbuf, 5);
00523 
00524         // If there is more data available right away, and it isn't an
00525         // HTTP header from another request, then it's a stupid web
00526         // server that likes to send bodies with HEAD requests.
00527         if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
00528                            "HTTP/", 5))
00529         {
00530             if (encoding == PostHeadInfinity)
00531                 encoding = ChuckInfinity;
00532             else if (encoding == PostHeadChunked)
00533                 encoding = ChuckChunked;
00534             else if (encoding == PostHeadStream)
00535                 encoding = ChuckStream;
00536             else
00537                 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
00538         }
00539         else
00540             doneurl();
00541 
00542         unread(chkbuf, len);
00543     }
00544     else if (encoding == ChuckInfinity)
00545     {
00546         len = read(buf, sizeof(buf));
00547         if (len)
00548             log(WvLog::Debug5, "Chucking %s bytes.\n", len);
00549         if (!isok())
00550             doneurl();
00551     }
00552     else if (encoding == ChuckChunked && !bytes_remaining)
00553     {
00554         encoding = Chunked;
00555     }
00556     else if (encoding == ChuckChunked || encoding == ChuckStream)
00557     {
00558         if (bytes_remaining > sizeof(buf))
00559             len = read(buf, sizeof(buf));
00560         else
00561             len = read(buf, bytes_remaining);
00562         bytes_remaining -= len;
00563         if (len)
00564             log(WvLog::Debug5,
00565                 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
00566         if (!bytes_remaining && encoding == ContentLength)
00567             doneurl();
00568         if (bytes_remaining && !isok())
00569             seterr("connection interrupted");
00570     }
00571     else if (encoding == Chunked && !bytes_remaining)
00572     {
00573         line = getline();
00574         if (line)
00575         {
00576             line = trim_string(line);
00577 
00578             if (in_chunk_trailer)
00579             {
00580                 // in the trailer section of a chunked encoding
00581                 log(WvLog::Debug4, "Trailer: '%s'\n", line);
00582 
00583                 // a blank line means we're finally done!
00584                 if (!line[0])
00585                     doneurl();
00586             }
00587             else
00588             {
00589                 // in the "length line" section of a chunked encoding
00590                 if (line[0])
00591                 {
00592                     bytes_remaining = (size_t)strtoul(line, NULL, 16);
00593                     if (!bytes_remaining)
00594                         in_chunk_trailer = true;
00595                     log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
00596                             bytes_remaining, line);
00597                 }
00598             }
00599         }
00600     }
00601     else if (encoding == Infinity)
00602     {
00603         // just read data until the connection closes, and assume all was
00604         // well.  It sucks, but there's no way to tell if all the data arrived
00605         // okay... that's why Chunked or ContentLength encoding is better.
00606         len = read(buf, sizeof(buf));
00607         if (!isok())
00608             return;
00609 
00610         if (len)
00611             log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
00612         if (curl && curl->outstream)
00613             curl->outstream->write(buf, len);
00614 
00615         if (!isok() && curl)
00616             doneurl();
00617     }
00618     else // not chunked or currently in a chunk - read 'bytes_remaining' bytes.
00619     {
00620         // in the data section of a chunked or content-length encoding,
00621         // with 'bytes_remaining' bytes of data left.
00622 
00623         if (bytes_remaining > sizeof(buf))
00624             len = read(buf, sizeof(buf));
00625         else
00626             len = read(buf, bytes_remaining);
00627         if (!isok())
00628             return;
00629 
00630         bytes_remaining -= len;
00631         if (len)
00632             log(WvLog::Debug5, 
00633                     "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
00634         if (curl && curl->outstream)
00635             curl->outstream->write(buf, len);
00636 
00637         if (!bytes_remaining && encoding == ContentLength && curl)
00638             doneurl();
00639 
00640         if (bytes_remaining && !isok())
00641             seterr("connection interrupted");
00642 
00643         if (!isok())
00644             doneurl();
00645     }
00646 
00647     if (urls.isempty())
00648         alarm(5000); // just wait a few seconds before closing connection
00649     else
00650         alarm(60000); // give the server a minute to respond, if we're waiting
00651 }

Generated on Thu May 25 21:51:03 2006 for WvStreams by  doxygen 1.4.6