Gearman Developer Documentation

libgearman-server/protocol_http.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 #include "gearmand.h"
00016 
00017 #include <libgearman-server/protocol_http.h>
00018 
/* Port handed to gearmand_port_add() when no "port" option was configured. */
#define GEARMAN_PROTOCOL_HTTP_DEFAULT_PORT 8080
00029 
/*
 * Per-connection HTTP state, stored as the connection's protocol context.
 * Both flags are reset at the start of every request parsed by _http_unpack().
 */
typedef struct
{
  /* Set when the request carried "X-Gearman-Background: true"; selects the
     *_BG submit commands and lets _http_pack() answer on JOB_CREATED. */
  bool background;

  /* Set for HTTP/1.1 requests or a "Connection: Keep-Alive" header; when
     clear, _http_pack() marks the connection to close after flushing. */
  bool keep_alive;
} gearman_protocol_http_st;
00038 
00039 /* Protocol callback functions. */
00040 static gearman_return_t _http_con_add(gearman_connection_st *connection);
00041 static void _http_free(gearman_connection_st *connection, void *context);
00042 static size_t _http_pack(const gearman_packet_st *packet, gearman_connection_st *connection,
00043                          void *data, size_t data_size,
00044                          gearman_return_t *ret_ptr);
00045 static size_t _http_unpack(gearman_packet_st *packet, gearman_connection_st *connection,
00046                            const void *data, size_t data_size,
00047                            gearman_return_t *ret_ptr);
00048 
00049 /* Line parsing helper function. */
00050 static const char *_http_line(const void *data, size_t data_size,
00051                               size_t *line_size, size_t *offset);
00052 
00055 /*
00056  * Public definitions
00057  */
00058 
00059 gearman_return_t gearmand_protocol_http_conf(gearman_conf_st *conf)
00060 {
00061   gearman_conf_module_st *module;
00062 
00063   module= gearman_conf_module_create(conf, NULL, "http");
00064   if (module == NULL)
00065     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00066 
00067   gearman_conf_module_add_option(module, "port", 0, "PORT",
00068                                  "Port to listen on.");
00069 
00070   return gearman_conf_return(conf);
00071 }
00072 
00073 gearman_return_t gearmand_protocol_http_init(gearmand_st *gearmand,
00074                                              gearman_conf_st *conf)
00075 {
00076   in_port_t port= GEARMAN_PROTOCOL_HTTP_DEFAULT_PORT;
00077   gearman_conf_module_st *module;
00078   const char *name;
00079   const char *value;
00080 
00081   gearmand_log_info(gearmand, "Initializing http module");
00082 
00083   /* Get module and parse the option values that were given. */
00084   module= gearman_conf_module_find(conf, "http");
00085   if (module == NULL)
00086   {
00087     gearmand_log_fatal(gearmand, "gearman_protocol_http_init:gearman_conf_module_find:NULL");
00088     return GEARMAN_QUEUE_ERROR;
00089   }
00090 
00091   while (gearman_conf_module_value(module, &name, &value))
00092   {
00093     if (!strcmp(name, "port"))
00094       port= (in_port_t)atoi(value);
00095     else
00096     {
00097       gearmand_protocol_http_deinit(gearmand);
00098       gearmand_log_fatal(gearmand, "gearman_protocol_http_init:Unknown argument: %s", name);
00099       return GEARMAN_QUEUE_ERROR;
00100     }
00101   }
00102 
00103   return gearmand_port_add(gearmand, port, _http_con_add);
00104 }
00105 
00106 gearman_return_t gearmand_protocol_http_deinit(gearmand_st *gearmand __attribute__ ((unused)))
00107 {
00108   return GEARMAN_SUCCESS;
00109 }
00110 
00111 /*
00112  * Static definitions
00113  */
00114 
00115 static gearman_return_t _http_con_add(gearman_connection_st *connection)
00116 {
00117   gearman_protocol_http_st *http;
00118 
00119   http= (gearman_protocol_http_st *)malloc(sizeof(gearman_protocol_http_st));
00120   if (http == NULL)
00121   {
00122     gearman_log_error(connection->universal, "_http_con_add", "malloc");
00123     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00124   }
00125 
00126   http->background= false;
00127   http->keep_alive= false;
00128 
00129   gearman_connection_set_protocol_context(connection, http);
00130   gearman_connection_set_protocol_context_free_fn(connection, _http_free);
00131   gearman_connection_set_packet_pack_fn(connection, _http_pack);
00132   gearman_connection_set_packet_unpack_fn(connection, _http_unpack);
00133 
00134   return GEARMAN_SUCCESS;
00135 }
00136 
00137 static void _http_free(gearman_connection_st *connection __attribute__ ((unused)),
00138                        void *context)
00139 {
00140   free((gearman_protocol_http_st *)context);
00141 }
00142 
00143 static size_t _http_pack(const gearman_packet_st *packet, gearman_connection_st *connection,
00144                          void *data, size_t data_size,
00145                          gearman_return_t *ret_ptr)
00146 {
00147   size_t pack_size;
00148   gearman_protocol_http_st *http;
00149 
00150   http= (gearman_protocol_http_st *)gearman_connection_protocol_context(connection);
00151 
00152   if (packet->command != GEARMAN_COMMAND_WORK_COMPLETE &&
00153       packet->command != GEARMAN_COMMAND_WORK_FAIL &&
00154       (http->background == false ||
00155        packet->command != GEARMAN_COMMAND_JOB_CREATED))
00156   {
00157     *ret_ptr= GEARMAN_IGNORE_PACKET;
00158     return 0;
00159   }
00160 
00161   pack_size= (size_t)snprintf((char *)data, data_size,
00162                               "HTTP/1.0 200 OK\r\n"
00163                               "X-Gearman-Job-Handle: %.*s\r\n"
00164                               "Content-Length: %"PRIu64"\r\n"
00165                               "Server: Gearman/" PACKAGE_VERSION "\r\n"
00166                               "\r\n",
00167                               packet->command == GEARMAN_COMMAND_JOB_CREATED ?
00168                               (uint32_t)packet->arg_size[0] :
00169                               (uint32_t)packet->arg_size[0] - 1,
00170                               packet->arg[0],
00171                               (uint64_t)packet->data_size);
00172 
00173   if (pack_size > data_size)
00174   {
00175     *ret_ptr= GEARMAN_FLUSH_DATA;
00176     return 0;
00177   }
00178 
00179   if (! (http->keep_alive))
00180   {
00181     gearman_connection_set_option(connection, GEARMAN_CON_CLOSE_AFTER_FLUSH, true);
00182   }
00183 
00184   *ret_ptr= GEARMAN_SUCCESS;
00185   return pack_size;
00186 }
00187 
00188 static size_t _http_unpack(gearman_packet_st *packet, gearman_connection_st *connection,
00189                            const void *data, size_t data_size,
00190                            gearman_return_t *ret_ptr)
00191 {
00192   gearman_protocol_http_st *http;
00193   size_t offset= 0;
00194   const char *request;
00195   size_t request_size;
00196   const char *method;
00197   ptrdiff_t method_size;
00198   const char *uri;
00199   ptrdiff_t uri_size;
00200   const char *version;
00201   size_t version_size;
00202   const char *header;
00203   size_t header_size;
00204   char content_length[11]; /* 11 bytes to fit max display length of uint32_t */
00205   const char *unique= "-";
00206   size_t unique_size= 2;
00207   gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_NORMAL;
00208 
00209   /* Get the request line first. */
00210   request= _http_line(data, data_size, &request_size, &offset);
00211   if (request == NULL || request_size == 0)
00212   {
00213     *ret_ptr= GEARMAN_IO_WAIT;
00214     return offset;
00215   }
00216 
00217   http= (gearman_protocol_http_st *)gearman_connection_protocol_context(connection);
00218   http->background= false;
00219   http->keep_alive= false;
00220 
00221   /* Parse out the method, URI, and HTTP version from the request line. */
00222   method= request;
00223   uri= memchr(request, ' ', request_size);
00224   if (uri == NULL)
00225   {
00226     gearman_log_error(packet->universal, "_http_unpack", "bad request line: %.*s", (uint32_t)request_size, request);
00227     *ret_ptr= GEARMAN_INVALID_PACKET;
00228     return 0;
00229   }
00230 
00231   method_size= uri - request;
00232   if ((method_size != 3 ||
00233        (strncasecmp(method, "GET", 3) && strncasecmp(method, "PUT", 3))) &&
00234       (method_size != 4 || strncasecmp(method, "POST", 4)))
00235   {
00236     gearman_log_error(packet->universal, "_http_unpack", "bad method: %.*s", (uint32_t)method_size, method);
00237     *ret_ptr= GEARMAN_INVALID_PACKET;
00238     return 0;
00239   }
00240 
00241   while (*uri == ' ')
00242     uri++;
00243 
00244   while (*uri == '/')
00245     uri++;
00246 
00247   version= memchr(uri, ' ', request_size - (size_t)(uri - request));
00248   if (version == NULL)
00249   {
00250     gearman_log_error(packet->universal, "_http_unpack", "bad request line: %.*s",
00251                       (uint32_t)request_size, request);
00252     *ret_ptr= GEARMAN_INVALID_PACKET;
00253     return 0;
00254   }
00255 
00256   uri_size= version - uri;
00257   if (uri_size == 0)
00258   {
00259     gearman_log_error(packet->universal, "_http_unpack",
00260                       "must give function name in URI");
00261     *ret_ptr= GEARMAN_INVALID_PACKET;
00262     return 0;
00263   }
00264 
00265   while (*version == ' ')
00266     version++;
00267 
00268   version_size= request_size - (size_t)(version - request);
00269 
00270   if (version_size == 8 && !strncasecmp(version, "HTTP/1.1", 8))
00271     http->keep_alive= true;
00272   else if (version_size != 8 || strncasecmp(version, "HTTP/1.0", 8))
00273   {
00274     gearman_log_error(packet->universal, "_http_unpack", "bad version: %.*s",
00275                       (uint32_t)version_size, version);
00276     *ret_ptr= GEARMAN_INVALID_PACKET;
00277     return 0;
00278   }
00279 
00280   /* Loop through all the headers looking for ones of interest. */
00281   while ((header= _http_line(data, data_size, &header_size, &offset)) != NULL)
00282   {
00283     if (header_size == 0)
00284       break;
00285 
00286     if (header_size > 16 && !strncasecmp(header, "Content-Length: ", 16))
00287     {
00288       if ((method_size == 4 && !strncasecmp(method, "POST", 4)) ||
00289           (method_size == 3 && !strncasecmp(method, "PUT", 3)))
00290       {
00291         snprintf(content_length, 11, "%.*s", (uint32_t)header_size - 16,
00292                  header + 16);
00293         packet->data_size= (size_t)atoi(content_length);
00294       }
00295     }
00296     else if (header_size == 22 &&
00297              !strncasecmp(header, "Connection: Keep-Alive", 22))
00298     {
00299       http->keep_alive= true;
00300     }
00301     else if (header_size > 18 && !strncasecmp(header, "X-Gearman-Unique: ", 18))
00302     {
00303       unique= header + 18;
00304       unique_size= header_size - 18;
00305     }
00306     else if (header_size == 26 &&
00307              !strncasecmp(header, "X-Gearman-Background: true", 26))
00308     {
00309       http->background= true;
00310     }
00311     else if (header_size == 24 &&
00312              !strncasecmp(header, "X-Gearman-Priority: high", 24))
00313     {
00314       priority= GEARMAN_JOB_PRIORITY_HIGH;
00315     }
00316     else if (header_size == 23 &&
00317              !strncasecmp(header, "X-Gearman-Priority: low", 23))
00318     {
00319       priority= GEARMAN_JOB_PRIORITY_LOW;
00320     }
00321   }
00322 
00323   /* Make sure we received the end of headers. */
00324   if (header == NULL)
00325   {
00326     *ret_ptr= GEARMAN_IO_WAIT;
00327     return 0;
00328   }
00329 
00330   /* Request and all headers complete, build a packet based on HTTP request. */
00331   packet->magic= GEARMAN_MAGIC_REQUEST;
00332 
00333   if (http->background)
00334   {
00335     if (priority == GEARMAN_JOB_PRIORITY_NORMAL)
00336       packet->command= GEARMAN_COMMAND_SUBMIT_JOB_BG;
00337     else if (priority == GEARMAN_JOB_PRIORITY_HIGH)
00338       packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG;
00339     else
00340       packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG;
00341   }
00342   else
00343   {
00344     if (priority == GEARMAN_JOB_PRIORITY_NORMAL)
00345       packet->command= GEARMAN_COMMAND_SUBMIT_JOB;
00346     else if (priority == GEARMAN_JOB_PRIORITY_HIGH)
00347       packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH;
00348     else
00349       packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW;
00350   }
00351 
00352   *ret_ptr= gearman_packet_pack_header(packet);
00353   if (*ret_ptr != GEARMAN_SUCCESS)
00354     return 0;
00355 
00356   *ret_ptr= gearman_packet_create_arg(packet, uri, (size_t)uri_size + 1);
00357   if (*ret_ptr != GEARMAN_SUCCESS)
00358     return 0;
00359 
00360   *ret_ptr= gearman_packet_create_arg(packet, unique, unique_size + 1);
00361   if (*ret_ptr != GEARMAN_SUCCESS)
00362     return 0;
00363 
00364   /* Make sure function and unique are NULL terminated. */
00365   packet->arg[0][uri_size]= 0;
00366   packet->arg[1][unique_size]= 0;
00367 
00368   *ret_ptr= GEARMAN_SUCCESS;
00369   return offset;
00370 }
00371 
/**
 * Return the next line from a buffer of HTTP text.
 *
 * A line ends at '\n'; a '\r' immediately before it is not counted in the
 * line length. The returned pointer is into data and is not NUL terminated.
 *
 * @param data      Start of the buffer.
 * @param data_size Number of bytes in the buffer.
 * @param line_size Receives the line length, excluding the terminator. Only
 *                  written when a line is returned.
 * @param offset    In: position to start scanning from. Out: position just
 *                  past the line's '\n'. Left unchanged when NULL is returned.
 * @return Pointer to the start of the line, or NULL if no complete line is
 *         available from offset onwards.
 */
static const char *_http_line(const void *data, size_t data_size,
                              size_t *line_size, size_t *offset)
{
  const char *start;
  const char *end;

  /* Nothing left to scan. This also keeps the length passed to memchr()
     from wrapping around if the offset is beyond the end of the buffer. */
  if (*offset >= data_size)
    return NULL;

  start= (const char *)data + *offset;

  end= memchr(start, '\n', data_size - *offset);
  if (end == NULL)
    return NULL;

  /* Consume the line including its '\n'. */
  *offset+= (size_t)(end - start) + 1;

  /* Treat "\r\n" as the terminator when present. */
  if (end != start && *(end - 1) == '\r')
    end--;

  *line_size= (size_t)(end - start);

  return start;
}