Gearman Developer Documentation

libgearman/packet.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00026 gearman_command_info_st gearman_command_info_list[GEARMAN_COMMAND_MAX]=
00027 {
00028   { "TEXT",               3, false },
00029   { "CAN_DO",             1, false },
00030   { "CANT_DO",            1, false },
00031   { "RESET_ABILITIES",    0, false },
00032   { "PRE_SLEEP",          0, false },
00033   { "UNUSED",             0, false },
00034   { "NOOP",               0, false },
00035   { "SUBMIT_JOB",         2, true  },
00036   { "JOB_CREATED",        1, false },
00037   { "GRAB_JOB",           0, false },
00038   { "NO_JOB",             0, false },
00039   { "JOB_ASSIGN",         2, true  },
00040   { "WORK_STATUS",        3, false },
00041   { "WORK_COMPLETE",      1, true  },
00042   { "WORK_FAIL",          1, false },
00043   { "GET_STATUS",         1, false },
00044   { "ECHO_REQ",           0, true  },
00045   { "ECHO_RES",           0, true  },
00046   { "SUBMIT_JOB_BG",      2, true  },
00047   { "ERROR",              2, false },
00048   { "STATUS_RES",         5, false },
00049   { "SUBMIT_JOB_HIGH",    2, true  },
00050   { "SET_CLIENT_ID",      1, false },
00051   { "CAN_DO_TIMEOUT",     2, false },
00052   { "ALL_YOURS",          0, false },
00053   { "WORK_EXCEPTION",     1, true  },
00054   { "OPTION_REQ",         1, false },
00055   { "OPTION_RES",         1, false },
00056   { "WORK_DATA",          1, true  },
00057   { "WORK_WARNING",       1, true  },
00058   { "GRAB_JOB_UNIQ",      0, false },
00059   { "JOB_ASSIGN_UNIQ",    3, true  },
00060   { "SUBMIT_JOB_HIGH_BG", 2, true  },
00061   { "SUBMIT_JOB_LOW",     2, true  },
00062   { "SUBMIT_JOB_LOW_BG",  2, true  },
00063   { "SUBMIT_JOB_SCHED",   7, true  },
00064   { "SUBMIT_JOB_EPOCH",   3, true  }
00065 };
00066 
00067 inline static gearman_return_t packet_create_arg(gearman_packet_st *packet,
00068                                                  const void *arg, size_t arg_size)
00069 {
00070   void *new_args;
00071   size_t offset;
00072   uint8_t x;
00073 
00074   if (packet->argc == gearman_command_info_list[packet->command].argc &&
00075       (! (gearman_command_info_list[packet->command].data) ||
00076        packet->data != NULL))
00077   {
00078     gearman_universal_set_error(packet->universal, "gearman_packet_create_arg",
00079                       "too many arguments for command");
00080     return GEARMAN_TOO_MANY_ARGS;
00081   }
00082 
00083   if (packet->argc == gearman_command_info_list[packet->command].argc)
00084   {
00085     packet->data= arg;
00086     packet->data_size= arg_size;
00087     return GEARMAN_SUCCESS;
00088   }
00089 
00090   if (packet->args_size == 0 && packet->magic != GEARMAN_MAGIC_TEXT)
00091     packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00092 
00093   if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
00094   {
00095     packet->args= packet->args_buffer;
00096   }
00097   else
00098   {
00099     if (packet->args == packet->args_buffer)
00100       packet->args= NULL;
00101 
00102     new_args= realloc(packet->args, packet->args_size + arg_size);
00103     if (new_args == NULL)
00104     {
00105       gearman_universal_set_error(packet->universal, "gearman_packet_create_arg", "realloc");
00106       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00107     }
00108 
00109     if (packet->args_size > 0)
00110       memcpy(new_args, packet->args_buffer, packet->args_size);
00111 
00112     packet->args= new_args;
00113   }
00114 
00115   memcpy(packet->args + packet->args_size, arg, arg_size);
00116   packet->args_size+= arg_size;
00117   packet->arg_size[packet->argc]= arg_size;
00118   packet->argc++;
00119 
00120   if (packet->magic == GEARMAN_MAGIC_TEXT)
00121   {
00122     offset= 0;
00123   }
00124   else
00125   {
00126     offset= GEARMAN_PACKET_HEADER_SIZE;
00127   }
00128 
00129   for (x= 0; x < packet->argc; x++)
00130   {
00131     packet->arg[x]= packet->args + offset;
00132     offset+= packet->arg_size[x];
00133   }
00134 
00135   return GEARMAN_SUCCESS;
00136 }
00137 
00140 /*
00141  * Public Definitions
00142  */
00143 
00144 
00145 gearman_packet_st *gearman_packet_create(gearman_universal_st *gearman,
00146                                          gearman_packet_st *packet)
00147 {
00148   if (packet == NULL)
00149   {
00150     packet= malloc(sizeof(gearman_packet_st));
00151     if (packet == NULL)
00152     {
00153       gearman_universal_set_error(gearman, "gearman_packet_create", "malloc");
00154       return NULL;
00155     }
00156 
00157     packet->options.allocated= true;
00158   }
00159   else
00160   {
00161     packet->options.allocated= false;
00162     packet->options.complete= false;
00163     packet->options.free_data= false;
00164   }
00165 
00166   packet->magic= 0;
00167   packet->command= 0;
00168   packet->argc= 0;
00169   packet->args_size= 0;
00170   packet->data_size= 0;
00171   packet->universal= gearman;
00172 
00173   if (! (gearman->options.dont_track_packets))
00174   {
00175     if (gearman->packet_list != NULL)
00176       gearman->packet_list->prev= packet;
00177     packet->next= gearman->packet_list;
00178     packet->prev= NULL;
00179     gearman->packet_list= packet;
00180     gearman->packet_count++;
00181   }
00182 
00183   packet->args= NULL;
00184   packet->data= NULL;
00185 
00186   return packet;
00187 }
00188 
00189 gearman_return_t gearman_packet_create_arg(gearman_packet_st *packet,
00190                                            const void *arg, size_t arg_size)
00191 {
00192   return packet_create_arg(packet, arg, arg_size);
00193 }
00194 
00195 gearman_return_t gearman_packet_create_args(gearman_universal_st *gearman,
00196                                             gearman_packet_st *packet,
00197                                             enum gearman_magic_t magic,
00198                                             gearman_command_t command,
00199                                             const void *args[],
00200                                             const size_t args_size[],
00201                                             size_t args_count)
00202 {
00203   gearman_return_t ret;
00204 
00205   packet= gearman_packet_create(gearman, packet);
00206   if (packet == NULL)
00207   {
00208     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00209   }
00210 
00211   packet->magic= magic;
00212   packet->command= command;
00213 
00214   for (size_t x= 0; x < args_count; x++)
00215   {
00216     ret= packet_create_arg(packet, args[x], args_size[x]);
00217     if (ret != GEARMAN_SUCCESS)
00218     {
00219       gearman_packet_free(packet);
00220       return ret;
00221     }
00222   }
00223 
00224   ret= gearman_packet_pack_header(packet);
00225 
00226   if (ret !=  GEARMAN_SUCCESS)
00227       gearman_packet_free(packet);
00228 
00229   return ret;
00230 }
00231 
00232 void gearman_packet_free(gearman_packet_st *packet)
00233 {
00234   if (packet->args != packet->args_buffer && packet->args != NULL)
00235     free(packet->args);
00236 
00237   if (packet->options.free_data && packet->data != NULL)
00238   {
00239     if (packet->universal->workload_free_fn == NULL)
00240     {
00241       free((void *)packet->data); //@todo fix the need for the casting.
00242     }
00243     else
00244     {
00245       packet->universal->workload_free_fn((void *)(packet->data),
00246                                           (void *)packet->universal->workload_free_context);
00247     }
00248   }
00249 
00250   if (! (packet->universal->options.dont_track_packets))
00251   {
00252     if (packet->universal->packet_list == packet)
00253       packet->universal->packet_list= packet->next;
00254     if (packet->prev != NULL)
00255       packet->prev->next= packet->next;
00256     if (packet->next != NULL)
00257       packet->next->prev= packet->prev;
00258     packet->universal->packet_count--;
00259   }
00260 
00261   if (packet->options.allocated)
00262     free(packet);
00263 }
00264 
00265 gearman_return_t gearman_packet_pack_header(gearman_packet_st *packet)
00266 {
00267   uint64_t length_64;
00268   uint32_t tmp;
00269 
00270   if (packet->magic == GEARMAN_MAGIC_TEXT)
00271   {
00272     packet->options.complete= true;
00273     return GEARMAN_SUCCESS;
00274   }
00275 
00276   if (packet->args_size == 0)
00277   {
00278     packet->args= packet->args_buffer;
00279     packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00280   }
00281 
00282   switch (packet->magic)
00283   {
00284   case GEARMAN_MAGIC_TEXT:
00285     break;
00286 
00287   case GEARMAN_MAGIC_REQUEST:
00288     memcpy(packet->args, "\0REQ", 4);
00289     break;
00290 
00291   case GEARMAN_MAGIC_RESPONSE:
00292     memcpy(packet->args, "\0RES", 4);
00293     break;
00294 
00295   default:
00296     gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00297                       "invalid magic value");
00298     return GEARMAN_INVALID_MAGIC;
00299   }
00300 
00301   if (packet->command == GEARMAN_COMMAND_TEXT ||
00302       packet->command >= GEARMAN_COMMAND_MAX)
00303   {
00304     gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00305                       "invalid command value");
00306     return GEARMAN_INVALID_COMMAND;
00307   }
00308 
00309   tmp= packet->command;
00310   tmp= htonl(tmp);
00311   memcpy(packet->args + 4, &tmp, 4);
00312 
00313   length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
00314 
00315   // Check for overflow on 32bit(portable?).
00316   if (length_64 >= UINT32_MAX || length_64 < packet->data_size)
00317   {
00318     gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00319                                 "data size too too long");
00320     return GEARMAN_ARGUMENT_TOO_LARGE;
00321   }
00322 
00323   tmp= (uint32_t)length_64;
00324   tmp= htonl(tmp);
00325   memcpy(packet->args + 8, &tmp, 4);
00326 
00327   packet->options.complete= true;
00328 
00329   return GEARMAN_SUCCESS;
00330 }
00331 
00332 gearman_return_t gearman_packet_unpack_header(gearman_packet_st *packet)
00333 {
00334   uint32_t tmp;
00335 
00336   if (!memcmp(packet->args, "\0REQ", 4))
00337     packet->magic= GEARMAN_MAGIC_REQUEST;
00338   else if (!memcmp(packet->args, "\0RES", 4))
00339     packet->magic= GEARMAN_MAGIC_RESPONSE;
00340   else
00341   {
00342     gearman_universal_set_error(packet->universal, "gearman_packet_unpack_header",
00343                       "invalid magic value");
00344     return GEARMAN_INVALID_MAGIC;
00345   }
00346 
00347   memcpy(&tmp, packet->args + 4, 4);
00348   packet->command= ntohl(tmp);
00349 
00350   if (packet->command == GEARMAN_COMMAND_TEXT ||
00351       packet->command >= GEARMAN_COMMAND_MAX)
00352   {
00353     gearman_universal_set_error(packet->universal, "gearman_packet_unpack_header",
00354                                 "invalid command value");
00355     return GEARMAN_INVALID_COMMAND;
00356   }
00357 
00358   memcpy(&tmp, packet->args + 8, 4);
00359   packet->data_size= ntohl(tmp);
00360 
00361   return GEARMAN_SUCCESS;
00362 }
00363 
00364 size_t gearman_packet_pack(const gearman_packet_st *packet,
00365                            gearman_connection_st *con __attribute__ ((unused)),
00366                            void *data, size_t data_size,
00367                            gearman_return_t *ret_ptr)
00368 {
00369   if (packet->args_size == 0)
00370   {
00371     *ret_ptr= GEARMAN_SUCCESS;
00372     return 0;
00373   }
00374 
00375   if (packet->args_size > data_size)
00376   {
00377     *ret_ptr= GEARMAN_FLUSH_DATA;
00378     return 0;
00379   }
00380 
00381   memcpy(data, packet->args, packet->args_size);
00382   *ret_ptr= GEARMAN_SUCCESS;
00383   return packet->args_size;
00384 }
00385 
00386 size_t gearman_packet_unpack(gearman_packet_st *packet,
00387                              gearman_connection_st *con __attribute__ ((unused)),
00388                              const void *data, size_t data_size,
00389                              gearman_return_t *ret_ptr)
00390 {
00391   uint8_t *ptr;
00392   size_t used_size;
00393   size_t arg_size;
00394 
00395   if (packet->args_size == 0)
00396   {
00397     if (data_size > 0 && ((uint8_t *)data)[0] != 0)
00398     {
00399       /* Try to parse a text-based command. */
00400       ptr= memchr(data, '\n', data_size);
00401       if (ptr == NULL)
00402       {
00403         *ret_ptr= GEARMAN_IO_WAIT;
00404         return 0;
00405       }
00406 
00407       packet->magic= GEARMAN_MAGIC_TEXT;
00408       packet->command= GEARMAN_COMMAND_TEXT;
00409 
00410       used_size= (size_t)(ptr - ((uint8_t *)data)) + 1;
00411       *ptr= 0;
00412       if (used_size > 1 && *(ptr - 1) == '\r')
00413         *(ptr - 1)= 0;
00414 
00415       for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
00416       {
00417         ptr= memchr(data, ' ', arg_size);
00418         if (ptr != NULL)
00419         {
00420           *ptr= 0;
00421           ptr++;
00422           while (*ptr == ' ')
00423             ptr++;
00424 
00425           arg_size-= (size_t)(ptr - ((uint8_t *)data));
00426         }
00427 
00428         *ret_ptr= packet_create_arg(packet, data, ptr == NULL ? arg_size :
00429                                     (size_t)(ptr - ((uint8_t *)data)));
00430         if (*ret_ptr != GEARMAN_SUCCESS)
00431           return used_size;
00432       }
00433 
00434       return used_size;
00435     }
00436     else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
00437     {
00438       *ret_ptr= GEARMAN_IO_WAIT;
00439       return 0;
00440     }
00441 
00442     packet->args= packet->args_buffer;
00443     packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00444     memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
00445 
00446     *ret_ptr= gearman_packet_unpack_header(packet);
00447     if (*ret_ptr != GEARMAN_SUCCESS)
00448       return 0;
00449 
00450     used_size= GEARMAN_PACKET_HEADER_SIZE;
00451   }
00452   else
00453   {
00454     used_size= 0;
00455   }
00456 
00457   while (packet->argc != gearman_command_info_list[packet->command].argc)
00458   {
00459     if (packet->argc != (gearman_command_info_list[packet->command].argc - 1) ||
00460         gearman_command_info_list[packet->command].data)
00461     {
00462       ptr= memchr(((uint8_t *)data) + used_size, 0, data_size - used_size);
00463       if (ptr == NULL)
00464       {
00465         *ret_ptr= GEARMAN_IO_WAIT;
00466         return used_size;
00467       }
00468 
00469       arg_size= (size_t)(ptr - (((uint8_t *)data) + used_size)) + 1;
00470       *ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size, arg_size);
00471 
00472       if (*ret_ptr != GEARMAN_SUCCESS)
00473         return used_size;
00474 
00475       packet->data_size-= arg_size;
00476       used_size+= arg_size;
00477     }
00478     else
00479     {
00480       if ((data_size - used_size) < packet->data_size)
00481       {
00482         *ret_ptr= GEARMAN_IO_WAIT;
00483         return used_size;
00484       }
00485 
00486       *ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size,
00487                                   packet->data_size);
00488       if (*ret_ptr != GEARMAN_SUCCESS)
00489         return used_size;
00490 
00491       used_size+= packet->data_size;
00492       packet->data_size= 0;
00493     }
00494   }
00495 
00496   *ret_ptr= GEARMAN_SUCCESS;
00497   return used_size;
00498 }
00499 
00500 void gearman_packet_give_data(gearman_packet_st *packet, const void *data,
00501                               size_t data_size)
00502 {
00503   packet->data= data;
00504   packet->data_size= data_size;
00505   packet->options.free_data= true;
00506 }
00507 
00508 void *gearman_packet_take_data(gearman_packet_st *packet, size_t *data_size)
00509 {
00510   void *data= (void *)(packet->data);
00511 
00512   *data_size= packet->data_size;
00513 
00514   packet->data= NULL;
00515   packet->data_size= 0;
00516   packet->options.free_data= false;
00517 
00518   return data;
00519 }