00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017 #include <libgearman-server/protocol_http.h>
00018
00028 #define GEARMAN_PROTOCOL_HTTP_DEFAULT_PORT 8080
00029
/**
 * Per-connection state of the HTTP protocol module. One instance is
 * allocated for each connection and stored as its protocol context.
 */
typedef struct
{
  /* Set while unpacking a request that carried "X-Gearman-Background: true";
     consulted when deciding which response packets to send back. */
  bool background;

  /* Set while unpacking an HTTP/1.1 request or one carrying
     "Connection: Keep-Alive"; when false the connection is flagged to close
     after the response has been flushed. */
  bool keep_alive;
} gearman_protocol_http_st;
00038
00039
00040 static gearman_return_t _http_con_add(gearman_connection_st *connection);
00041 static void _http_free(gearman_connection_st *connection, void *context);
00042 static size_t _http_pack(const gearman_packet_st *packet, gearman_connection_st *connection,
00043 void *data, size_t data_size,
00044 gearman_return_t *ret_ptr);
00045 static size_t _http_unpack(gearman_packet_st *packet, gearman_connection_st *connection,
00046 const void *data, size_t data_size,
00047 gearman_return_t *ret_ptr);
00048
00049
00050 static const char *_http_line(const void *data, size_t data_size,
00051 size_t *line_size, size_t *offset);
00052
00055
00056
00057
00058
00059 gearman_return_t gearmand_protocol_http_conf(gearman_conf_st *conf)
00060 {
00061 gearman_conf_module_st *module;
00062
00063 module= gearman_conf_module_create(conf, NULL, "http");
00064 if (module == NULL)
00065 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00066
00067 gearman_conf_module_add_option(module, "port", 0, "PORT",
00068 "Port to listen on.");
00069
00070 return gearman_conf_return(conf);
00071 }
00072
00073 gearman_return_t gearmand_protocol_http_init(gearmand_st *gearmand,
00074 gearman_conf_st *conf)
00075 {
00076 in_port_t port= GEARMAN_PROTOCOL_HTTP_DEFAULT_PORT;
00077 gearman_conf_module_st *module;
00078 const char *name;
00079 const char *value;
00080
00081 gearmand_log_info(gearmand, "Initializing http module");
00082
00083
00084 module= gearman_conf_module_find(conf, "http");
00085 if (module == NULL)
00086 {
00087 gearmand_log_fatal(gearmand, "gearman_protocol_http_init:gearman_conf_module_find:NULL");
00088 return GEARMAN_QUEUE_ERROR;
00089 }
00090
00091 while (gearman_conf_module_value(module, &name, &value))
00092 {
00093 if (!strcmp(name, "port"))
00094 port= (in_port_t)atoi(value);
00095 else
00096 {
00097 gearmand_protocol_http_deinit(gearmand);
00098 gearmand_log_fatal(gearmand, "gearman_protocol_http_init:Unknown argument: %s", name);
00099 return GEARMAN_QUEUE_ERROR;
00100 }
00101 }
00102
00103 return gearmand_port_add(gearmand, port, _http_con_add);
00104 }
00105
00106 gearman_return_t gearmand_protocol_http_deinit(gearmand_st *gearmand __attribute__ ((unused)))
00107 {
00108 return GEARMAN_SUCCESS;
00109 }
00110
00111
00112
00113
00114
00115 static gearman_return_t _http_con_add(gearman_connection_st *connection)
00116 {
00117 gearman_protocol_http_st *http;
00118
00119 http= (gearman_protocol_http_st *)malloc(sizeof(gearman_protocol_http_st));
00120 if (http == NULL)
00121 {
00122 gearman_log_error(connection->universal, "_http_con_add", "malloc");
00123 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00124 }
00125
00126 http->background= false;
00127 http->keep_alive= false;
00128
00129 gearman_connection_set_protocol_context(connection, http);
00130 gearman_connection_set_protocol_context_free_fn(connection, _http_free);
00131 gearman_connection_set_packet_pack_fn(connection, _http_pack);
00132 gearman_connection_set_packet_unpack_fn(connection, _http_unpack);
00133
00134 return GEARMAN_SUCCESS;
00135 }
00136
00137 static void _http_free(gearman_connection_st *connection __attribute__ ((unused)),
00138 void *context)
00139 {
00140 free((gearman_protocol_http_st *)context);
00141 }
00142
00143 static size_t _http_pack(const gearman_packet_st *packet, gearman_connection_st *connection,
00144 void *data, size_t data_size,
00145 gearman_return_t *ret_ptr)
00146 {
00147 size_t pack_size;
00148 gearman_protocol_http_st *http;
00149
00150 http= (gearman_protocol_http_st *)gearman_connection_protocol_context(connection);
00151
00152 if (packet->command != GEARMAN_COMMAND_WORK_COMPLETE &&
00153 packet->command != GEARMAN_COMMAND_WORK_FAIL &&
00154 (http->background == false ||
00155 packet->command != GEARMAN_COMMAND_JOB_CREATED))
00156 {
00157 *ret_ptr= GEARMAN_IGNORE_PACKET;
00158 return 0;
00159 }
00160
00161 pack_size= (size_t)snprintf((char *)data, data_size,
00162 "HTTP/1.0 200 OK\r\n"
00163 "X-Gearman-Job-Handle: %.*s\r\n"
00164 "Content-Length: %"PRIu64"\r\n"
00165 "Server: Gearman/" PACKAGE_VERSION "\r\n"
00166 "\r\n",
00167 packet->command == GEARMAN_COMMAND_JOB_CREATED ?
00168 (uint32_t)packet->arg_size[0] :
00169 (uint32_t)packet->arg_size[0] - 1,
00170 packet->arg[0],
00171 (uint64_t)packet->data_size);
00172
00173 if (pack_size > data_size)
00174 {
00175 *ret_ptr= GEARMAN_FLUSH_DATA;
00176 return 0;
00177 }
00178
00179 if (! (http->keep_alive))
00180 {
00181 gearman_connection_set_option(connection, GEARMAN_CON_CLOSE_AFTER_FLUSH, true);
00182 }
00183
00184 *ret_ptr= GEARMAN_SUCCESS;
00185 return pack_size;
00186 }
00187
00188 static size_t _http_unpack(gearman_packet_st *packet, gearman_connection_st *connection,
00189 const void *data, size_t data_size,
00190 gearman_return_t *ret_ptr)
00191 {
00192 gearman_protocol_http_st *http;
00193 size_t offset= 0;
00194 const char *request;
00195 size_t request_size;
00196 const char *method;
00197 ptrdiff_t method_size;
00198 const char *uri;
00199 ptrdiff_t uri_size;
00200 const char *version;
00201 size_t version_size;
00202 const char *header;
00203 size_t header_size;
00204 char content_length[11];
00205 const char *unique= "-";
00206 size_t unique_size= 2;
00207 gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_NORMAL;
00208
00209
00210 request= _http_line(data, data_size, &request_size, &offset);
00211 if (request == NULL || request_size == 0)
00212 {
00213 *ret_ptr= GEARMAN_IO_WAIT;
00214 return offset;
00215 }
00216
00217 http= (gearman_protocol_http_st *)gearman_connection_protocol_context(connection);
00218 http->background= false;
00219 http->keep_alive= false;
00220
00221
00222 method= request;
00223 uri= memchr(request, ' ', request_size);
00224 if (uri == NULL)
00225 {
00226 gearman_log_error(packet->universal, "_http_unpack", "bad request line: %.*s", (uint32_t)request_size, request);
00227 *ret_ptr= GEARMAN_INVALID_PACKET;
00228 return 0;
00229 }
00230
00231 method_size= uri - request;
00232 if ((method_size != 3 ||
00233 (strncasecmp(method, "GET", 3) && strncasecmp(method, "PUT", 3))) &&
00234 (method_size != 4 || strncasecmp(method, "POST", 4)))
00235 {
00236 gearman_log_error(packet->universal, "_http_unpack", "bad method: %.*s", (uint32_t)method_size, method);
00237 *ret_ptr= GEARMAN_INVALID_PACKET;
00238 return 0;
00239 }
00240
00241 while (*uri == ' ')
00242 uri++;
00243
00244 while (*uri == '/')
00245 uri++;
00246
00247 version= memchr(uri, ' ', request_size - (size_t)(uri - request));
00248 if (version == NULL)
00249 {
00250 gearman_log_error(packet->universal, "_http_unpack", "bad request line: %.*s",
00251 (uint32_t)request_size, request);
00252 *ret_ptr= GEARMAN_INVALID_PACKET;
00253 return 0;
00254 }
00255
00256 uri_size= version - uri;
00257 if (uri_size == 0)
00258 {
00259 gearman_log_error(packet->universal, "_http_unpack",
00260 "must give function name in URI");
00261 *ret_ptr= GEARMAN_INVALID_PACKET;
00262 return 0;
00263 }
00264
00265 while (*version == ' ')
00266 version++;
00267
00268 version_size= request_size - (size_t)(version - request);
00269
00270 if (version_size == 8 && !strncasecmp(version, "HTTP/1.1", 8))
00271 http->keep_alive= true;
00272 else if (version_size != 8 || strncasecmp(version, "HTTP/1.0", 8))
00273 {
00274 gearman_log_error(packet->universal, "_http_unpack", "bad version: %.*s",
00275 (uint32_t)version_size, version);
00276 *ret_ptr= GEARMAN_INVALID_PACKET;
00277 return 0;
00278 }
00279
00280
00281 while ((header= _http_line(data, data_size, &header_size, &offset)) != NULL)
00282 {
00283 if (header_size == 0)
00284 break;
00285
00286 if (header_size > 16 && !strncasecmp(header, "Content-Length: ", 16))
00287 {
00288 if ((method_size == 4 && !strncasecmp(method, "POST", 4)) ||
00289 (method_size == 3 && !strncasecmp(method, "PUT", 3)))
00290 {
00291 snprintf(content_length, 11, "%.*s", (uint32_t)header_size - 16,
00292 header + 16);
00293 packet->data_size= (size_t)atoi(content_length);
00294 }
00295 }
00296 else if (header_size == 22 &&
00297 !strncasecmp(header, "Connection: Keep-Alive", 22))
00298 {
00299 http->keep_alive= true;
00300 }
00301 else if (header_size > 18 && !strncasecmp(header, "X-Gearman-Unique: ", 18))
00302 {
00303 unique= header + 18;
00304 unique_size= header_size - 18;
00305 }
00306 else if (header_size == 26 &&
00307 !strncasecmp(header, "X-Gearman-Background: true", 26))
00308 {
00309 http->background= true;
00310 }
00311 else if (header_size == 24 &&
00312 !strncasecmp(header, "X-Gearman-Priority: high", 24))
00313 {
00314 priority= GEARMAN_JOB_PRIORITY_HIGH;
00315 }
00316 else if (header_size == 23 &&
00317 !strncasecmp(header, "X-Gearman-Priority: low", 23))
00318 {
00319 priority= GEARMAN_JOB_PRIORITY_LOW;
00320 }
00321 }
00322
00323
00324 if (header == NULL)
00325 {
00326 *ret_ptr= GEARMAN_IO_WAIT;
00327 return 0;
00328 }
00329
00330
00331 packet->magic= GEARMAN_MAGIC_REQUEST;
00332
00333 if (http->background)
00334 {
00335 if (priority == GEARMAN_JOB_PRIORITY_NORMAL)
00336 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_BG;
00337 else if (priority == GEARMAN_JOB_PRIORITY_HIGH)
00338 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG;
00339 else
00340 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG;
00341 }
00342 else
00343 {
00344 if (priority == GEARMAN_JOB_PRIORITY_NORMAL)
00345 packet->command= GEARMAN_COMMAND_SUBMIT_JOB;
00346 else if (priority == GEARMAN_JOB_PRIORITY_HIGH)
00347 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH;
00348 else
00349 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW;
00350 }
00351
00352 *ret_ptr= gearman_packet_pack_header(packet);
00353 if (*ret_ptr != GEARMAN_SUCCESS)
00354 return 0;
00355
00356 *ret_ptr= gearman_packet_create_arg(packet, uri, (size_t)uri_size + 1);
00357 if (*ret_ptr != GEARMAN_SUCCESS)
00358 return 0;
00359
00360 *ret_ptr= gearman_packet_create_arg(packet, unique, unique_size + 1);
00361 if (*ret_ptr != GEARMAN_SUCCESS)
00362 return 0;
00363
00364
00365 packet->arg[0][uri_size]= 0;
00366 packet->arg[1][unique_size]= 0;
00367
00368 *ret_ptr= GEARMAN_SUCCESS;
00369 return offset;
00370 }
00371
/**
 * Fetch the next line from a buffer.
 *
 * Scanning starts at *offset. A line ends at '\n'; a '\r' directly before
 * it is not counted as part of the line.
 *
 * @param data      Buffer to scan.
 * @param data_size Number of bytes in data.
 * @param line_size Receives the line length without its terminator; only
 *                  written when a line is found.
 * @param offset    In: position to start at. Out: position just past the
 *                  '\n' of the returned line; unchanged when none is found.
 * @return Pointer to the first byte of the line, or NULL when the rest of
 *         the buffer holds no '\n'.
 */
static const char *_http_line(const void *data, size_t data_size,
                              size_t *line_size, size_t *offset)
{
  const char *line= (const char *)data + *offset;
  const char *newline= memchr(line, '\n', data_size - *offset);
  size_t length;

  if (newline == NULL)
    return NULL;

  length= (size_t)(newline - line);

  /* Consume the line together with its '\n'. */
  *offset+= length + 1;

  /* Treat "\r\n" like "\n". */
  if (length > 0 && line[length - 1] == '\r')
    length--;

  *line_size= length;

  return line;
}
00390 }