00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00026 gearman_command_info_st gearman_command_info_list[GEARMAN_COMMAND_MAX]=
00027 {
00028 { "TEXT", 3, false },
00029 { "CAN_DO", 1, false },
00030 { "CANT_DO", 1, false },
00031 { "RESET_ABILITIES", 0, false },
00032 { "PRE_SLEEP", 0, false },
00033 { "UNUSED", 0, false },
00034 { "NOOP", 0, false },
00035 { "SUBMIT_JOB", 2, true },
00036 { "JOB_CREATED", 1, false },
00037 { "GRAB_JOB", 0, false },
00038 { "NO_JOB", 0, false },
00039 { "JOB_ASSIGN", 2, true },
00040 { "WORK_STATUS", 3, false },
00041 { "WORK_COMPLETE", 1, true },
00042 { "WORK_FAIL", 1, false },
00043 { "GET_STATUS", 1, false },
00044 { "ECHO_REQ", 0, true },
00045 { "ECHO_RES", 0, true },
00046 { "SUBMIT_JOB_BG", 2, true },
00047 { "ERROR", 2, false },
00048 { "STATUS_RES", 5, false },
00049 { "SUBMIT_JOB_HIGH", 2, true },
00050 { "SET_CLIENT_ID", 1, false },
00051 { "CAN_DO_TIMEOUT", 2, false },
00052 { "ALL_YOURS", 0, false },
00053 { "WORK_EXCEPTION", 1, true },
00054 { "OPTION_REQ", 1, false },
00055 { "OPTION_RES", 1, false },
00056 { "WORK_DATA", 1, true },
00057 { "WORK_WARNING", 1, true },
00058 { "GRAB_JOB_UNIQ", 0, false },
00059 { "JOB_ASSIGN_UNIQ", 3, true },
00060 { "SUBMIT_JOB_HIGH_BG", 2, true },
00061 { "SUBMIT_JOB_LOW", 2, true },
00062 { "SUBMIT_JOB_LOW_BG", 2, true },
00063 { "SUBMIT_JOB_SCHED", 7, true },
00064 { "SUBMIT_JOB_EPOCH", 3, true }
00065 };
00066
00067 inline static gearman_return_t packet_create_arg(gearman_packet_st *packet,
00068 const void *arg, size_t arg_size)
00069 {
00070 void *new_args;
00071 size_t offset;
00072 uint8_t x;
00073
00074 if (packet->argc == gearman_command_info_list[packet->command].argc &&
00075 (! (gearman_command_info_list[packet->command].data) ||
00076 packet->data != NULL))
00077 {
00078 gearman_universal_set_error(packet->universal, "gearman_packet_create_arg",
00079 "too many arguments for command");
00080 return GEARMAN_TOO_MANY_ARGS;
00081 }
00082
00083 if (packet->argc == gearman_command_info_list[packet->command].argc)
00084 {
00085 packet->data= arg;
00086 packet->data_size= arg_size;
00087 return GEARMAN_SUCCESS;
00088 }
00089
00090 if (packet->args_size == 0 && packet->magic != GEARMAN_MAGIC_TEXT)
00091 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00092
00093 if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
00094 {
00095 packet->args= packet->args_buffer;
00096 }
00097 else
00098 {
00099 if (packet->args == packet->args_buffer)
00100 packet->args= NULL;
00101
00102 new_args= realloc(packet->args, packet->args_size + arg_size);
00103 if (new_args == NULL)
00104 {
00105 gearman_universal_set_error(packet->universal, "gearman_packet_create_arg", "realloc");
00106 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00107 }
00108
00109 if (packet->args_size > 0)
00110 memcpy(new_args, packet->args_buffer, packet->args_size);
00111
00112 packet->args= new_args;
00113 }
00114
00115 memcpy(packet->args + packet->args_size, arg, arg_size);
00116 packet->args_size+= arg_size;
00117 packet->arg_size[packet->argc]= arg_size;
00118 packet->argc++;
00119
00120 if (packet->magic == GEARMAN_MAGIC_TEXT)
00121 {
00122 offset= 0;
00123 }
00124 else
00125 {
00126 offset= GEARMAN_PACKET_HEADER_SIZE;
00127 }
00128
00129 for (x= 0; x < packet->argc; x++)
00130 {
00131 packet->arg[x]= packet->args + offset;
00132 offset+= packet->arg_size[x];
00133 }
00134
00135 return GEARMAN_SUCCESS;
00136 }
00137
00140
00141
00142
00143
00144
00145 gearman_packet_st *gearman_packet_create(gearman_universal_st *gearman,
00146 gearman_packet_st *packet)
00147 {
00148 if (packet == NULL)
00149 {
00150 packet= malloc(sizeof(gearman_packet_st));
00151 if (packet == NULL)
00152 {
00153 gearman_universal_set_error(gearman, "gearman_packet_create", "malloc");
00154 return NULL;
00155 }
00156
00157 packet->options.allocated= true;
00158 }
00159 else
00160 {
00161 packet->options.allocated= false;
00162 packet->options.complete= false;
00163 packet->options.free_data= false;
00164 }
00165
00166 packet->magic= 0;
00167 packet->command= 0;
00168 packet->argc= 0;
00169 packet->args_size= 0;
00170 packet->data_size= 0;
00171 packet->universal= gearman;
00172
00173 if (! (gearman->options.dont_track_packets))
00174 {
00175 if (gearman->packet_list != NULL)
00176 gearman->packet_list->prev= packet;
00177 packet->next= gearman->packet_list;
00178 packet->prev= NULL;
00179 gearman->packet_list= packet;
00180 gearman->packet_count++;
00181 }
00182
00183 packet->args= NULL;
00184 packet->data= NULL;
00185
00186 return packet;
00187 }
00188
00189 gearman_return_t gearman_packet_create_arg(gearman_packet_st *packet,
00190 const void *arg, size_t arg_size)
00191 {
00192 return packet_create_arg(packet, arg, arg_size);
00193 }
00194
00195 gearman_return_t gearman_packet_create_args(gearman_universal_st *gearman,
00196 gearman_packet_st *packet,
00197 enum gearman_magic_t magic,
00198 gearman_command_t command,
00199 const void *args[],
00200 const size_t args_size[],
00201 size_t args_count)
00202 {
00203 gearman_return_t ret;
00204
00205 packet= gearman_packet_create(gearman, packet);
00206 if (packet == NULL)
00207 {
00208 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00209 }
00210
00211 packet->magic= magic;
00212 packet->command= command;
00213
00214 for (size_t x= 0; x < args_count; x++)
00215 {
00216 ret= packet_create_arg(packet, args[x], args_size[x]);
00217 if (ret != GEARMAN_SUCCESS)
00218 {
00219 gearman_packet_free(packet);
00220 return ret;
00221 }
00222 }
00223
00224 ret= gearman_packet_pack_header(packet);
00225
00226 if (ret != GEARMAN_SUCCESS)
00227 gearman_packet_free(packet);
00228
00229 return ret;
00230 }
00231
00232 void gearman_packet_free(gearman_packet_st *packet)
00233 {
00234 if (packet->args != packet->args_buffer && packet->args != NULL)
00235 free(packet->args);
00236
00237 if (packet->options.free_data && packet->data != NULL)
00238 {
00239 if (packet->universal->workload_free_fn == NULL)
00240 {
00241 free((void *)packet->data);
00242 }
00243 else
00244 {
00245 packet->universal->workload_free_fn((void *)(packet->data),
00246 (void *)packet->universal->workload_free_context);
00247 }
00248 }
00249
00250 if (! (packet->universal->options.dont_track_packets))
00251 {
00252 if (packet->universal->packet_list == packet)
00253 packet->universal->packet_list= packet->next;
00254 if (packet->prev != NULL)
00255 packet->prev->next= packet->next;
00256 if (packet->next != NULL)
00257 packet->next->prev= packet->prev;
00258 packet->universal->packet_count--;
00259 }
00260
00261 if (packet->options.allocated)
00262 free(packet);
00263 }
00264
00265 gearman_return_t gearman_packet_pack_header(gearman_packet_st *packet)
00266 {
00267 uint64_t length_64;
00268 uint32_t tmp;
00269
00270 if (packet->magic == GEARMAN_MAGIC_TEXT)
00271 {
00272 packet->options.complete= true;
00273 return GEARMAN_SUCCESS;
00274 }
00275
00276 if (packet->args_size == 0)
00277 {
00278 packet->args= packet->args_buffer;
00279 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00280 }
00281
00282 switch (packet->magic)
00283 {
00284 case GEARMAN_MAGIC_TEXT:
00285 break;
00286
00287 case GEARMAN_MAGIC_REQUEST:
00288 memcpy(packet->args, "\0REQ", 4);
00289 break;
00290
00291 case GEARMAN_MAGIC_RESPONSE:
00292 memcpy(packet->args, "\0RES", 4);
00293 break;
00294
00295 default:
00296 gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00297 "invalid magic value");
00298 return GEARMAN_INVALID_MAGIC;
00299 }
00300
00301 if (packet->command == GEARMAN_COMMAND_TEXT ||
00302 packet->command >= GEARMAN_COMMAND_MAX)
00303 {
00304 gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00305 "invalid command value");
00306 return GEARMAN_INVALID_COMMAND;
00307 }
00308
00309 tmp= packet->command;
00310 tmp= htonl(tmp);
00311 memcpy(packet->args + 4, &tmp, 4);
00312
00313 length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
00314
00315
00316 if (length_64 >= UINT32_MAX || length_64 < packet->data_size)
00317 {
00318 gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00319 "data size too too long");
00320 return GEARMAN_ARGUMENT_TOO_LARGE;
00321 }
00322
00323 tmp= (uint32_t)length_64;
00324 tmp= htonl(tmp);
00325 memcpy(packet->args + 8, &tmp, 4);
00326
00327 packet->options.complete= true;
00328
00329 return GEARMAN_SUCCESS;
00330 }
00331
00332 gearman_return_t gearman_packet_unpack_header(gearman_packet_st *packet)
00333 {
00334 uint32_t tmp;
00335
00336 if (!memcmp(packet->args, "\0REQ", 4))
00337 packet->magic= GEARMAN_MAGIC_REQUEST;
00338 else if (!memcmp(packet->args, "\0RES", 4))
00339 packet->magic= GEARMAN_MAGIC_RESPONSE;
00340 else
00341 {
00342 gearman_universal_set_error(packet->universal, "gearman_packet_unpack_header",
00343 "invalid magic value");
00344 return GEARMAN_INVALID_MAGIC;
00345 }
00346
00347 memcpy(&tmp, packet->args + 4, 4);
00348 packet->command= ntohl(tmp);
00349
00350 if (packet->command == GEARMAN_COMMAND_TEXT ||
00351 packet->command >= GEARMAN_COMMAND_MAX)
00352 {
00353 gearman_universal_set_error(packet->universal, "gearman_packet_unpack_header",
00354 "invalid command value");
00355 return GEARMAN_INVALID_COMMAND;
00356 }
00357
00358 memcpy(&tmp, packet->args + 8, 4);
00359 packet->data_size= ntohl(tmp);
00360
00361 return GEARMAN_SUCCESS;
00362 }
00363
00364 size_t gearman_packet_pack(const gearman_packet_st *packet,
00365 gearman_connection_st *con __attribute__ ((unused)),
00366 void *data, size_t data_size,
00367 gearman_return_t *ret_ptr)
00368 {
00369 if (packet->args_size == 0)
00370 {
00371 *ret_ptr= GEARMAN_SUCCESS;
00372 return 0;
00373 }
00374
00375 if (packet->args_size > data_size)
00376 {
00377 *ret_ptr= GEARMAN_FLUSH_DATA;
00378 return 0;
00379 }
00380
00381 memcpy(data, packet->args, packet->args_size);
00382 *ret_ptr= GEARMAN_SUCCESS;
00383 return packet->args_size;
00384 }
00385
00386 size_t gearman_packet_unpack(gearman_packet_st *packet,
00387 gearman_connection_st *con __attribute__ ((unused)),
00388 const void *data, size_t data_size,
00389 gearman_return_t *ret_ptr)
00390 {
00391 uint8_t *ptr;
00392 size_t used_size;
00393 size_t arg_size;
00394
00395 if (packet->args_size == 0)
00396 {
00397 if (data_size > 0 && ((uint8_t *)data)[0] != 0)
00398 {
00399
00400 ptr= memchr(data, '\n', data_size);
00401 if (ptr == NULL)
00402 {
00403 *ret_ptr= GEARMAN_IO_WAIT;
00404 return 0;
00405 }
00406
00407 packet->magic= GEARMAN_MAGIC_TEXT;
00408 packet->command= GEARMAN_COMMAND_TEXT;
00409
00410 used_size= (size_t)(ptr - ((uint8_t *)data)) + 1;
00411 *ptr= 0;
00412 if (used_size > 1 && *(ptr - 1) == '\r')
00413 *(ptr - 1)= 0;
00414
00415 for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
00416 {
00417 ptr= memchr(data, ' ', arg_size);
00418 if (ptr != NULL)
00419 {
00420 *ptr= 0;
00421 ptr++;
00422 while (*ptr == ' ')
00423 ptr++;
00424
00425 arg_size-= (size_t)(ptr - ((uint8_t *)data));
00426 }
00427
00428 *ret_ptr= packet_create_arg(packet, data, ptr == NULL ? arg_size :
00429 (size_t)(ptr - ((uint8_t *)data)));
00430 if (*ret_ptr != GEARMAN_SUCCESS)
00431 return used_size;
00432 }
00433
00434 return used_size;
00435 }
00436 else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
00437 {
00438 *ret_ptr= GEARMAN_IO_WAIT;
00439 return 0;
00440 }
00441
00442 packet->args= packet->args_buffer;
00443 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00444 memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
00445
00446 *ret_ptr= gearman_packet_unpack_header(packet);
00447 if (*ret_ptr != GEARMAN_SUCCESS)
00448 return 0;
00449
00450 used_size= GEARMAN_PACKET_HEADER_SIZE;
00451 }
00452 else
00453 {
00454 used_size= 0;
00455 }
00456
00457 while (packet->argc != gearman_command_info_list[packet->command].argc)
00458 {
00459 if (packet->argc != (gearman_command_info_list[packet->command].argc - 1) ||
00460 gearman_command_info_list[packet->command].data)
00461 {
00462 ptr= memchr(((uint8_t *)data) + used_size, 0, data_size - used_size);
00463 if (ptr == NULL)
00464 {
00465 *ret_ptr= GEARMAN_IO_WAIT;
00466 return used_size;
00467 }
00468
00469 arg_size= (size_t)(ptr - (((uint8_t *)data) + used_size)) + 1;
00470 *ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size, arg_size);
00471
00472 if (*ret_ptr != GEARMAN_SUCCESS)
00473 return used_size;
00474
00475 packet->data_size-= arg_size;
00476 used_size+= arg_size;
00477 }
00478 else
00479 {
00480 if ((data_size - used_size) < packet->data_size)
00481 {
00482 *ret_ptr= GEARMAN_IO_WAIT;
00483 return used_size;
00484 }
00485
00486 *ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size,
00487 packet->data_size);
00488 if (*ret_ptr != GEARMAN_SUCCESS)
00489 return used_size;
00490
00491 used_size+= packet->data_size;
00492 packet->data_size= 0;
00493 }
00494 }
00495
00496 *ret_ptr= GEARMAN_SUCCESS;
00497 return used_size;
00498 }
00499
00500 void gearman_packet_give_data(gearman_packet_st *packet, const void *data,
00501 size_t data_size)
00502 {
00503 packet->data= data;
00504 packet->data_size= data_size;
00505 packet->options.free_data= true;
00506 }
00507
00508 void *gearman_packet_take_data(gearman_packet_st *packet, size_t *data_size)
00509 {
00510 void *data= (void *)(packet->data);
00511
00512 *data_size= packet->data_size;
00513
00514 packet->data= NULL;
00515 packet->data_size= 0;
00516 packet->options.free_data= false;
00517
00518 return data;
00519 }