Gearman Developer Documentation

libgearman/universal.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00022 gearman_universal_st *gearman_universal_create(gearman_universal_st *universal, const gearman_universal_options_t *options)
00023 {
00024   assert(universal);
00025 
00026   { // Set defaults on all options.
00027     universal->options.allocated= false;
00028     universal->options.dont_track_packets= false;
00029     universal->options.non_blocking= false;
00030     universal->options.stored_non_blocking= false;
00031   }
00032 
00033   if (options)
00034   {
00035     while (*options != GEARMAN_MAX)
00036     {
00040       gearman_universal_add_options(universal, *options);
00041       options++;
00042     }
00043   }
00044 
00045   universal->verbose= 0;
00046   universal->con_count= 0;
00047   universal->packet_count= 0;
00048   universal->pfds_size= 0;
00049   universal->sending= 0;
00050   universal->last_errno= 0;
00051   universal->timeout= -1;
00052   universal->con_list= NULL;
00053   universal->packet_list= NULL;
00054   universal->pfds= NULL;
00055   universal->log_fn= NULL;
00056   universal->log_context= NULL;
00057   universal->event_watch_fn= NULL;
00058   universal->event_watch_context= NULL;
00059   universal->workload_malloc_fn= NULL;
00060   universal->workload_malloc_context= NULL;
00061   universal->workload_free_fn= NULL;
00062   universal->workload_free_context= NULL;
00063   universal->last_error[0]= 0;
00064 
00065   return universal;
00066 }
00067 
00068 gearman_universal_st *gearman_universal_clone(gearman_universal_st *destination, const gearman_universal_st *source)
00069 {
00070   gearman_universal_st *check;
00071   gearman_connection_st *con;
00072 
00073   assert(destination);
00074   assert(source);
00075 
00076   if (! destination || ! source)
00077     return NULL;
00078 
00079   check= gearman_universal_create(destination, NULL);
00080 
00081   if (! check)
00082   {
00083     return destination;
00084   }
00085 
00086   (void)gearman_universal_set_option(destination, GEARMAN_NON_BLOCKING, source->options.non_blocking);
00087   (void)gearman_universal_set_option(destination, GEARMAN_DONT_TRACK_PACKETS, source->options.dont_track_packets);
00088 
00089   destination->timeout= source->timeout;
00090 
00091   for (con= source->con_list; con != NULL; con= con->next)
00092   {
00093     if (gearman_connection_clone(destination, NULL, con) == NULL)
00094     {
00095       gearman_universal_free(destination);
00096       return NULL;
00097     }
00098   }
00099 
00100   /* Don't clone job or packet information, this is universal information for
00101     old and active jobs/connections. */
00102 
00103   return destination;
00104 }
00105 
00106 void gearman_universal_free(gearman_universal_st *universal)
00107 {
00108   gearman_free_all_cons(universal);
00109   gearman_free_all_packets(universal);
00110 
00111   if (universal->pfds != NULL)
00112     free(universal->pfds);
00113 
00114   if (universal->options.allocated)
00115   {
00116     assert(0);
00117     free(universal);
00118   }
00119 }
00120 
00121 gearman_return_t gearman_universal_set_option(gearman_universal_st *universal, gearman_universal_options_t option, bool value)
00122 {
00123   switch (option)
00124   {
00125   case GEARMAN_NON_BLOCKING:
00126     universal->options.non_blocking= value;
00127     break;
00128   case GEARMAN_DONT_TRACK_PACKETS:
00129     universal->options.dont_track_packets= value;
00130     break;
00131   case GEARMAN_MAX:
00132   default:
00133     return GEARMAN_INVALID_COMMAND;
00134   }
00135 
00136   return GEARMAN_SUCCESS;
00137 }
00138 
00139 int gearman_universal_timeout(gearman_universal_st *universal)
00140 {
00141   return universal->timeout;
00142 }
00143 
00144 void gearman_universal_set_timeout(gearman_universal_st *universal, int timeout)
00145 {
00146   universal->timeout= timeout;
00147 }
00148 
00149 void gearman_set_log_fn(gearman_universal_st *universal, gearman_log_fn *function,
00150                         void *context, gearman_verbose_t verbose)
00151 {
00152   universal->log_fn= function;
00153   universal->log_context= context;
00154   universal->verbose= verbose;
00155 }
00156 
00157 void gearman_set_event_watch_fn(gearman_universal_st *universal,
00158                                 gearman_event_watch_fn *function,
00159                                 void *context)
00160 {
00161   universal->event_watch_fn= function;
00162   universal->event_watch_context= context;
00163 }
00164 
00165 void gearman_set_workload_malloc_fn(gearman_universal_st *universal,
00166                                     gearman_malloc_fn *function,
00167                                     void *context)
00168 {
00169   universal->workload_malloc_fn= function;
00170   universal->workload_malloc_context= context;
00171 }
00172 
00173 void gearman_set_workload_free_fn(gearman_universal_st *universal,
00174                                   gearman_free_fn *function,
00175                                   void *context)
00176 {
00177   universal->workload_free_fn= function;
00178   universal->workload_free_context= context;
00179 }
00180 
00181 void gearman_free_all_cons(gearman_universal_st *universal)
00182 {
00183   while (universal->con_list != NULL)
00184     gearman_connection_free(universal->con_list);
00185 }
00186 
00187 gearman_return_t gearman_flush_all(gearman_universal_st *universal)
00188 {
00189   gearman_connection_st *con;
00190   gearman_return_t ret;
00191 
00192   for (con= universal->con_list; con != NULL; con= con->next)
00193   {
00194     if (con->events & POLLOUT)
00195       continue;
00196 
00197     ret= gearman_connection_flush(con);
00198     if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00199       return ret;
00200   }
00201 
00202   return GEARMAN_SUCCESS;
00203 }
00204 
00205 gearman_return_t gearman_wait(gearman_universal_st *universal)
00206 {
00207   gearman_connection_st *con;
00208   struct pollfd *pfds;
00209   nfds_t x;
00210   int ret;
00211   gearman_return_t gret;
00212 
00213   if (universal->pfds_size < universal->con_count)
00214   {
00215     pfds= realloc(universal->pfds, universal->con_count * sizeof(struct pollfd));
00216     if (pfds == NULL)
00217     {
00218       gearman_universal_set_error(universal, "gearman_wait", "realloc");
00219       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00220     }
00221 
00222     universal->pfds= pfds;
00223     universal->pfds_size= universal->con_count;
00224   }
00225   else
00226     pfds= universal->pfds;
00227 
00228   x= 0;
00229   for (con= universal->con_list; con != NULL; con= con->next)
00230   {
00231     if (con->events == 0)
00232       continue;
00233 
00234     pfds[x].fd= con->fd;
00235     pfds[x].events= con->events;
00236     pfds[x].revents= 0;
00237     x++;
00238   }
00239 
00240   if (x == 0)
00241   {
00242     gearman_universal_set_error(universal, "gearman_wait", "no active file descriptors");
00243     return GEARMAN_NO_ACTIVE_FDS;
00244   }
00245 
00246   while (1)
00247   {
00248     ret= poll(pfds, x, universal->timeout);
00249     if (ret == -1)
00250     {
00251       if (errno == EINTR)
00252         continue;
00253 
00254       gearman_universal_set_error(universal, "gearman_wait", "poll:%d", errno);
00255       universal->last_errno= errno;
00256       return GEARMAN_ERRNO;
00257     }
00258 
00259     break;
00260   }
00261 
00262   if (ret == 0)
00263   {
00264     gearman_universal_set_error(universal, "gearman_wait", "timeout reached");
00265     return GEARMAN_TIMEOUT;
00266   }
00267 
00268   x= 0;
00269   for (con= universal->con_list; con != NULL; con= con->next)
00270   {
00271     if (con->events == 0)
00272       continue;
00273 
00274     gret= gearman_connection_set_revents(con, pfds[x].revents);
00275     if (gret != GEARMAN_SUCCESS)
00276       return gret;
00277 
00278     x++;
00279   }
00280 
00281   return GEARMAN_SUCCESS;
00282 }
00283 
00284 gearman_connection_st *gearman_ready(gearman_universal_st *universal)
00285 {
00286   gearman_connection_st *con;
00287 
00288   /* We can't keep universal between calls since connections may be removed during
00289     processing. If this list ever gets big, we may want something faster. */
00290 
00291   for (con= universal->con_list; con != NULL; con= con->next)
00292   {
00293     if (con->options.ready)
00294     {
00295       con->options.ready= false;
00296       return con;
00297     }
00298   }
00299 
00300   return NULL;
00301 }
00302 
00307 static inline void _push_blocking(gearman_universal_st *universal, bool *orig_block_universal)
00308 {
00309   *orig_block_universal= universal->options.non_blocking;
00310   universal->options.non_blocking= false;
00311 }
00312 
00313 static inline void _pop_non_blocking(gearman_universal_st *universal, bool orig_block_universal)
00314 {
00315   universal->options.non_blocking= orig_block_universal;
00316 }
00317 
00318 gearman_return_t gearman_echo(gearman_universal_st *universal,
00319                               const void *workload,
00320                               size_t workload_size)
00321 {
00322   gearman_connection_st *con;
00323   gearman_packet_st packet;
00324   gearman_return_t ret;
00325   bool orig_block_universal;
00326 
00327   ret= gearman_packet_create_args(universal, &packet, GEARMAN_MAGIC_REQUEST,
00328                                   GEARMAN_COMMAND_ECHO_REQ,
00329                                   &workload, &workload_size, 1);
00330   if (ret != GEARMAN_SUCCESS)
00331   {
00332     return ret;
00333   }
00334 
00335   _push_blocking(universal, &orig_block_universal);
00336 
00337   for (con= universal->con_list; con != NULL; con= con->next)
00338   {
00339     gearman_packet_st *packet_ptr;
00340 
00341     ret= gearman_connection_send(con, &packet, true);
00342     if (ret != GEARMAN_SUCCESS)
00343     {
00344       goto exit;
00345     }
00346 
00347     packet_ptr= gearman_connection_recv(con, &(con->packet), &ret, true);
00348     if (ret != GEARMAN_SUCCESS)
00349     {
00350       goto exit;
00351     }
00352 
00353     assert(packet_ptr);
00354 
00355     if (con->packet.data_size != workload_size ||
00356         memcmp(workload, con->packet.data, workload_size))
00357     {
00358       gearman_packet_free(&(con->packet));
00359       gearman_universal_set_error(universal, "gearman_echo", "corruption during echo");
00360 
00361       ret= GEARMAN_ECHO_DATA_CORRUPTION;
00362       goto exit;
00363     }
00364 
00365     gearman_packet_free(&(con->packet));
00366   }
00367 
00368   ret= GEARMAN_SUCCESS;
00369 
00370 exit:
00371   gearman_packet_free(&packet);
00372   _pop_non_blocking(universal, orig_block_universal);
00373 
00374   return ret;
00375 }
00376 
00377 void gearman_free_all_packets(gearman_universal_st *universal)
00378 {
00379   while (universal->packet_list != NULL)
00380     gearman_packet_free(universal->packet_list);
00381 }
00382 
00383 /*
00384  * Local Definitions
00385  */
00386 
00387 void gearman_universal_set_error(gearman_universal_st *universal, const char *function,
00388                                  const char *format, ...)
00389 {
00390   size_t size;
00391   char *ptr;
00392   char log_buffer[GEARMAN_MAX_ERROR_SIZE];
00393   va_list args;
00394 
00395   size= strlen(function);
00396   ptr= memcpy(log_buffer, function, size);
00397   ptr+= size;
00398   ptr[0]= ':';
00399   size++;
00400   ptr++;
00401 
00402   va_start(args, format);
00403   size+= (size_t)vsnprintf(ptr, GEARMAN_MAX_ERROR_SIZE - size, format, args);
00404   va_end(args);
00405 
00406   if (universal->log_fn == NULL)
00407   {
00408     if (size >= GEARMAN_MAX_ERROR_SIZE)
00409       size= GEARMAN_MAX_ERROR_SIZE - 1;
00410 
00411     memcpy(universal->last_error, log_buffer, size + 1);
00412   }
00413   else
00414   {
00415     universal->log_fn(log_buffer, GEARMAN_VERBOSE_FATAL,
00416                       (void *)universal->log_context);
00417   }
00418 }