00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00030 gearman_command_info_st gearman_command_info_list[GEARMAN_COMMAND_MAX]=
00031 {
00032 { "TEXT", 3, false },
00033 { "CAN_DO", 1, false },
00034 { "CANT_DO", 1, false },
00035 { "RESET_ABILITIES", 0, false },
00036 { "PRE_SLEEP", 0, false },
00037 { "UNUSED", 0, false },
00038 { "NOOP", 0, false },
00039 { "SUBMIT_JOB", 2, true },
00040 { "JOB_CREATED", 1, false },
00041 { "GRAB_JOB", 0, false },
00042 { "NO_JOB", 0, false },
00043 { "JOB_ASSIGN", 2, true },
00044 { "WORK_STATUS", 3, false },
00045 { "WORK_COMPLETE", 1, true },
00046 { "WORK_FAIL", 1, false },
00047 { "GET_STATUS", 1, false },
00048 { "ECHO_REQ", 0, true },
00049 { "ECHO_RES", 0, true },
00050 { "SUBMIT_JOB_BG", 2, true },
00051 { "ERROR", 2, false },
00052 { "STATUS_RES", 5, false },
00053 { "SUBMIT_JOB_HIGH", 2, true },
00054 { "SET_CLIENT_ID", 1, false },
00055 { "CAN_DO_TIMEOUT", 2, false },
00056 { "ALL_YOURS", 0, false },
00057 { "WORK_EXCEPTION", 1, true },
00058 { "OPTION_REQ", 1, false },
00059 { "OPTION_RES", 1, false },
00060 { "WORK_DATA", 1, true },
00061 { "WORK_WARNING", 1, true },
00062 { "GRAB_JOB_UNIQ", 0, false },
00063 { "JOB_ASSIGN_UNIQ", 3, true },
00064 { "SUBMIT_JOB_HIGH_BG", 2, true },
00065 { "SUBMIT_JOB_LOW", 2, true },
00066 { "SUBMIT_JOB_LOW_BG", 2, true },
00067 { "SUBMIT_JOB_SCHED", 7, true },
00068 { "SUBMIT_JOB_EPOCH", 3, true }
00069 };
00070
00073
00074
00075
00076
00077 gearman_return_t gearman_packet_add_arg(gearman_packet_st *packet,
00078 const void *arg, size_t arg_size)
00079 {
00080 void *new_args;
00081 size_t offset;
00082 uint8_t x;
00083
00084 if (packet->argc == gearman_command_info_list[packet->command].argc &&
00085 (!(gearman_command_info_list[packet->command].data) ||
00086 packet->data != NULL))
00087 {
00088 gearman_error_set(packet->gearman, "gearman_packet_add_arg",
00089 "too many arguments for command");
00090 return GEARMAN_TOO_MANY_ARGS;
00091 }
00092
00093 if (packet->argc == gearman_command_info_list[packet->command].argc)
00094 {
00095 packet->data= arg;
00096 packet->data_size= arg_size;
00097 return GEARMAN_SUCCESS;
00098 }
00099
00100 if (packet->args_size == 0 && packet->magic != GEARMAN_MAGIC_TEXT)
00101 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00102
00103 if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
00104 packet->args= packet->args_buffer;
00105 else
00106 {
00107 if (packet->args == packet->args_buffer)
00108 packet->args= NULL;
00109
00110 new_args= realloc(packet->args, packet->args_size + arg_size);
00111 if (new_args == NULL)
00112 {
00113 gearman_error_set(packet->gearman, "gearman_packet_add_arg", "realloc");
00114 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00115 }
00116
00117 if (packet->args_size > 0)
00118 memcpy(new_args, packet->args_buffer, packet->args_size);
00119
00120 packet->args= new_args;
00121 }
00122
00123 memcpy(packet->args + packet->args_size, arg, arg_size);
00124 packet->args_size+= arg_size;
00125 packet->arg_size[packet->argc]= arg_size;
00126 packet->argc++;
00127
00128 if (packet->magic == GEARMAN_MAGIC_TEXT)
00129 offset= 0;
00130 else
00131 offset= GEARMAN_PACKET_HEADER_SIZE;
00132
00133 for (x= 0; x < packet->argc; x++)
00134 {
00135 packet->arg[x]= packet->args + offset;
00136 offset+= packet->arg_size[x];
00137 }
00138
00139 return GEARMAN_SUCCESS;
00140 }
00141
00142 gearman_return_t gearman_packet_pack_header(gearman_packet_st *packet)
00143 {
00144 uint32_t tmp;
00145
00146 if (packet->magic == GEARMAN_MAGIC_TEXT)
00147 {
00148 packet->options|= GEARMAN_PACKET_COMPLETE;
00149 return GEARMAN_SUCCESS;
00150 }
00151
00152 if (packet->args_size == 0)
00153 {
00154 packet->args= packet->args_buffer;
00155 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00156 }
00157
00158 switch (packet->magic)
00159 {
00160 case GEARMAN_MAGIC_TEXT:
00161 break;
00162
00163 case GEARMAN_MAGIC_REQUEST:
00164 memcpy(packet->args, "\0REQ", 4);
00165 break;
00166
00167 case GEARMAN_MAGIC_RESPONSE:
00168 memcpy(packet->args, "\0RES", 4);
00169 break;
00170
00171 default:
00172 gearman_error_set(packet->gearman, "gearman_packet_pack_header",
00173 "invalid magic value");
00174 return GEARMAN_INVALID_MAGIC;
00175 }
00176
00177 if (packet->command == GEARMAN_COMMAND_TEXT ||
00178 packet->command >= GEARMAN_COMMAND_MAX)
00179 {
00180 gearman_error_set(packet->gearman, "gearman_packet_pack_header",
00181 "invalid command value");
00182 return GEARMAN_INVALID_COMMAND;
00183 }
00184
00185 tmp= packet->command;
00186 tmp= htonl(tmp);
00187 memcpy(packet->args + 4, &tmp, 4);
00188
00189 tmp= (uint32_t)(packet->args_size + packet->data_size) -
00190 GEARMAN_PACKET_HEADER_SIZE;
00191 tmp= htonl(tmp);
00192 memcpy(packet->args + 8, &tmp, 4);
00193
00194 packet->options|= GEARMAN_PACKET_COMPLETE;
00195
00196 return GEARMAN_SUCCESS;
00197 }
00198
00199 gearman_return_t gearman_packet_unpack_header(gearman_packet_st *packet)
00200 {
00201 uint32_t tmp;
00202
00203 if (!memcmp(packet->args, "\0REQ", 4))
00204 packet->magic= GEARMAN_MAGIC_REQUEST;
00205 else if (!memcmp(packet->args, "\0RES", 4))
00206 packet->magic= GEARMAN_MAGIC_RESPONSE;
00207 else
00208 {
00209 gearman_error_set(packet->gearman, "gearman_packet_unpack_header",
00210 "invalid magic value");
00211 return GEARMAN_INVALID_MAGIC;
00212 }
00213
00214 memcpy(&tmp, packet->args + 4, 4);
00215 packet->command= ntohl(tmp);
00216
00217 if (packet->command == GEARMAN_COMMAND_TEXT ||
00218 packet->command >= GEARMAN_COMMAND_MAX)
00219 {
00220 gearman_error_set(packet->gearman, "gearman_packet_unpack_header",
00221 "invalid command value");
00222 return GEARMAN_INVALID_COMMAND;
00223 }
00224
00225 memcpy(&tmp, packet->args + 8, 4);
00226 packet->data_size= ntohl(tmp);
00227
00228 return GEARMAN_SUCCESS;
00229 }
00230
00231 size_t gearman_packet_pack(const gearman_packet_st *packet,
00232 gearman_con_st *con __attribute__ ((unused)),
00233 void *data, size_t data_size,
00234 gearman_return_t *ret_ptr)
00235 {
00236 if (packet->args_size == 0)
00237 {
00238 *ret_ptr= GEARMAN_SUCCESS;
00239 return 0;
00240 }
00241
00242 if (packet->args_size > data_size)
00243 {
00244 *ret_ptr= GEARMAN_FLUSH_DATA;
00245 return 0;
00246 }
00247
00248 memcpy(data, packet->args, packet->args_size);
00249 *ret_ptr= GEARMAN_SUCCESS;
00250 return packet->args_size;
00251 }
00252
/**
 * Incrementally parse received bytes into a packet.
 *
 * On the first call for a packet (args_size == 0) the input is classified:
 * a non-zero first byte means a newline-terminated text command, otherwise a
 * binary header of GEARMAN_PACKET_HEADER_SIZE bytes is expected. On later
 * calls (args_size != 0) parsing resumes with the remaining binary arguments.
 *
 * @param packet    Packet being filled in.
 * @param con       Unused.
 * @param data      Received bytes. Although declared const, the text path
 *                  writes NUL bytes into this buffer.
 * @param data_size Number of bytes available at data.
 * @param ret_ptr   Receives GEARMAN_SUCCESS when all arguments were parsed,
 *                  GEARMAN_IO_WAIT when more input is needed, or the error
 *                  reported by gearman_packet_unpack_header() or
 *                  gearman_packet_add_arg().
 * @return Number of bytes of data consumed by this call.
 */
00253 size_t gearman_packet_unpack(gearman_packet_st *packet,
00254 gearman_con_st *con __attribute__ ((unused)),
00255 const void *data, size_t data_size,
00256 gearman_return_t *ret_ptr)
00257 {
00258 uint8_t *ptr;
00259 size_t used_size;
00260 size_t arg_size;
00261
/* Nothing stored yet: this is the start of a new packet. */
00262 if (packet->args_size == 0)
00263 {
/* Binary packets begin with a zero byte; anything else is a text command. */
00264 if (data_size > 0 && ((uint8_t *)data)[0] != 0)
00265 {
00266
/* A text command is only parsed once its terminating newline has arrived. */
00267 ptr= memchr(data, '\n', data_size);
00268 if (ptr == NULL)
00269 {
00270 *ret_ptr= GEARMAN_IO_WAIT;
00271 return 0;
00272 }
00273
00274 packet->magic= GEARMAN_MAGIC_TEXT;
00275 packet->command= GEARMAN_COMMAND_TEXT;
00276
/* The line, including its newline, is consumed. The newline (and a
   preceding carriage return, if any) is overwritten with NUL in the
   caller's buffer. */
00277 used_size= (size_t)(ptr - ((uint8_t *)data)) + 1;
00278 *ptr= 0;
00279 if (used_size > 1 && *(ptr - 1) == '\r')
00280 *(ptr - 1)= 0;
00281
/* Split the line on spaces. data walks forward to the start of each token;
   arg_size tracks the bytes left in the line. The loop ends after the token
   for which no further space is found. */
00282 for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
00283 {
00284 ptr= memchr(data, ' ', arg_size);
00285 if (ptr != NULL)
00286 {
/* Terminate this token, then skip any run of additional spaces so ptr
   lands on the start of the next token. */
00287 *ptr= 0;
00288 ptr++;
00289 while (*ptr == ' ')
00290 ptr++;
00291
00292 arg_size-= (size_t)(ptr - ((uint8_t *)data));
00293 }
00294
/* The stored argument spans up to the start of the next token (so it
   includes the NUL and any skipped spaces), or the rest of the line for the
   final token. */
00295 *ret_ptr= gearman_packet_add_arg(packet, data, ptr == NULL ? arg_size :
00296 (size_t)(ptr - ((uint8_t *)data)));
/* The whole line counts as consumed even when adding an argument fails. */
00297 if (*ret_ptr != GEARMAN_SUCCESS)
00298 return used_size;
00299 }
00300
00301 return used_size;
00302 }
/* Binary packet: wait until a complete header is available. */
00303 else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
00304 {
00305 *ret_ptr= GEARMAN_IO_WAIT;
00306 return 0;
00307 }
00308
/* Copy the header into the inline buffer and decode magic, command and the
   length that follows the header (stored in packet->data_size). */
00309 packet->args= packet->args_buffer;
00310 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00311 memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
00312
00313 *ret_ptr= gearman_packet_unpack_header(packet);
00314 if (*ret_ptr != GEARMAN_SUCCESS)
00315 return 0;
00316
00317 used_size= GEARMAN_PACKET_HEADER_SIZE;
00318 }
00319 else
/* Resuming a partially parsed packet: data starts at the next argument. */
00320 used_size= 0;
00321
/* Collect arguments until the command's expected count is reached. */
00322 while (packet->argc != gearman_command_info_list[packet->command].argc)
00323 {
/* Every argument is NUL-terminated, except the last argument of a command
   that carries no data payload; that one takes up the remaining length. */
00324 if (packet->argc != (gearman_command_info_list[packet->command].argc - 1) ||
00325 gearman_command_info_list[packet->command].data)
00326 {
/* NOTE(review): this search is bounded by the bytes received, not by the
   length remaining in packet->data_size, so a missing terminator could
   match a zero byte beyond this packet -- confirm callers guard this. */
00327 ptr= memchr(((uint8_t *)data) + used_size, 0, data_size - used_size);
00328 if (ptr == NULL)
00329 {
00330 *ret_ptr= GEARMAN_IO_WAIT;
00331 return used_size;
00332 }
00333
/* The argument size includes its terminating NUL. */
00334 arg_size= (size_t)(ptr - (((uint8_t *)data) + used_size)) + 1;
00335 *ret_ptr= gearman_packet_add_arg(packet, ((uint8_t *)data) + used_size,
00336 arg_size);
00337 if (*ret_ptr != GEARMAN_SUCCESS)
00338 return used_size;
00339
/* What remains of the declared length after this argument.
   NOTE(review): presumably unsigned; would wrap if arg_size exceeds the
   remaining length -- verify. */
00340 packet->data_size-= arg_size;
00341 used_size+= arg_size;
00342 }
00343 else
00344 {
/* Final argument without terminator: wait until all remaining bytes of the
   declared length are available. */
00345 if ((data_size - used_size) < packet->data_size)
00346 {
00347 *ret_ptr= GEARMAN_IO_WAIT;
00348 return used_size;
00349 }
00350
00351 *ret_ptr= gearman_packet_add_arg(packet, ((uint8_t *)data) + used_size,
00352 packet->data_size);
00353 if (*ret_ptr != GEARMAN_SUCCESS)
00354 return used_size;
00355
00356 used_size+= packet->data_size;
00357 packet->data_size= 0;
00358 }
00359 }
00360
00361 *ret_ptr= GEARMAN_SUCCESS;
00362 return used_size;
00363 }
00364
00365 void gearman_packet_give_data(gearman_packet_st *packet, const void *data,
00366 size_t data_size)
00367 {
00368 packet->data= data;
00369 packet->data_size= data_size;
00370 packet->options|= (gearman_packet_options_t)GEARMAN_PACKET_FREE_DATA;
00371 }
00372
00373 void *gearman_packet_take_data(gearman_packet_st *packet, size_t *data_size)
00374 {
00375 void *data= (void *)(packet->data);
00376
00377 *data_size= packet->data_size;
00378
00379 packet->data= NULL;
00380 packet->data_size= 0;
00381 packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
00382
00383 return data;
00384 }