Gearman Developer Documentation

libgearman/worker.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
/*
 * One entry in a worker's function list (worker->function_list).
 * Entries are chained through next/prev.
 */
00019 struct _worker_function_st
00020 {
00021   struct {
00022     bool packet_in_use:1; /* cleared when rebuilding `packet` fails */
00023     bool change:1;        /* `packet` still has to be sent by gearman_worker_grab_job() */
00024     bool remove:1;        /* entry is freed once its packet has been sent */
00025   } options;
00026   struct _worker_function_st *next;
00027   struct _worker_function_st *prev;
00028   char *function_name;    /* compared with memcmp()/strcmp() by the lookups in this file */
00029   size_t function_length; /* number of bytes of function_name used for lookups */
00030   gearman_worker_fn *worker_fn; /* callback run by gearman_worker_work(); may be NULL */
00031   void *context;          /* passed to worker_fn */
00032   gearman_packet_st packet; /* packet sent to the job servers for this entry */
00033 };
00034 
00041 static inline struct _worker_function_st *_function_exist(gearman_worker_st *worker, const char *function_name, size_t function_length)
00042 {
00043   struct _worker_function_st *function;
00044 
00045   for (function= worker->function_list; function != NULL;
00046        function= function->next)
00047   {
00048     if (function_length == function->function_length)
00049     {
00050       if (! memcmp(function_name, function->function_name, function_length))
00051         break;
00052     }
00053   }
00054 
00055   return function;
00056 }
00057 
/* Allocate (when worker is NULL) and reset a worker structure; when is_clone
   is false the universal state is created as well. */
00061 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
00062 
/* Build the reusable GRAB_JOB and PRE_SLEEP packets of a worker. */
00066 static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
00067 
/* Callback handed to gearman_parse_servers(); context is the worker. */
00071 static gearman_return_t _worker_add_server(const char *host, in_port_t port,
00072                                            void *context);
00073 
/* Register a function with the worker (definition lies outside this section;
   worker_fn and context may be NULL, see gearman_worker_register()). */
00077 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
00078                                                 const char *function_name,
00079                                                 size_t function_length,
00080                                                 uint32_t timeout,
00081                                                 gearman_worker_fn *worker_fn,
00082                                                 void *context);
00083 
/* Release one function entry.  NOTE(review): callers loop on
   worker->function_list, so this presumably unlinks the entry too; the
   definition is not visible here. */
00087 static void _worker_function_free(gearman_worker_st *worker,
00088                                   struct _worker_function_st *function);
00089 
00090 
00093 /*
00094  * Public Definitions
00095  */
00096 
00097 gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
00098 {
00099   worker= _worker_allocate(worker, false);
00100 
00101   if (worker == NULL)
00102     return NULL;
00103 
00104   if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
00105   {
00106     gearman_worker_free(worker);
00107     return NULL;
00108   }
00109 
00110   return worker;
00111 }
00112 
00113 gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
00114                                         const gearman_worker_st *from)
00115 {
00116   gearman_universal_st *check;
00117 
00118   if (! from)
00119   {
00120     return _worker_allocate(worker, false);
00121   }
00122 
00123   worker= _worker_allocate(worker, true);
00124 
00125   if (worker == NULL)
00126   {
00127     return worker;
00128   }
00129 
00130   worker->options.non_blocking= from->options.non_blocking;
00131   worker->options.grab_job_in_use= from->options.grab_job_in_use;
00132   worker->options.pre_sleep_in_use= from->options.pre_sleep_in_use;
00133   worker->options.work_job_in_use= from->options.work_job_in_use;
00134   worker->options.change= from->options.change;
00135   worker->options.grab_uniq= from->options.grab_uniq;
00136   worker->options.timeout_return= from->options.timeout_return;
00137 
00138   check= gearman_universal_clone(&(worker->universal), &from->universal);
00139   if (check == NULL)
00140   {
00141     gearman_worker_free(worker);
00142     return NULL;
00143   }
00144 
00145   if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
00146   {
00147     gearman_worker_free(worker);
00148     return NULL;
00149   }
00150 
00151   return worker;
00152 }
00153 
00154 void gearman_worker_free(gearman_worker_st *worker)
00155 {
00156   if (worker->options.packet_init)
00157   {
00158     gearman_packet_free(&(worker->grab_job));
00159     gearman_packet_free(&(worker->pre_sleep));
00160   }
00161 
00162   if (worker->job != NULL)
00163     gearman_job_free(worker->job);
00164 
00165   if (worker->options.work_job_in_use)
00166     gearman_job_free(&(worker->work_job));
00167 
00168   if (worker->work_result != NULL)
00169   {
00170     if ((&worker->universal)->workload_free_fn == NULL)
00171       free(worker->work_result);
00172     else
00173     {
00174       (&worker->universal)->workload_free_fn(worker->work_result,
00175                                 (void *)(&worker->universal)->workload_free_context);
00176     }
00177   }
00178 
00179   while (worker->function_list != NULL)
00180     _worker_function_free(worker, worker->function_list);
00181 
00182   gearman_job_free_all(worker);
00183 
00184   if ((&worker->universal) != NULL)
00185     gearman_universal_free((&worker->universal));
00186 
00187   if (worker->options.allocated)
00188     free(worker);
00189 }
00190 
00191 const char *gearman_worker_error(gearman_worker_st *worker)
00192 {
00193   return gearman_universal_error((&worker->universal));
00194 }
00195 
00196 int gearman_worker_errno(gearman_worker_st *worker)
00197 {
00198   return gearman_universal_errno((&worker->universal));
00199 }
00200 
00201 gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
00202 {
00203   gearman_worker_options_t options;
00204   memset(&options, 0, sizeof(gearman_worker_options_t));
00205 
00206   if (worker->options.allocated)
00207     options|= GEARMAN_WORKER_ALLOCATED;
00208   if (worker->options.non_blocking)
00209     options|= GEARMAN_WORKER_NON_BLOCKING;
00210   if (worker->options.packet_init)
00211     options|= GEARMAN_WORKER_PACKET_INIT;
00212   if (worker->options.grab_job_in_use)
00213     options|= GEARMAN_WORKER_GRAB_JOB_IN_USE;
00214   if (worker->options.pre_sleep_in_use)
00215     options|= GEARMAN_WORKER_PRE_SLEEP_IN_USE;
00216   if (worker->options.work_job_in_use)
00217     options|= GEARMAN_WORKER_WORK_JOB_IN_USE;
00218   if (worker->options.change)
00219     options|= GEARMAN_WORKER_CHANGE;
00220   if (worker->options.grab_uniq)
00221     options|= GEARMAN_WORKER_GRAB_UNIQ;
00222   if (worker->options.timeout_return)
00223     options|= GEARMAN_WORKER_TIMEOUT_RETURN;
00224 
00225   return options;
00226 }
00227 
00228 void gearman_worker_set_options(gearman_worker_st *worker,
00229                                 gearman_worker_options_t options)
00230 {
00231   gearman_worker_options_t usable_options[]= {
00232     GEARMAN_WORKER_NON_BLOCKING,
00233     GEARMAN_WORKER_GRAB_UNIQ,
00234     GEARMAN_WORKER_TIMEOUT_RETURN,
00235     GEARMAN_WORKER_MAX
00236   };
00237 
00238   gearman_worker_options_t *ptr;
00239 
00240 
00241   for (ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ptr++)
00242   {
00243     if (options & *ptr)
00244     {
00245       gearman_worker_add_options(worker, *ptr);
00246     }
00247     else
00248     {
00249       gearman_worker_remove_options(worker, *ptr);
00250     }
00251   }
00252 }
00253 
00254 void gearman_worker_add_options(gearman_worker_st *worker,
00255                                 gearman_worker_options_t options)
00256 {
00257   if (options & GEARMAN_WORKER_NON_BLOCKING)
00258   {
00259     gearman_universal_add_options((&worker->universal), GEARMAN_NON_BLOCKING);
00260     worker->options.non_blocking= true;
00261   }
00262 
00263   if (options & GEARMAN_WORKER_GRAB_UNIQ)
00264   {
00265     worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
00266     (void)gearman_packet_pack_header(&(worker->grab_job));
00267     worker->options.grab_uniq= true;
00268   }
00269 
00270   if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
00271   {
00272     worker->options.timeout_return= true;
00273   }
00274 }
00275 
00276 void gearman_worker_remove_options(gearman_worker_st *worker,
00277                                    gearman_worker_options_t options)
00278 {
00279   if (options & GEARMAN_WORKER_NON_BLOCKING)
00280   {
00281     gearman_universal_remove_options((&worker->universal), GEARMAN_NON_BLOCKING);
00282     worker->options.non_blocking= false;
00283   }
00284 
00285   if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
00286   {
00287     worker->options.timeout_return= false;
00288   }
00289 
00290   if (options & GEARMAN_WORKER_GRAB_UNIQ)
00291   {
00292     worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
00293     (void)gearman_packet_pack_header(&(worker->grab_job));
00294     worker->options.grab_uniq= false;
00295   }
00296 }
00297 
00298 int gearman_worker_timeout(gearman_worker_st *worker)
00299 {
00300   return gearman_universal_timeout((&worker->universal));
00301 }
00302 
00303 void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
00304 {
00305   gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
00306   gearman_universal_set_timeout((&worker->universal), timeout);
00307 }
00308 
00309 void *gearman_worker_context(const gearman_worker_st *worker)
00310 {
00311   return worker->context;
00312 }
00313 
00314 void gearman_worker_set_context(gearman_worker_st *worker, void *context)
00315 {
00316   worker->context= context;
00317 }
00318 
00319 void gearman_worker_set_log_fn(gearman_worker_st *worker,
00320                                gearman_log_fn *function, void *context,
00321                                gearman_verbose_t verbose)
00322 {
00323   gearman_set_log_fn((&worker->universal), function, context, verbose);
00324 }
00325 
00326 void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
00327                                            gearman_malloc_fn *function,
00328                                            void *context)
00329 {
00330   gearman_set_workload_malloc_fn((&worker->universal), function, context);
00331 }
00332 
00333 void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
00334                                          gearman_free_fn *function,
00335                                          void *context)
00336 {
00337   gearman_set_workload_free_fn((&worker->universal), function, context);
00338 }
00339 
00340 gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
00341                                            const char *host, in_port_t port)
00342 {
00343   if (gearman_connection_create_args((&worker->universal), NULL, host, port) == NULL)
00344     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00345 
00346   return GEARMAN_SUCCESS;
00347 }
00348 
00349 gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
00350                                             const char *servers)
00351 {
00352   return gearman_parse_servers(servers, _worker_add_server, worker);
00353 }
00354 
00355 void gearman_worker_remove_servers(gearman_worker_st *worker)
00356 {
00357   gearman_free_all_cons((&worker->universal));
00358 }
00359 
00360 gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
00361 {
00362   return gearman_wait((&worker->universal));
00363 }
00364 
00365 gearman_return_t gearman_worker_register(gearman_worker_st *worker,
00366                                          const char *function_name,
00367                                          uint32_t timeout)
00368 {
00369   return _worker_function_create(worker, function_name, strlen(function_name), timeout, NULL, NULL);
00370 }
00371 
00372 bool gearman_worker_function_exist(gearman_worker_st *worker,
00373                                    const char *function_name,
00374                                    size_t function_length)
00375 {
00376   struct _worker_function_st *function;
00377 
00378   function= _function_exist(worker, function_name, function_length);
00379 
00380   return (function && function->options.remove == false) ? true : false;
00381 }
00382 
00383 static inline gearman_return_t _worker_unregister(gearman_worker_st *worker,
00384                                                   const char *function_name, size_t function_length)
00385 {
00386   struct _worker_function_st *function;
00387   gearman_return_t ret;
00388   const void *args[1];
00389   size_t args_size[1];
00390 
00391   function= _function_exist(worker, function_name, function_length);
00392 
00393   if (function == NULL || function->options.remove)
00394     return GEARMAN_NO_REGISTERED_FUNCTION;
00395 
00396   gearman_packet_free(&(function->packet));
00397 
00398   args[0]= function->function_name;
00399   args_size[0]= function->function_length;
00400   ret= gearman_packet_create_args((&worker->universal), &(function->packet),
00401                                   GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
00402                                   args, args_size, 1);
00403   if (ret != GEARMAN_SUCCESS)
00404   {
00405     function->options.packet_in_use= false;
00406     return ret;
00407   }
00408 
00409   function->options.change= true;
00410   function->options.remove= true;
00411 
00412   worker->options.change= true;
00413 
00414   return GEARMAN_SUCCESS;
00415 }
00416 
00417 gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
00418                                            const char *function_name)
00419 {
00420   return _worker_unregister(worker, function_name, strlen(function_name));
00421 }
00422 
00423 gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
00424 {
00425   gearman_return_t ret;
00426   struct _worker_function_st *function;
00427   uint32_t count= 0;
00428 
00429   if (worker->function_list == NULL)
00430     return GEARMAN_NO_REGISTERED_FUNCTIONS;
00431 
00432 
00433   /* Lets find out if we have any functions left that are valid */
00434   for (function= worker->function_list; function != NULL;
00435        function= function->next)
00436   {
00437     if (function->options.remove == false)
00438       count++;
00439   }
00440 
00441   if (count == 0)
00442     return GEARMAN_NO_REGISTERED_FUNCTIONS;
00443 
00444   gearman_packet_free(&(worker->function_list->packet));
00445 
00446   ret= gearman_packet_create_args((&worker->universal),
00447                                   &(worker->function_list->packet),
00448                                   GEARMAN_MAGIC_REQUEST,
00449                                   GEARMAN_COMMAND_RESET_ABILITIES,
00450                                   NULL, NULL, 0);
00451   if (ret != GEARMAN_SUCCESS)
00452   {
00453     worker->function_list->options.packet_in_use= false;
00454 
00455     return ret;
00456   }
00457 
00458   while (worker->function_list->next != NULL)
00459     _worker_function_free(worker, worker->function_list->next);
00460 
00461   worker->function_list->options.change= true;
00462   worker->function_list->options.remove= true;
00463 
00464   worker->options.change= true;
00465 
00466   return GEARMAN_SUCCESS;
00467 }
00468 
/*
 * Ask the connected job servers for a job.
 *
 * This is a resumable state machine keyed on worker->state: the case labels
 * sit inside the loops below, so a call that returned early after storing a
 * state (on GEARMAN_IO_WAIT) jumps straight back to the interrupted send or
 * receive on the next call, relying on worker->con and worker->function
 * still holding the loop positions.
 *
 * worker   Worker whose function list and connections are used.
 * job      Passed to gearman_job_create() when a job object is needed.
 * ret_ptr  Receives the status of the call; it is written without a NULL
 *          check.
 *
 * Returns the assigned job, or NULL with *ret_ptr set to the reason (for
 * example GEARMAN_IO_WAIT, GEARMAN_NO_JOBS, GEARMAN_TIMEOUT,
 * GEARMAN_NO_REGISTERED_FUNCTIONS or GEARMAN_UNEXPECTED_PACKET).
 */
00469 gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
00470                                         gearman_job_st *job,
00471                                         gearman_return_t *ret_ptr)
00472 {
00473   struct _worker_function_st *function;
00474   uint32_t active;
00475 
00476   while (1)
00477   {
00478     switch (worker->state)
00479     {
00480     case GEARMAN_WORKER_STATE_START:
00481       /* If there are any new functions changes, send them now. */
00482       if (worker->options.change)
00483       {
00484         worker->function= worker->function_list;
00485         while (worker->function != NULL)
00486         {
00487           if (! (worker->function->options.change))
00488           {
00489             worker->function= worker->function->next;
00490             continue;
00491           }
00492 
            /* Send the changed function's packet on every open connection. */
00493           for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00494                worker->con= worker->con->next)
00495           {
00496             if (worker->con->fd == -1)
00497               continue;
00498 
      /* Resume point after GEARMAN_IO_WAIT on a function packet. */
00499     case GEARMAN_WORKER_STATE_FUNCTION_SEND:
00500             *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
00501                                        true);
00502             if (*ret_ptr != GEARMAN_SUCCESS)
00503             {
00504               if (*ret_ptr == GEARMAN_IO_WAIT)
00505                 worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
00506               else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00507                 continue;
00508 
00509               return NULL;
00510             }
00511           }
00512 
            /* Entries marked for removal are freed once their packet went
               out; iteration continues from the previous entry, or from the
               list head when there is none. */
00513           if (worker->function->options.remove)
00514           {
00515             function= worker->function->prev;
00516             _worker_function_free(worker, worker->function);
00517             if (function == NULL)
00518               worker->function= worker->function_list;
00519             else
00520               worker->function= function;
00521           }
00522           else
00523           {
00524             worker->function->options.change= false;
00525             worker->function= worker->function->next;
00526           }
00527         }
00528 
00529         worker->options.change= false;
00530       }
00531 
00532       if (worker->function_list == NULL)
00533       {
00534         gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00535                           "no functions have been registered");
00536         *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
00537         return NULL;
00538       }
00539 
00540       for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00541            worker->con= worker->con->next)
00542       {
00543         /* If the connection to the job server is not active, start it. */
00544         if (worker->con->fd == -1)
00545         {
            /* Sending every function packet (re)announces all functions on
               this connection. */
00546           for (worker->function= worker->function_list;
00547                worker->function != NULL;
00548                worker->function= worker->function->next)
00549           {
      /* Resume point after GEARMAN_IO_WAIT while announcing functions. */
00550     case GEARMAN_WORKER_STATE_CONNECT:
00551             *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
00552                                        true);
00553             if (*ret_ptr != GEARMAN_SUCCESS)
00554             {
00555               if (*ret_ptr == GEARMAN_IO_WAIT)
00556                 worker->state= GEARMAN_WORKER_STATE_CONNECT;
00557               else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT ||
00558                        *ret_ptr == GEARMAN_LOST_CONNECTION)
00559               {
00560                 break;
00561               }
00562 
00563               return NULL;
00564             }
00565           }
00566 
            /* Server unreachable: move on to the next connection. */
00567           if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
00568             continue;
00569         }
00570 
      /* Also the state stored when a job is returned, so the next call
         asks the same connection again. */
00571     case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
00572         if (worker->con->fd == -1)
00573           continue;
00574 
00575         *ret_ptr= gearman_connection_send(worker->con, &(worker->grab_job), true);
00576         if (*ret_ptr != GEARMAN_SUCCESS)
00577         {
00578           if (*ret_ptr == GEARMAN_IO_WAIT)
00579             worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
00580           else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00581             continue;
00582 
00583           return NULL;
00584         }
00585 
00586         if (worker->job == NULL)
00587         {
00588           worker->job= gearman_job_create(worker, job);
00589           if (worker->job == NULL)
00590           {
00591             *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00592             return NULL;
00593           }
00594         }
00595 
          /* Read packets until the server answers the GRAB_JOB request. */
00596         while (1)
00597         {
00598     case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
00599           (void)gearman_connection_recv(worker->con, &(worker->job->assigned), ret_ptr,
00600                                  true);
00601           if (*ret_ptr != GEARMAN_SUCCESS)
00602           {
00603             if (*ret_ptr == GEARMAN_IO_WAIT)
00604               worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
00605             else
00606             {
00607               gearman_job_free(worker->job);
00608               worker->job= NULL;
00609 
                /* Lost connection: leave the receive loop and try the next
                   connection. */
00610               if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00611                 break;
00612             }
00613 
00614             return NULL;
00615           }
00616 
00617           if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN ||
00618               worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
00619           {
                /* Hand the job to the caller; worker->job is cleared so a
                   new job object is created on the next grab. */
00620             worker->job->options.assigned_in_use= true;
00621             worker->job->con= worker->con;
00622             worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
00623             job= worker->job;
00624             worker->job= NULL;
00625             return job;
00626           }
00627 
              /* NO_JOB: this server has nothing, continue with the next
                 connection. */
00628           if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
00629           {
00630             gearman_packet_free(&(worker->job->assigned));
00631             break;
00632           }
00633 
              /* Anything other than NOOP is an error; NOOP packets are
                 discarded below and the receive is repeated. */
00634           if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
00635           {
00636             gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00637                               "unexpected packet:%s",
00638                               gearman_command_info_list[worker->job->assigned.command].name);
00639             gearman_packet_free(&(worker->job->assigned));
00640             gearman_job_free(worker->job);
00641             worker->job= NULL;
00642             *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
00643             return NULL;
00644           }
00645 
00646           gearman_packet_free(&(worker->job->assigned));
00647         }
00648       }
00649 
      /* Reached by falling through once no connection produced a job:
         send the pre_sleep packet on every open connection. */
00650     case GEARMAN_WORKER_STATE_PRE_SLEEP:
00651       for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00652            worker->con= worker->con->next)
00653       {
00654         if (worker->con->fd == -1)
00655           continue;
00656 
00657         *ret_ptr= gearman_connection_send(worker->con, &(worker->pre_sleep), true);
00658         if (*ret_ptr != GEARMAN_SUCCESS)
00659         {
00660           if (*ret_ptr == GEARMAN_IO_WAIT)
00661             worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
00662           else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00663             continue;
00664 
00665           return NULL;
00666         }
00667       }
00668 
        /* From here on every exit restarts at START on the next call. */
00669       worker->state= GEARMAN_WORKER_STATE_START;
00670 
00671       /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
00672       active= 0;
00673       for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00674            worker->con= worker->con->next)
00675       {
00676         if (worker->con->fd == -1)
00677           continue;
00678 
00679         *ret_ptr= gearman_connection_set_events(worker->con, POLLIN);
00680         if (*ret_ptr != GEARMAN_SUCCESS)
00681           return NULL;
00682 
00683         active++;
00684       }
00685 
        /* Non-blocking workers never wait here. */
00686       if ((&worker->universal)->options.non_blocking)
00687       {
00688         *ret_ptr= GEARMAN_NO_JOBS;
00689         return NULL;
00690       }
00691 
        /* With no open connection there is nothing to poll: sleep for the
           configured timeout (or the default when it is negative). */
00692       if (active == 0)
00693       {
00694         if ((&worker->universal)->timeout < 0)
00695           usleep(GEARMAN_WORKER_WAIT_TIMEOUT * 1000);
00696         else
00697         {
00698           if ((&worker->universal)->timeout > 0)
00699             usleep((unsigned int)(&worker->universal)->timeout * 1000);
00700 
00701           if (worker->options.timeout_return)
00702           {
00703             gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00704                               "timeout reached");
00705             *ret_ptr= GEARMAN_TIMEOUT;
00706             return NULL;
00707           }
00708         }
00709       }
00710       else
00711       {
          /* A timeout from gearman_wait() is only reported to the caller
             when timeout_return is set; otherwise the loop starts over. */
00712         *ret_ptr= gearman_wait((&worker->universal));
00713         if (*ret_ptr != GEARMAN_SUCCESS && (*ret_ptr != GEARMAN_TIMEOUT ||
00714             worker->options.timeout_return))
00715         {
00716           return NULL;
00717         }
00718       }
00719 
        /* Leaves the switch only; the enclosing while (1) starts over. */
00720       break;
00721 
00722     default:
00723       gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00724                         "unknown state: %u", worker->state);
00725       *ret_ptr= GEARMAN_UNKNOWN_STATE;
00726       return NULL;
00727     }
00728   }
00729 }
00730 
00731 void gearman_job_free(gearman_job_st *job)
00732 {
00733   if (job->options.assigned_in_use)
00734     gearman_packet_free(&(job->assigned));
00735 
00736   if (job->options.work_in_use)
00737     gearman_packet_free(&(job->work));
00738 
00739   if (job->worker->job_list == job)
00740     job->worker->job_list= job->next;
00741   if (job->prev != NULL)
00742     job->prev->next= job->next;
00743   if (job->next != NULL)
00744     job->next->prev= job->prev;
00745   job->worker->job_count--;
00746 
00747   if (job->options.allocated)
00748     free(job);
00749 }
00750 
00751 void gearman_job_free_all(gearman_worker_st *worker)
00752 {
00753   while (worker->job_list != NULL)
00754     gearman_job_free(worker->job_list);
00755 }
00756 
00757 gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
00758                                              const char *function_name,
00759                                              uint32_t timeout,
00760                                              gearman_worker_fn *worker_fn,
00761                                              void *context)
00762 {
00763   if (function_name == NULL)
00764   {
00765     gearman_universal_set_error((&worker->universal), "gearman_worker_add_function",
00766                       "function name not given");
00767 
00768     return GEARMAN_INVALID_FUNCTION_NAME;
00769   }
00770 
00771   if (worker_fn == NULL)
00772   {
00773     gearman_universal_set_error((&worker->universal), "gearman_worker_add_function",
00774                       "function not given");
00775 
00776     return GEARMAN_INVALID_WORKER_FUNCTION;
00777   }
00778 
00779   return _worker_function_create(worker, function_name, strlen(function_name),
00780                                  timeout, worker_fn,
00781                                  context);
00782 }
00783 
/*
 * Grab one job, run the callback registered for it and send the outcome
 * back to the job server.
 *
 * Resumable state machine keyed on worker->work_state.  On the normal path
 * the cases fall through GRAB_JOB -> FUNCTION -> COMPLETE; a call that
 * returned early after storing a work_state resumes at that case.
 *
 * Returns GEARMAN_SUCCESS once the job has been dealt with.  This includes
 * the paths where a send reported GEARMAN_LOST_CONNECTION, which only break
 * out to the cleanup at the end.  Otherwise returns the status of the step
 * that failed.
 */
00784 gearman_return_t gearman_worker_work(gearman_worker_st *worker)
00785 {
00786   gearman_job_st *check;
00787   gearman_return_t ret;
00788 
00789   switch (worker->work_state)
00790   {
00791   case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
00792     check= gearman_worker_grab_job(worker, &(worker->work_job), &ret);
00793     (void)check; // @todo test this good values
00794 
      /* Any non-success status from grabbing is handed to the caller;
         work_state is left unchanged. */
00795     if (ret != GEARMAN_SUCCESS)
00796       return ret;
00797 
      /* Find the registered entry whose name equals the job's function name. */
00798     for (worker->work_function= worker->function_list;
00799          worker->work_function != NULL;
00800          worker->work_function= worker->work_function->next)
00801     {
00802       if (!strcmp(gearman_job_function_name(&(worker->work_job)),
00803                   worker->work_function->function_name))
00804       {
00805         break;
00806       }
00807     }
00808 
00809     if (worker->work_function == NULL)
00810     {
00811       gearman_job_free(&(worker->work_job));
00812       gearman_universal_set_error((&worker->universal), "gearman_worker_work",
00813                         "function not found");
00814       return GEARMAN_INVALID_FUNCTION_NAME;
00815     }
00816 
      /* Entries added through gearman_worker_register() have no callback. */
00817     if (worker->work_function->worker_fn == NULL)
00818     {
00819       gearman_job_free(&(worker->work_job));
00820       gearman_universal_set_error((&worker->universal), "gearman_worker_work",
00821                         "no callback function supplied");
00822       return GEARMAN_INVALID_FUNCTION_NAME;
00823     }
00824 
00825     worker->options.work_job_in_use= true;
00826     worker->work_result_size= 0;
00827 
    /* fall through */
00828   case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
00829     worker->work_result= worker->work_function->worker_fn(&(worker->work_job),
00830                                          (void *)worker->work_function->context,
00831                                          &(worker->work_result_size), &ret);
00832     if (ret == GEARMAN_WORK_FAIL)
00833     {
00834       ret= gearman_job_send_fail(&(worker->work_job));
00835       if (ret != GEARMAN_SUCCESS)
00836       {
00837         if (ret == GEARMAN_LOST_CONNECTION)
00838           break;
00839 
          /* Retry only the failure notification on the next call. */
00840         worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
00841         return ret;
00842       }
00843 
00844       break;
00845     }
00846 
00847     if (ret != GEARMAN_SUCCESS)
00848     {
00849       if (ret == GEARMAN_LOST_CONNECTION)
00850         break;
00851 
        /* The next call re-enters at the callback, i.e. worker_fn is
           invoked again for the same job. */
00852       worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION;
00853       return ret;
00854     }
00855 
    /* fall through */
00856   case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
00857     ret= gearman_job_send_complete(&(worker->work_job), worker->work_result,
00858                               worker->work_result_size);
      /* On IO_WAIT the result is kept so the send can be repeated. */
00859     if (ret == GEARMAN_IO_WAIT)
00860     {
00861       worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
00862       return ret;
00863     }
00864 
      /* Release the result with the installed free function, or free(). */
00865     if (worker->work_result != NULL)
00866     {
00867       if ((&worker->universal)->workload_free_fn == NULL)
00868         free(worker->work_result);
00869       else
00870       {
00871         (&worker->universal)->workload_free_fn(worker->work_result,
00872                                           (&worker->universal)->workload_free_context);
00873       }
00874       worker->work_result= NULL;
00875     }
00876 
00877     if (ret != GEARMAN_SUCCESS)
00878     {
00879       if (ret == GEARMAN_LOST_CONNECTION)
00880         break;
00881 
00882       return ret;
00883     }
00884 
00885     break;
00886 
00887   case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
00888     ret= gearman_job_send_fail(&(worker->work_job));
00889     if (ret != GEARMAN_SUCCESS)
00890     {
00891       if (ret == GEARMAN_LOST_CONNECTION)
00892         break;
00893 
00894       return ret;
00895     }
00896 
00897    break;
00898 
00899   default:
00900     gearman_universal_set_error((&worker->universal), "gearman_worker_work",
00901                       "unknown state: %u", worker->work_state);
00902     return GEARMAN_UNKNOWN_STATE;
00903   }
00904 
    /* Common cleanup: drop the job and start with a fresh grab next time. */
00905   gearman_job_free(&(worker->work_job));
00906   worker->options.work_job_in_use= false;
00907   worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
00908 
00909   return GEARMAN_SUCCESS;
00910 }
00911 
00912 gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
00913                                      const void *workload,
00914                                      size_t workload_size)
00915 {
00916   return gearman_echo((&worker->universal), workload, workload_size);
00917 }
00918 
00919 /*
00920  * Static Definitions
00921  */
00922 
00923 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone)
00924 {
00925   if (worker == NULL)
00926   {
00927     worker= malloc(sizeof(gearman_worker_st));
00928     if (worker == NULL)
00929       return NULL;
00930 
00931     worker->options.allocated= true;
00932   }
00933   else
00934   {
00935     worker->options.allocated= false;
00936   }
00937 
00938   worker->options.non_blocking= false;
00939   worker->options.packet_init= false;
00940   worker->options.grab_job_in_use= false;
00941   worker->options.pre_sleep_in_use= false;
00942   worker->options.work_job_in_use= false;
00943   worker->options.change= false;
00944   worker->options.grab_uniq= false;
00945   worker->options.timeout_return= false;
00946 
00947   worker->state= 0;
00948   worker->work_state= 0;
00949   worker->function_count= 0;
00950   worker->job_count= 0;
00951   worker->work_result_size= 0;
00952   worker->con= NULL;
00953   worker->job= NULL;
00954   worker->job_list= NULL;
00955   worker->function= NULL;
00956   worker->function_list= NULL;
00957   worker->work_function= NULL;
00958   worker->work_result= NULL;
00959 
00960   if (! is_clone)
00961   {
00962     gearman_universal_st *check;
00963 
00964     check= gearman_universal_create(&worker->universal, NULL);
00965     if (check == NULL)
00966     {
00967       gearman_worker_free(worker);
00968       return NULL;
00969     }
00970 
00971     gearman_universal_set_timeout((&worker->universal), GEARMAN_WORKER_WAIT_TIMEOUT);
00972   }
00973 
00974   return worker;
00975 }
00976 
00977 static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
00978 {
00979   gearman_return_t ret;
00980 
00981   ret= gearman_packet_create_args((&worker->universal), &(worker->grab_job),
00982                                   GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
00983                                   NULL, NULL, 0);
00984   if (ret != GEARMAN_SUCCESS)
00985     return ret;
00986 
00987   ret= gearman_packet_create_args((&worker->universal), &(worker->pre_sleep),
00988                                   GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
00989                                   NULL, NULL, 0);
00990   if (ret != GEARMAN_SUCCESS)
00991   {
00992     gearman_packet_free(&(worker->grab_job));
00993     return ret;
00994   }
00995 
00996   worker->options.packet_init= true;
00997 
00998   return GEARMAN_SUCCESS;
00999 }
01000 
01001 static gearman_return_t _worker_add_server(const char *host, in_port_t port,
01002                                            void *context)
01003 {
01004   return gearman_worker_add_server((gearman_worker_st *)context, host, port);
01005 }
01006 
01007 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
01008                                                 const char *function_name,
01009                                                 size_t function_length,
01010                                                 uint32_t timeout,
01011                                                 gearman_worker_fn *worker_fn,
01012                                                 void *context)
01013 {
01014   struct _worker_function_st *function;
01015   gearman_return_t ret;
01016   char timeout_buffer[11];
01017   const void *args[2];
01018   size_t args_size[2];
01019 
01020   function= malloc(sizeof(struct _worker_function_st));
01021   if (function == NULL)
01022   {
01023     gearman_universal_set_error((&worker->universal), "_worker_function_create", "malloc");
01024     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
01025   }
01026 
01027   function->options.packet_in_use= true;
01028   function->options.change= true;
01029   function->options.remove= false;
01030 
01031   function->function_name= strdup(function_name);
01032   function->function_length= function_length;
01033   if (function->function_name == NULL)
01034   {
01035     free(function);
01036     gearman_universal_set_error((&worker->universal), "gearman_worker_add_function", "strdup");
01037     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
01038   }
01039 
01040   function->worker_fn= worker_fn;
01041   function->context= context;
01042 
01043   if (timeout > 0)
01044   {
01045     snprintf(timeout_buffer, 11, "%u", timeout);
01046     args[0]= function_name;
01047     args_size[0]= strlen(function_name) + 1;
01048     args[1]= timeout_buffer;
01049     args_size[1]= strlen(timeout_buffer);
01050     ret= gearman_packet_create_args((&worker->universal), &(function->packet),
01051                                     GEARMAN_MAGIC_REQUEST,
01052                                     GEARMAN_COMMAND_CAN_DO_TIMEOUT,
01053                                     args, args_size, 2);
01054   }
01055   else
01056   {
01057     args[0]= function->function_name;
01058     args_size[0]= function->function_length= function_length;
01059     ret= gearman_packet_create_args((&worker->universal), &(function->packet),
01060                                     GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
01061                                     args, args_size, 1);
01062   }
01063   if (ret != GEARMAN_SUCCESS)
01064   {
01065     free(function->function_name);
01066     free(function);
01067     return ret;
01068   }
01069 
01070   if (worker->function_list != NULL)
01071     worker->function_list->prev= function;
01072   function->next= worker->function_list;
01073   function->prev= NULL;
01074   worker->function_list= function;
01075   worker->function_count++;
01076 
01077   worker->options.change= true;
01078 
01079   return GEARMAN_SUCCESS;
01080 }
01081 
01082 static void _worker_function_free(gearman_worker_st *worker,
01083                                   struct _worker_function_st *function)
01084 {
01085   if (worker->function_list == function)
01086     worker->function_list= function->next;
01087   if (function->prev != NULL)
01088     function->prev->next= function->next;
01089   if (function->next != NULL)
01090     function->next->prev= function->prev;
01091   worker->function_count--;
01092 
01093   if (function->options.packet_in_use)
01094     gearman_packet_free(&(function->packet));
01095 
01096   free(function->function_name);
01097   free(function);
01098 }