00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017 #include <libgearman-server/protocol_http.h>
00018
/**
 * Port the HTTP listener is added on when the "http" module's "port"
 * option is not given.
 */
#define GEARMAN_PROTOCOL_HTTP_DEFAULT_PORT 8080
00029
00030
00031
00032
00033
/**
 * Per-connection HTTP state. An instance is allocated in _http_con_add()
 * and attached to the connection as its protocol context.
 */
typedef struct
{
  /** True when the last request carried "X-Gearman-Background: true". */
  bool background;

  /**
   * True when the last request was HTTP/1.1 or carried
   * "Connection: Keep-Alive". When false, _http_pack() marks the
   * connection to be closed after the response is flushed.
   */
  bool keep_alive;
} gearman_protocol_http_st;
00042
00043
00044 static gearman_return_t _http_con_add(gearman_con_st *con);
00045 static void _http_free(gearman_con_st *con , void *data);
00046 static size_t _http_pack(const gearman_packet_st *packet, gearman_con_st *con,
00047 void *data, size_t data_size,
00048 gearman_return_t *ret_ptr);
00049 static size_t _http_unpack(gearman_packet_st *packet, gearman_con_st *con,
00050 const void *data, size_t data_size,
00051 gearman_return_t *ret_ptr);
00052
00053
00054 static const char *_http_line(const void *data, size_t data_size,
00055 size_t *line_size, size_t *offset);
00056
00059
00060
00061
00062
00063 gearman_return_t gearmand_protocol_http_conf(gearman_conf_st *conf)
00064 {
00065 gearman_conf_module_st *module;
00066
00067 module= gearman_conf_module_create(conf, NULL, "http");
00068 if (module == NULL)
00069 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00070
00071 gearman_conf_module_add_option(module, "port", 0, "PORT",
00072 "Port to listen on.");
00073
00074 return gearman_conf_return(conf);
00075 }
00076
00077 gearman_return_t gearmand_protocol_http_init(gearmand_st *gearmand,
00078 gearman_conf_st *conf)
00079 {
00080 in_port_t port= GEARMAN_PROTOCOL_HTTP_DEFAULT_PORT;
00081 gearman_conf_module_st *module;
00082 const char *name;
00083 const char *value;
00084
00085 GEARMAN_INFO(gearmand, "Initializing http module")
00086
00087
00088 module= gearman_conf_module_find(conf, "http");
00089 if (module == NULL)
00090 {
00091 GEARMAN_FATAL(gearmand,
00092 "gearman_protocol_http_init:gearman_conf_module_find:NULL")
00093 return GEARMAN_QUEUE_ERROR;
00094 }
00095
00096 while (gearman_conf_module_value(module, &name, &value))
00097 {
00098 if (!strcmp(name, "port"))
00099 port= (in_port_t)atoi(value);
00100 else
00101 {
00102 gearmand_protocol_http_deinit(gearmand);
00103 GEARMAN_FATAL(gearmand, "gearman_protocol_http_init:Unknown argument: %s",
00104 name)
00105 return GEARMAN_QUEUE_ERROR;
00106 }
00107 }
00108
00109 return gearmand_port_add(gearmand, port, _http_con_add);
00110 }
00111
00112 gearman_return_t gearmand_protocol_http_deinit(gearmand_st *gearmand __attribute__ ((unused)))
00113 {
00114 return GEARMAN_SUCCESS;
00115 }
00116
00117
00118
00119
00120
00121 static gearman_return_t _http_con_add(gearman_con_st *con)
00122 {
00123 gearman_protocol_http_st *http;
00124
00125 http= malloc(sizeof(gearman_protocol_http_st));
00126 if (http == NULL)
00127 {
00128 GEARMAN_ERROR_SET(con->gearman, "_http_con_add", "malloc")
00129 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00130 }
00131
00132 http->background= false;
00133 http->keep_alive= false;
00134
00135 gearman_con_set_protocol_context(con, http);
00136 gearman_con_set_protocol_context_free_fn(con, _http_free);
00137 gearman_con_set_packet_pack_fn(con, _http_pack);
00138 gearman_con_set_packet_unpack_fn(con, _http_unpack);
00139
00140 return GEARMAN_SUCCESS;
00141 }
00142
00143 static void _http_free(gearman_con_st *con __attribute__ ((unused)), void *data)
00144 {
00145 free(data);
00146 }
00147
00148 static size_t _http_pack(const gearman_packet_st *packet, gearman_con_st *con,
00149 void *data, size_t data_size,
00150 gearman_return_t *ret_ptr)
00151 {
00152 size_t pack_size;
00153 gearman_protocol_http_st *http;
00154
00155 http= (gearman_protocol_http_st *)gearman_con_protocol_context(con);
00156
00157 if (packet->command != GEARMAN_COMMAND_WORK_COMPLETE &&
00158 packet->command != GEARMAN_COMMAND_WORK_FAIL &&
00159 (http->background == false ||
00160 packet->command != GEARMAN_COMMAND_JOB_CREATED))
00161 {
00162 *ret_ptr= GEARMAN_IGNORE_PACKET;
00163 return 0;
00164 }
00165
00166 pack_size= (size_t)snprintf((char *)data, data_size,
00167 "HTTP/1.0 200 OK\r\n"
00168 "X-Gearman-Job-Handle: %.*s\r\n"
00169 "Content-Length: %"PRIu64"\r\n"
00170 "Server: Gearman/" PACKAGE_VERSION "\r\n"
00171 "\r\n",
00172 packet->command == GEARMAN_COMMAND_JOB_CREATED ?
00173 (uint32_t)packet->arg_size[0] :
00174 (uint32_t)packet->arg_size[0] - 1,
00175 packet->arg[0],
00176 (uint64_t)packet->data_size);
00177
00178 if (pack_size > data_size)
00179 {
00180 *ret_ptr= GEARMAN_FLUSH_DATA;
00181 return 0;
00182 }
00183
00184 if (!(http->keep_alive))
00185 gearman_con_add_options(con, GEARMAN_CON_CLOSE_AFTER_FLUSH);
00186
00187 *ret_ptr= GEARMAN_SUCCESS;
00188 return pack_size;
00189 }
00190
00191 static size_t _http_unpack(gearman_packet_st *packet, gearman_con_st *con,
00192 const void *data, size_t data_size,
00193 gearman_return_t *ret_ptr)
00194 {
00195 gearman_protocol_http_st *http;
00196 size_t offset= 0;
00197 const char *request;
00198 size_t request_size;
00199 const char *method;
00200 ptrdiff_t method_size;
00201 const char *uri;
00202 ptrdiff_t uri_size;
00203 const char *version;
00204 size_t version_size;
00205 const char *header;
00206 size_t header_size;
00207 char content_length[11];
00208 const char *unique= "-";
00209 size_t unique_size= 2;
00210 gearman_job_priority_t priority= GEARMAN_JOB_PRIORITY_NORMAL;
00211
00212
00213 request= _http_line(data, data_size, &request_size, &offset);
00214 if (request == NULL || request_size == 0)
00215 {
00216 *ret_ptr= GEARMAN_IO_WAIT;
00217 return offset;
00218 }
00219
00220 http= (gearman_protocol_http_st *)gearman_con_protocol_context(con);
00221 http->background= false;
00222 http->keep_alive= false;
00223
00224
00225 method= request;
00226 uri= memchr(request, ' ', request_size);
00227 if (uri == NULL)
00228 {
00229 GEARMAN_ERROR_SET(packet->gearman, "_http_unpack", "bad request line: %.*s",
00230 (uint32_t)request_size, request);
00231 *ret_ptr= GEARMAN_INVALID_PACKET;
00232 return 0;
00233 }
00234
00235 method_size= uri - request;
00236 if ((method_size != 3 ||
00237 (strncasecmp(method, "GET", 3) && strncasecmp(method, "PUT", 3))) &&
00238 (method_size != 4 || strncasecmp(method, "POST", 4)))
00239 {
00240 GEARMAN_ERROR_SET(packet->gearman, "_http_unpack", "bad method: %.*s",
00241 (uint32_t)method_size, method);
00242 *ret_ptr= GEARMAN_INVALID_PACKET;
00243 return 0;
00244 }
00245
00246 while (*uri == ' ')
00247 uri++;
00248
00249 while (*uri == '/')
00250 uri++;
00251
00252 version= memchr(uri, ' ', request_size - (size_t)(uri - request));
00253 if (version == NULL)
00254 {
00255 GEARMAN_ERROR_SET(packet->gearman, "_http_unpack", "bad request line: %.*s",
00256 (uint32_t)request_size, request);
00257 *ret_ptr= GEARMAN_INVALID_PACKET;
00258 return 0;
00259 }
00260
00261 uri_size= version - uri;
00262 if (uri_size == 0)
00263 {
00264 GEARMAN_ERROR_SET(packet->gearman, "_http_unpack",
00265 "must give function name in URI")
00266 *ret_ptr= GEARMAN_INVALID_PACKET;
00267 return 0;
00268 }
00269
00270 while (*version == ' ')
00271 version++;
00272
00273 version_size= request_size - (size_t)(version - request);
00274
00275 if (version_size == 8 && !strncasecmp(version, "HTTP/1.1", 8))
00276 http->keep_alive= true;
00277 else if (version_size != 8 || strncasecmp(version, "HTTP/1.0", 8))
00278 {
00279 GEARMAN_ERROR_SET(packet->gearman, "_http_unpack", "bad version: %.*s",
00280 (uint32_t)version_size, version);
00281 *ret_ptr= GEARMAN_INVALID_PACKET;
00282 return 0;
00283 }
00284
00285
00286 while ((header= _http_line(data, data_size, &header_size, &offset)) != NULL)
00287 {
00288 if (header_size == 0)
00289 break;
00290
00291 if (header_size > 16 && !strncasecmp(header, "Content-Length: ", 16))
00292 {
00293 if ((method_size == 4 && !strncasecmp(method, "POST", 4)) ||
00294 (method_size == 3 && !strncasecmp(method, "PUT", 3)))
00295 {
00296 snprintf(content_length, 11, "%.*s", (uint32_t)header_size - 16,
00297 header + 16);
00298 packet->data_size= (size_t)atoi(content_length);
00299 }
00300 }
00301 else if (header_size == 22 &&
00302 !strncasecmp(header, "Connection: Keep-Alive", 22))
00303 {
00304 http->keep_alive= true;
00305 }
00306 else if (header_size > 18 && !strncasecmp(header, "X-Gearman-Unique: ", 18))
00307 {
00308 unique= header + 18;
00309 unique_size= header_size - 18;
00310 }
00311 else if (header_size == 26 &&
00312 !strncasecmp(header, "X-Gearman-Background: true", 26))
00313 {
00314 http->background= true;
00315 }
00316 else if (header_size == 24 &&
00317 !strncasecmp(header, "X-Gearman-Priority: high", 24))
00318 {
00319 priority= GEARMAN_JOB_PRIORITY_HIGH;
00320 }
00321 else if (header_size == 23 &&
00322 !strncasecmp(header, "X-Gearman-Priority: low", 23))
00323 {
00324 priority= GEARMAN_JOB_PRIORITY_LOW;
00325 }
00326 }
00327
00328
00329 if (header == NULL)
00330 {
00331 *ret_ptr= GEARMAN_IO_WAIT;
00332 return 0;
00333 }
00334
00335
00336 packet->magic= GEARMAN_MAGIC_REQUEST;
00337
00338 if (http->background)
00339 {
00340 if (priority == GEARMAN_JOB_PRIORITY_NORMAL)
00341 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_BG;
00342 else if (priority == GEARMAN_JOB_PRIORITY_HIGH)
00343 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG;
00344 else
00345 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG;
00346 }
00347 else
00348 {
00349 if (priority == GEARMAN_JOB_PRIORITY_NORMAL)
00350 packet->command= GEARMAN_COMMAND_SUBMIT_JOB;
00351 else if (priority == GEARMAN_JOB_PRIORITY_HIGH)
00352 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_HIGH;
00353 else
00354 packet->command= GEARMAN_COMMAND_SUBMIT_JOB_LOW;
00355 }
00356
00357 *ret_ptr= gearman_packet_pack_header(packet);
00358 if (*ret_ptr != GEARMAN_SUCCESS)
00359 return 0;
00360
00361 *ret_ptr= gearman_packet_add_arg(packet, uri, (size_t)uri_size + 1);
00362 if (*ret_ptr != GEARMAN_SUCCESS)
00363 return 0;
00364
00365 *ret_ptr= gearman_packet_add_arg(packet, unique, unique_size + 1);
00366 if (*ret_ptr != GEARMAN_SUCCESS)
00367 return 0;
00368
00369
00370 packet->arg[0][uri_size]= 0;
00371 packet->arg[1][unique_size]= 0;
00372
00373 *ret_ptr= GEARMAN_SUCCESS;
00374 return offset;
00375 }
00376
/**
 * Return the next line of a buffer, starting at *offset.
 *
 * A line ends at '\n'; a '\r' directly before it is not counted as part of
 * the line. Nothing is copied or modified in the buffer.
 *
 * @param data      Buffer to scan.
 * @param data_size Number of valid bytes in data.
 * @param line_size Receives the line length without the line ending. Only
 *                  written when a line is returned.
 * @param offset    In: position to start scanning at. Out: position just
 *                  past the '\n' of the returned line. Left untouched when
 *                  no line is returned.
 * @return Pointer to the first byte of the line inside data, or NULL when
 *         no complete line is available at *offset.
 */
static const char *_http_line(const void *data, size_t data_size,
                              size_t *line_size, size_t *offset)
{
  const char *start;
  const char *end;

  /*
   * Nothing left to scan. This also keeps data_size - *offset from
   * wrapping around and keeps memchr() away from a NULL buffer.
   */
  if (data == NULL || *offset >= data_size)
    return NULL;

  start= (const char *)data + *offset;

  end= memchr(start, '\n', data_size - *offset);
  if (end == NULL)
    return NULL;

  /* Consume the line including its '\n'. */
  *offset+= (size_t)(end - start) + 1;

  /* Drop the '\r' of a CRLF line ending from the reported length. */
  if (end != start && *(end - 1) == '\r')
    end--;

  *line_size= (size_t)(end - start);

  return start;
}
00395 }