wvhttpstream.cc

00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
00006  * 
00007  * See wvhttppool.h.
00008  */
00009 #include "wvhttppool.h"
00010 #include "wvtcp.h"
00011 #include "wvsslstream.h"
00012 #include "wvbuf.h"
00013 #include "wvbase64.h"
00014 #include "strutils.h"
00015 #ifdef HAVE_EXECINFO_H
00016 #include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps
00017 #endif
00018 
00019 #ifdef _WIN32
00020 #define ETIMEDOUT WSAETIMEDOUT
00021 #endif
00022 
00023 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
00024                 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
00025     : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
00026       pipeline_incompatible(_pipeline_incompatible),
00027       in_doneurl(false)
00028 {
00029     log("Opening server connection.\n");
00030     http_response = "";
00031     encoding = Unknown;
00032     bytes_remaining = 0;
00033     in_chunk_trailer = false;
00034     pipeline_test_count = 0;
00035     last_was_pipeline_test = false;
00036 
00037     enable_pipelining = global_enable_pipelining 
00038         && !pipeline_incompatible[target.remaddr];
00039     ssl = _ssl;
00040 
00041     if (ssl)
00042         cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
00043 
00044     sent_url_request = false;
00045 
00046     alarm(60000); // timeout if no connection, or something goes wrong
00047 }
00048 
00049 
00050 WvHttpStream::~WvHttpStream()
00051 {
00052     log(WvLog::Debug2, "Deleting.\n");
00053 
00054 #if 0
00055 #ifdef HAVE_EXECINFO_H
00056     void* trace[10];
00057     int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00058     char** tracedump = backtrace_symbols(trace, count);
00059     log(WvLog::Debug, "TRACE");
00060     for (int i = 0; i < count; ++i)
00061         log(WvLog::Debug, ":%s", tracedump[i]);
00062     log(WvLog::Debug, "\n");
00063     free(tracedump);
00064 #endif
00065 #endif
00066 
00067     if (geterr())
00068         log("Error was: %s\n", errstr());
00069     close();
00070 }
00071 
00072 
00073 void WvHttpStream::close()
00074 {
00075     log("close called\n");
00076 
00077 #if 0    
00078 #ifdef HAVE_EXECINFO_H
00079     void *trace[10];
00080     int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00081     char** tracedump = backtrace_symbols(trace, count);
00082     log(WvLog::Debug, "TRACE");
00083     for (int i = 0; i < count; ++i)
00084         log(WvLog::Debug, ":%s", tracedump[i]);
00085     log(WvLog::Debug, "\n");
00086     free(tracedump);
00087 #endif
00088 #endif
00089 
00090     // assume pipelining is broken if we're closing without doing at least
00091     // one successful pipelining test and a following non-test request.
00092     if (enable_pipelining && max_requests > 1
00093             && (pipeline_test_count < 1
00094             || (pipeline_test_count == 1 && last_was_pipeline_test)))
00095         pipelining_is_broken(2);
00096 
00097     if (isok())
00098         log("Closing.\n");
00099     WvStreamClone::close();
00100 
00101     if (geterr())
00102     {
00103         // if there was an error, count the first URL as done.  This prevents
00104         // retrying indefinitely.
00105         WvUrlRequest *msgurl = curl;
00106         if (!msgurl && !urls.isempty())
00107             msgurl = urls.first();
00108         if (!msgurl && !waiting_urls.isempty())
00109             msgurl = waiting_urls.first();
00110         if (msgurl)
00111             log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
00112                 errstr());
00113     }
00114     waiting_urls.zap();
00115     if (curl)
00116     {
00117         log("curl is %s\n", curl->url);
00118         doneurl();
00119     }
00120     log("close done\n");
00121 }
00122 
00123 
00124 void WvHttpStream::doneurl()
00125 {
00126     // There is a slight chance that we might receive an error during or just before
00127     // this function is called, which means that the write occuring during
00128     // start_pipeline_test() would be called, which would call close() because of the
00129     // error, which would call doneurl() again.  We don't want to execute doneurl()
00130     // a second time when we're already in the middle.
00131     if (in_doneurl)
00132         return;
00133     in_doneurl = true;
00134 
00135     assert(curl != NULL);
00136     WvString last_response(http_response);
00137     log("Done URL: %s\n", curl->url);
00138 
00139     http_response = "";
00140     encoding = Unknown;
00141     in_chunk_trailer = false;
00142     bytes_remaining = 0;
00143 
00144     last_was_pipeline_test = curl->pipeline_test;
00145     bool broken = false;
00146     if (last_was_pipeline_test)
00147     {
00148         pipeline_test_count++;
00149         if (pipeline_test_count == 1)
00150             start_pipeline_test(&curl->url);
00151         else if (pipeline_test_response != last_response)
00152         {
00153             // getting a bit late in the game to be detecting brokenness :(
00154             // However, if the response code isn't the same for both tests,
00155             // something's definitely screwy.
00156             pipelining_is_broken(4);
00157             broken = true;
00158         }
00159         pipeline_test_response = last_response;
00160     }
00161 
00162     assert(curl == urls.first());
00163     curl->done();
00164     curl = NULL;
00165     sent_url_request = false;
00166     urls.unlink_first();
00167 
00168     if (broken)
00169         close();
00170 
00171     request_next();
00172     in_doneurl = false;
00173 }
00174 
00175 
00176 static WvString encode64(WvStringParm user, WvStringParm password)
00177 {
00178     WvBase64Encoder encoder;
00179     WvString ret;
00180     encoder.flushstrstr(WvString("%s:%s", user, password), ret);
00181     return ret;
00182 }
00183 
00184 
00185 static WvString fixnl(WvStringParm nonl)
00186 {
00187     WvDynBuf b;
00188     const char *cptr;
00189 
00190     for (cptr = nonl; cptr && *cptr; cptr++)
00191     {
00192         if (*cptr == '\r')
00193             continue;
00194         else if (*cptr == '\n')
00195             b.put("\r", 1); // put BOTH \r and \n
00196         b.put(cptr, 1);
00197     }
00198 
00199     return b.getstr();
00200 }
00201 
00202 
00203 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
00204 {
00205     WvString request;
00206     WvString auth(""), content = putstream_data.getstr();
00207     if(!!url->url.getuser() && !!url->url.getpassword())
00208         auth = WvString("Authorization: Basic %s\n",
00209                     encode64(url->url.getuser(), url->url.getpassword()));
00210 
00211     request = fixnl(WvString("%s %s HTTP/1.1\n"
00212                 "Host: %s:%s\n"
00213                 "Connection: %s\n"
00214                 "%s"
00215                 "%s"
00216                 "%s%s"
00217                 "\n"
00218                 "%s",
00219                 url->method,
00220                 url->url.getfile(),
00221                 url->url.gethost(), url->url.getport(),
00222                 keepalive ? "keep-alive" : "close",
00223                 auth,
00224                 (content.len() > 0 ? WvString("Content-Length: %s\n", content.len()).cstr() : ""),
00225                 trim_string(url->headers.edit()),
00226                 !!url->headers ? "\n" : "",
00227                 (content.len() > 0 ? content.cstr() : ""))
00228             );
00229     return request;
00230 }
00231 
00232 
00233 void WvHttpStream::send_request(WvUrlRequest *url)
00234 {
00235     request_count++;
00236     log("Request #%s: %s\n", request_count, url->url);
00237     write(request_str(url, url->pipeline_test
00238                 || request_count < max_requests));
00239     sent_url_request = true;
00240     alarm(60000);
00241 }
00242 
00243 
00244 void WvHttpStream::start_pipeline_test(WvUrl *url)
00245 {
00246     WvUrl location(WvString(
00247                 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
00248                 url->getproto(), url->gethost(), url->getport()));
00249     WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
00250                                              false, true);
00251     testurl->instream = this;
00252     send_request(testurl);
00253     urls.append(testurl, true, "sent_running_url");
00254 }
00255 
00256 
00257 void WvHttpStream::request_next()
00258 {
00259     // Clear the putstream buffer before we start any new requests
00260     putstream_data.zap();
00261 
00262     // don't do a request if we've done too many already or we have none
00263     // waiting.
00264     if (request_count >= max_requests || waiting_urls.isempty())
00265         return;
00266 
00267     // don't do more than one request at a time if we're not pipelining.
00268     if (!enable_pipelining && !urls.isempty())
00269         return;
00270 
00271     // okay then, we really do want to send a new request.
00272     WvUrlRequest *url = waiting_urls.first();
00273 
00274     waiting_urls.unlink_first();
00275     if (!url->putstream)
00276     {
00277         if (enable_pipelining && !request_count && max_requests > 1)
00278             start_pipeline_test(&url->url);
00279         send_request(url);
00280     }
00281     urls.append(url, false, "sent_running_url");
00282 }
00283 
00284 
00285 void WvHttpStream::pipelining_is_broken(int why)
00286 {
00287     if (!pipeline_incompatible[target.remaddr])
00288     {
00289         pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
00290         log("Pipelining is broken on this server (%s)!  Disabling.\n", why);
00291     }
00292 }
00293 
00294 
00295 void WvHttpStream::pre_select(SelectInfo &si)
00296 {
00297     SelectRequest oldwant = si.wants;
00298     WvUrlRequest *url;
00299 
00300     WvUrlStream::pre_select(si);
00301 
00302     if (!urls.isempty())
00303     {
00304         url = urls.first();
00305         if(url && url->putstream) 
00306             url->putstream->pre_select(si);
00307     }
00308    
00309     si.wants = oldwant;
00310 }
00311 
00312 
00313 bool WvHttpStream::post_select(SelectInfo &si)
00314 {
00315     SelectRequest oldwant = si.wants;
00316     WvUrlRequest *url;
00317 
00318     if (WvUrlStream::post_select(si))
00319         return true;
00320 
00321     if (!urls.isempty())
00322     {
00323         url = urls.first();
00324         if(url && url->putstream && url->putstream->post_select(si))
00325             return true;
00326     }
00327 
00328     si.wants = oldwant;
00329     return false;
00330 }
00331 
00332 
00333 void WvHttpStream::execute()
00334 {
00335     char buf[1024], *line;
00336     size_t len;
00337 
00338     WvStreamClone::execute();
00339 
00340     // make connections timeout after some idleness
00341     if (alarm_was_ticking)
00342     {
00343         log(WvLog::Debug4, "urls count: %s\n", urls.count());
00344         if (!urls.isempty())
00345         {
00346             seterr(ETIMEDOUT);
00347 
00348             // Must check again here since seterr()
00349             // will close our stream and if we only 
00350             // had one url then it'll be gone.
00351             if (!urls.isempty())
00352             {
00353                 WvUrlRequest *url = urls.first();
00354                 if (url->outstream)
00355                     url->outstream->seterr(ETIMEDOUT);
00356             }
00357         }
00358         else
00359             close(); // timed out, but not really an error
00360         return;
00361     }
00362 
00363     // Die if somebody closed our outstream.  This is so that if we were
00364     // downloading a really big file, they can stop it in the middle and
00365     // our next url request can start downloading immediately.
00366     if (curl && !curl->outstream)
00367     {
00368         if (!(encoding == PostHeadInfinity
00369               || encoding == PostHeadChunked
00370               || encoding == PostHeadStream))
00371         {
00372             // don't complain about pipelining failures
00373             pipeline_test_count++;
00374             last_was_pipeline_test = false;
00375             close();
00376         }
00377 
00378         if (curl)
00379             doneurl();
00380         return;
00381     }
00382     else if (curl)
00383         curl->inuse = true;
00384 
00385     if(!sent_url_request && !urls.isempty())
00386     {
00387         WvUrlRequest *url = urls.first();
00388         if(url)
00389         {
00390             if(url->putstream)
00391             {
00392                 int len = 0;
00393                 if(url->putstream->isok())
00394                     len = url->putstream->read(putstream_data, 1024);
00395 
00396                 if(!url->putstream->isok() || len == 0)
00397                 {
00398                     url->putstream = NULL;
00399                     send_request(url);
00400                 }
00401             }
00402         }
00403     }
00404 
00405     if (!curl)
00406     {
00407         // in the header section
00408         line = getline();
00409         if (line)
00410         {
00411             line = trim_string(line);
00412             log(WvLog::Debug4, "Header: '%s'\n", line);
00413             if (!http_response)
00414             {
00415                 http_response = line;
00416 
00417                 // there are never two pipeline test requests in a row, so
00418                 // a second response string exactly like the pipeline test
00419                 // response implies that everything between the first and
00420                 // second test requests was lost: bad!
00421                 if (last_was_pipeline_test
00422                         && http_response == pipeline_test_response)
00423                 {
00424                     pipelining_is_broken(1);
00425                     close();
00426                     return;
00427                 }
00428 
00429                 // http response #400 is "invalid request", which we
00430                 // shouldn't be sending. If we get one of these right after
00431                 // a test, it probably means the stuff that came after it
00432                 // was mangled in some way during transmission ...and we
00433                 // should throw it away.
00434                 if (last_was_pipeline_test && !!http_response)
00435                 {
00436                     const char *cptr = strchr(http_response, ' ');
00437                     if (cptr && atoi(cptr+1) == 400)
00438                     {
00439                         pipelining_is_broken(3);
00440                         close();
00441                         return;
00442                     }
00443                 }
00444             }
00445 
00446             if (urls.isempty())
00447             {
00448                 log("got unsolicited data.\n");
00449                 seterr("unsolicited data from server!");
00450                 return;
00451             }
00452 
00453             if (!strncasecmp(line, "Content-length: ", 16))
00454             {
00455                 bytes_remaining = atoi(line+16);
00456                 encoding = ContentLength;
00457             }
00458             else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
00459                     && strstr(line+19, "chunked"))
00460             {
00461                 encoding = Chunked;
00462             }
00463 
00464             if (line[0])
00465             {
00466                 char *p;
00467                 WvBufUrlStream *outstream = urls.first()->outstream;
00468 
00469                 if ((p = strchr(line, ':')) != NULL)
00470                 {
00471                     *p = 0;
00472                     p = trim_string(p+1);
00473                     if (outstream) {
00474                         struct WvHTTPHeader *h;
00475                         h = new struct WvHTTPHeader(line, p);
00476                         outstream->headers.add(h, true);
00477                     }
00478                 }
00479                 else if (strncasecmp(line, "HTTP/", 5) == 0)
00480                 {
00481                     char *p = strchr(line, ' ');
00482                     if (p)
00483                     {
00484                         *p = 0;
00485                         if (outstream)
00486                         {
00487                             outstream->version = line+5;
00488                             outstream->status = atoi(p+1);
00489                         }
00490                     }
00491                 }
00492             }
00493             else
00494             {
00495                 // blank line is the beginning of data section
00496                 curl = urls.first();
00497                 in_chunk_trailer = false;
00498                 log(WvLog::Debug4,
00499                         "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
00500 
00501                 if (encoding == Unknown)
00502                     encoding = Infinity; // go until connection closes itself
00503 
00504                 if (curl->method == "HEAD")
00505                 {
00506                     log("Got all headers.\n");
00507                     if (!enable_pipelining)
00508                         doneurl();
00509 
00510                     if (encoding == Infinity)
00511                         encoding = PostHeadInfinity;
00512                     else if (encoding == Chunked)
00513                         encoding = PostHeadChunked;
00514                     else
00515                         encoding = PostHeadStream;
00516                 }
00517             }
00518         }
00519     }
00520     else if (encoding == PostHeadInfinity
00521              || encoding == PostHeadChunked
00522              || encoding == PostHeadStream)
00523     {
00524         WvDynBuf chkbuf;
00525         len = read(chkbuf, 5);
00526 
00527         // If there is more data available right away, and it isn't an
00528         // HTTP header from another request, then it's a stupid web
00529         // server that likes to send bodies with HEAD requests.
00530         if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
00531                            "HTTP/", 5))
00532         {
00533             if (encoding == PostHeadInfinity)
00534                 encoding = ChuckInfinity;
00535             else if (encoding == PostHeadChunked)
00536                 encoding = ChuckChunked;
00537             else if (encoding == PostHeadStream)
00538                 encoding = ChuckStream;
00539             else
00540                 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
00541         }
00542         else
00543             doneurl();
00544 
00545         unread(chkbuf, len);
00546     }
00547     else if (encoding == ChuckInfinity)
00548     {
00549         len = read(buf, sizeof(buf));
00550         if (len)
00551             log(WvLog::Debug5, "Chucking %s bytes.\n", len);
00552         if (!isok())
00553             doneurl();
00554     }
00555     else if (encoding == ChuckChunked && !bytes_remaining)
00556     {
00557         encoding = Chunked;
00558     }
00559     else if (encoding == ChuckChunked || encoding == ChuckStream)
00560     {
00561         if (bytes_remaining > sizeof(buf))
00562             len = read(buf, sizeof(buf));
00563         else
00564             len = read(buf, bytes_remaining);
00565         bytes_remaining -= len;
00566         if (len)
00567             log(WvLog::Debug5,
00568                 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
00569         if (!bytes_remaining && encoding == ContentLength)
00570             doneurl();
00571         if (bytes_remaining && !isok())
00572             seterr("connection interrupted");
00573     }
00574     else if (encoding == Chunked && !bytes_remaining)
00575     {
00576         line = getline();
00577         if (line)
00578         {
00579             line = trim_string(line);
00580 
00581             if (in_chunk_trailer)
00582             {
00583                 // in the trailer section of a chunked encoding
00584                 log(WvLog::Debug4, "Trailer: '%s'\n", line);
00585 
00586                 // a blank line means we're finally done!
00587                 if (!line[0])
00588                     doneurl();
00589             }
00590             else
00591             {
00592                 // in the "length line" section of a chunked encoding
00593                 if (line[0])
00594                 {
00595                     bytes_remaining = (size_t)strtoul(line, NULL, 16);
00596                     if (!bytes_remaining)
00597                         in_chunk_trailer = true;
00598                     log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
00599                             bytes_remaining, line);
00600                 }
00601             }
00602         }
00603     }
00604     else if (encoding == Infinity)
00605     {
00606         // just read data until the connection closes, and assume all was
00607         // well.  It sucks, but there's no way to tell if all the data arrived
00608         // okay... that's why Chunked or ContentLength encoding is better.
00609         len = read(buf, sizeof(buf));
00610         if (!isok())
00611             return;
00612 
00613         if (len)
00614             log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
00615         if (curl && curl->outstream)
00616             curl->outstream->write(buf, len);
00617 
00618         if (!isok() && curl)
00619             doneurl();
00620     }
00621     else // not chunked or currently in a chunk - read 'bytes_remaining' bytes.
00622     {
00623         // in the data section of a chunked or content-length encoding,
00624         // with 'bytes_remaining' bytes of data left.
00625 
00626         if (bytes_remaining > sizeof(buf))
00627             len = read(buf, sizeof(buf));
00628         else
00629             len = read(buf, bytes_remaining);
00630         if (!isok())
00631             return;
00632 
00633         bytes_remaining -= len;
00634         if (len)
00635             log(WvLog::Debug5, 
00636                     "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
00637         if (curl && curl->outstream)
00638             curl->outstream->write(buf, len);
00639 
00640         if (!bytes_remaining && encoding == ContentLength && curl)
00641             doneurl();
00642 
00643         if (bytes_remaining && !isok())
00644             seterr("connection interrupted");
00645 
00646         if (!isok())
00647             doneurl();
00648     }
00649 
00650     if (urls.isempty())
00651         alarm(5000); // just wait a few seconds before closing connection
00652     else
00653         alarm(60000); // give the server a minute to respond, if we're waiting
00654 }

Generated on Thu Jan 24 16:50:56 2008 for WvStreams by  doxygen 1.5.4