Gearman Developer Documentation

libgearman/client.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00025 static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone);
00026 
00030 static gearman_return_t _client_add_server(const char *host, in_port_t port,
00031                                            void *context);
00032 
00036 static gearman_task_st *_client_add_task(gearman_client_st *client,
00037                                          gearman_task_st *task,
00038                                          void *context,
00039                                          gearman_command_t command,
00040                                          const char *function_name,
00041                                          size_t function_name_length,
00042                                          const char *unique,
00043                                          size_t unique_name_length,
00044                                          const void *workload,
00045                                          size_t workload_size,
00046                                          gearman_return_t *ret_ptr);
00047 
00051 static gearman_return_t _client_run_task(gearman_client_st *client,
00052                                          gearman_task_st *task);
00053 
00057 static void *_client_do(gearman_client_st *client, gearman_command_t command,
00058                         const char *function_name, size_t functiona_name_length,
00059                         const char *unique, size_t unique_length,
00060                         const void *workload, size_t workload_size,
00061                         size_t *result_size, gearman_return_t *ret_ptr);
00062 
00066 static gearman_return_t _client_do_background(gearman_client_st *client,
00067                                               gearman_command_t command,
00068                                               const char *function_name,
00069                                               size_t functiona_name_length,
00070                                               const char *unique,
00071                                               size_t unique_length,
00072                                               const void *workload,
00073                                               size_t workload_size,
00074                                               char *job_handle);
00075 
00079 static gearman_return_t _client_do_data(gearman_task_st *task);
00080 
00084 static gearman_return_t _client_do_status(gearman_task_st *task);
00085 
00089 static gearman_return_t _client_do_fail(gearman_task_st *task);
00090 
00093 /*
00094  * Public Definitions
00095  */
00096 
00097 gearman_client_st *gearman_client_create(gearman_client_st *client)
00098 {
00099   return _client_allocate(client, false);
00100 }
00101 
00102 gearman_client_st *gearman_client_clone(gearman_client_st *client,
00103                                         const gearman_client_st *from)
00104 {
00105   gearman_universal_st *check;
00106 
00107   if (! from)
00108   {
00109     return _client_allocate(client, false);
00110   }
00111 
00112   client= _client_allocate(client, true);
00113 
00114   if (client == NULL)
00115   {
00116     return client;
00117   }
00118 
00119   client->options.non_blocking= from->options.non_blocking;
00120   client->options.task_in_use= from->options.task_in_use;
00121   client->options.unbuffered_result= from->options.unbuffered_result;
00122   client->options.no_new= from->options.no_new;
00123   client->options.free_tasks= from->options.free_tasks;
00124 
00125   check= gearman_universal_clone((&client->universal), &(from->universal));
00126   if (! check)
00127   {
00128     gearman_client_free(client);
00129     return NULL;
00130   }
00131 
00132   return client;
00133 }
00134 
00135 void gearman_client_free(gearman_client_st *client)
00136 {
00137   if (client->options.task_in_use)
00138     gearman_task_free(&(client->do_task));
00139 
00140   gearman_client_task_free_all(client);
00141 
00142   gearman_universal_free(&client->universal);
00143 
00144   if (client->options.allocated)
00145     free(client);
00146 }
00147 
00148 const char *gearman_client_error(const gearman_client_st *client)
00149 {
00150   return gearman_universal_error(&client->universal);
00151 }
00152 
00153 int gearman_client_errno(const gearman_client_st *client)
00154 {
00155   return gearman_universal_errno(&client->universal);
00156 }
00157 
00158 gearman_client_options_t gearman_client_options(const gearman_client_st *client)
00159 {
00160   gearman_client_options_t options;
00161   memset(&options, 0, sizeof(gearman_client_options_t));
00162 
00163   if (client->options.allocated)
00164     options|= GEARMAN_CLIENT_ALLOCATED;
00165   if (client->options.non_blocking)
00166     options|= GEARMAN_CLIENT_NON_BLOCKING;
00167   if (client->options.task_in_use)
00168     options|= GEARMAN_CLIENT_TASK_IN_USE;
00169   if (client->options.unbuffered_result)
00170     options|= GEARMAN_CLIENT_UNBUFFERED_RESULT;
00171   if (client->options.no_new)
00172     options|= GEARMAN_CLIENT_NO_NEW;
00173   if (client->options.free_tasks)
00174     options|= GEARMAN_CLIENT_FREE_TASKS;
00175 
00176   return options;
00177 }
00178 
00179 void gearman_client_set_options(gearman_client_st *client,
00180                                 gearman_client_options_t options)
00181 {
00182   gearman_client_options_t usable_options[]= {
00183     GEARMAN_CLIENT_NON_BLOCKING,
00184     GEARMAN_CLIENT_UNBUFFERED_RESULT,
00185     GEARMAN_CLIENT_FREE_TASKS,
00186     GEARMAN_CLIENT_MAX
00187   };
00188 
00189   gearman_client_options_t *ptr;
00190 
00191 
00192   for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
00193   {
00194     if (options & *ptr)
00195     {
00196       gearman_client_add_options(client, *ptr);
00197     }
00198     else
00199     {
00200       gearman_client_remove_options(client, *ptr);
00201     }
00202   }
00203 }
00204 
00205 void gearman_client_add_options(gearman_client_st *client,
00206                                 gearman_client_options_t options)
00207 {
00208   if (options & GEARMAN_CLIENT_NON_BLOCKING)
00209   {
00210     gearman_universal_add_options(&client->universal, GEARMAN_NON_BLOCKING);
00211     client->options.non_blocking= true;
00212   }
00213 
00214   if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
00215   {
00216     client->options.unbuffered_result= true;
00217   }
00218 
00219   if (options & GEARMAN_CLIENT_FREE_TASKS)
00220   {
00221     client->options.free_tasks= true;
00222   }
00223 }
00224 
00225 void gearman_client_remove_options(gearman_client_st *client,
00226                                    gearman_client_options_t options)
00227 {
00228   if (options & GEARMAN_CLIENT_NON_BLOCKING)
00229   {
00230     gearman_universal_remove_options(&client->universal, GEARMAN_NON_BLOCKING);
00231     client->options.non_blocking= false;
00232   }
00233 
00234   if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
00235   {
00236     client->options.unbuffered_result= false;
00237   }
00238 
00239   if (options & GEARMAN_CLIENT_FREE_TASKS)
00240   {
00241     client->options.free_tasks= false;
00242   }
00243 }
00244 
00245 int gearman_client_timeout(gearman_client_st *client)
00246 {
00247   return gearman_universal_timeout(&client->universal);
00248 }
00249 
00250 void gearman_client_set_timeout(gearman_client_st *client, int timeout)
00251 {
00252   gearman_universal_set_timeout(&client->universal, timeout);
00253 }
00254 
00255 void *gearman_client_context(const gearman_client_st *client)
00256 {
00257   return (void *)(client->context);
00258 }
00259 
00260 void gearman_client_set_context(gearman_client_st *client, void *context)
00261 {
00262   client->context= context;
00263 }
00264 
00265 void gearman_client_set_log_fn(gearman_client_st *client,
00266                                gearman_log_fn *function, void *context,
00267                                gearman_verbose_t verbose)
00268 {
00269   gearman_set_log_fn(&client->universal, function, context, verbose);
00270 }
00271 
00272 void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
00273                                            gearman_malloc_fn *function,
00274                                            void *context)
00275 {
00276   gearman_set_workload_malloc_fn(&client->universal, function, context);
00277 }
00278 
00279 void gearman_client_set_workload_free_fn(gearman_client_st *client,
00280                                          gearman_free_fn *function,
00281                                          void *context)
00282 {
00283   gearman_set_workload_free_fn(&client->universal, function, context);
00284 }
00285 
00286 gearman_return_t gearman_client_add_server(gearman_client_st *client,
00287                                            const char *host, in_port_t port)
00288 {
00289   if (gearman_connection_create_args(&client->universal, NULL, host, port) == NULL)
00290     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00291 
00292   return GEARMAN_SUCCESS;
00293 }
00294 
00295 gearman_return_t gearman_client_add_servers(gearman_client_st *client,
00296                                             const char *servers)
00297 {
00298   return gearman_parse_servers(servers, _client_add_server, client);
00299 }
00300 
00301 void gearman_client_remove_servers(gearman_client_st *client)
00302 {
00303   gearman_free_all_cons(&client->universal);
00304 }
00305 
00306 gearman_return_t gearman_client_wait(gearman_client_st *client)
00307 {
00308   return gearman_wait(&client->universal);
00309 }
00310 
00311 void *gearman_client_do(gearman_client_st *client, const char *function_name,
00312                         const char *unique, const void *workload,
00313                         size_t workload_size, size_t *result_size,
00314                         gearman_return_t *ret_ptr)
00315 {
00316   return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
00317                     function_name, strlen(function_name),
00318                     unique, unique ? strlen(unique) : 0,
00319                     workload, workload_size, result_size, ret_ptr);
00320 }
00321 
00322 void *gearman_client_do_high(gearman_client_st *client,
00323                              const char *function_name, const char *unique,
00324                              const void *workload, size_t workload_size,
00325                              size_t *result_size, gearman_return_t *ret_ptr)
00326 {
00327   return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
00328                     function_name, strlen(function_name),
00329                     unique, unique ? strlen(unique) : 0,
00330                     workload, workload_size, result_size, ret_ptr);
00331 }
00332 
00333 void *gearman_client_do_low(gearman_client_st *client,
00334                             const char *function_name, const char *unique,
00335                             const void *workload, size_t workload_size,
00336                             size_t *result_size, gearman_return_t *ret_ptr)
00337 {
00338   return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
00339                     function_name, strlen(function_name),
00340                     unique, unique ? strlen(unique) : 0,
00341                     workload, workload_size, result_size, ret_ptr);
00342 }
00343 
00344 const char *gearman_client_do_job_handle(const gearman_client_st *client)
00345 {
00346   return client->do_task.job_handle;
00347 }
00348 
00349 void gearman_client_do_status(gearman_client_st *client, uint32_t *numerator,
00350                               uint32_t *denominator)
00351 {
00352   if (numerator != NULL)
00353     *numerator= client->do_task.numerator;
00354 
00355   if (denominator != NULL)
00356     *denominator= client->do_task.denominator;
00357 }
00358 
00359 gearman_return_t gearman_client_do_background(gearman_client_st *client,
00360                                               const char *function_name,
00361                                               const char *unique,
00362                                               const void *workload,
00363                                               size_t workload_size,
00364                                               char *job_handle)
00365 {
00366   return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
00367                                function_name, strlen(function_name),
00368                                unique, unique ? strlen(unique) : 0,
00369                                workload, workload_size,
00370                                job_handle);
00371 }
00372 
00373 gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
00374                                                    const char *function_name,
00375                                                    const char *unique,
00376                                                    const void *workload,
00377                                                    size_t workload_size,
00378                                                    char *job_handle)
00379 {
00380   return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
00381                                function_name, strlen(function_name),
00382                                unique, unique ? strlen(unique) : 0,
00383                                workload, workload_size,
00384                                job_handle);
00385 }
00386 
00387 gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
00388                                                   const char *function_name,
00389                                                   const char *unique,
00390                                                   const void *workload,
00391                                                   size_t workload_size,
00392                                                   char *job_handle)
00393 {
00394   return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
00395                                function_name, strlen(function_name),
00396                                unique, unique ? strlen(unique) : 0,
00397                                workload, workload_size,
00398                                job_handle);
00399 }
00400 
00401 gearman_return_t gearman_client_job_status(gearman_client_st *client,
00402                                            const char *job_handle,
00403                                            bool *is_known, bool *is_running,
00404                                            uint32_t *numerator,
00405                                            uint32_t *denominator)
00406 {
00407   gearman_return_t ret;
00408 
00409   if (! (client->options.task_in_use))
00410   {
00411     (void)gearman_client_add_task_status(client, &(client->do_task), client,
00412                                          job_handle, &ret);
00413     if (ret != GEARMAN_SUCCESS)
00414       return ret;
00415 
00416     client->options.task_in_use= true;
00417   }
00418 
00419   gearman_client_clear_fn(client);
00420 
00421   ret= gearman_client_run_tasks(client);
00422   if (ret != GEARMAN_IO_WAIT)
00423   {
00424     if (is_known != NULL)
00425       *is_known= client->do_task.options.is_known;
00426     if (is_running != NULL)
00427       *is_running= client->do_task.options.is_running;
00428     if (numerator != NULL)
00429       *numerator= client->do_task.numerator;
00430     if (denominator != NULL)
00431       *denominator= client->do_task.denominator;
00432 
00433     gearman_task_free(&(client->do_task));
00434     client->options.task_in_use= false;
00435   }
00436 
00437   return ret;
00438 }
00439 
00440 gearman_return_t gearman_client_echo(gearman_client_st *client,
00441                                      const void *workload,
00442                                      size_t workload_size)
00443 {
00444   return gearman_echo(&client->universal, workload, workload_size);
00445 }
00446 
00447 void gearman_client_task_free_all(gearman_client_st *client)
00448 {
00449   while (client->task_list != NULL)
00450     gearman_task_free(client->task_list);
00451 }
00452 
00453 void gearman_client_set_task_context_free_fn(gearman_client_st *client,
00454                                         gearman_task_context_free_fn *function)
00455 {
00456   client->task_context_free_fn= function;
00457 }
00458 
00459 gearman_task_st *gearman_client_add_task(gearman_client_st *client,
00460                                          gearman_task_st *task,
00461                                          void *context,
00462                                          const char *function_name,
00463                                          const char *unique,
00464                                          const void *workload,
00465                                          size_t workload_size,
00466                                          gearman_return_t *ret_ptr)
00467 {
00468   return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB,
00469                           function_name, strlen(function_name),
00470                           unique, unique ? strlen(unique) : 0,
00471                           workload, workload_size,
00472                           ret_ptr);
00473 }
00474 
00475 gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
00476                                               gearman_task_st *task,
00477                                               void *context,
00478                                               const char *function_name,
00479                                               const char *unique,
00480                                               const void *workload,
00481                                               size_t workload_size,
00482                                               gearman_return_t *ret_ptr)
00483 {
00484   return _client_add_task(client, task, context,
00485                           GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
00486                           function_name, strlen(function_name),
00487                           unique, unique ? strlen(unique) : 0,
00488                           workload, workload_size, ret_ptr);
00489 }
00490 
00491 gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
00492                                              gearman_task_st *task,
00493                                              void *context,
00494                                              const char *function_name,
00495                                              const char *unique,
00496                                              const void *workload,
00497                                              size_t workload_size,
00498                                              gearman_return_t *ret_ptr)
00499 {
00500   return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
00501                           function_name, strlen(function_name),
00502                           unique, unique ? strlen(unique) : 0,
00503                           workload, workload_size,
00504                           ret_ptr);
00505 }
00506 
00507 gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
00508                                                     gearman_task_st *task,
00509                                                     void *context,
00510                                                     const char *function_name,
00511                                                     const char *unique,
00512                                                     const void *workload,
00513                                                     size_t workload_size,
00514                                                     gearman_return_t *ret_ptr)
00515 {
00516   return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
00517                           function_name, strlen(function_name),
00518                           unique, unique ? strlen(unique) : 0,
00519                           workload, workload_size,
00520                           ret_ptr);
00521 }
00522 
00523 gearman_task_st *
00524 gearman_client_add_task_high_background(gearman_client_st *client,
00525                                         gearman_task_st *task,
00526                                         void *context,
00527                                         const char *function_name,
00528                                         const char *unique,
00529                                         const void *workload,
00530                                         size_t workload_size,
00531                                         gearman_return_t *ret_ptr)
00532 {
00533   return _client_add_task(client, task, context,
00534                           GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
00535                           function_name, strlen(function_name),
00536                           unique, unique ? strlen(unique) : 0,
00537                           workload, workload_size, ret_ptr);
00538 }
00539 
00540 gearman_task_st *
00541 gearman_client_add_task_low_background(gearman_client_st *client,
00542                                        gearman_task_st *task,
00543                                        void *context,
00544                                        const char *function_name,
00545                                        const char *unique,
00546                                        const void *workload,
00547                                        size_t workload_size,
00548                                        gearman_return_t *ret_ptr)
00549 {
00550   return _client_add_task(client, task, context,
00551                           GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
00552                           function_name, strlen(function_name),
00553                           unique, unique ? strlen(unique) : 0,
00554                           workload, workload_size, ret_ptr);
00555 }
00556 
00557 gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
00558                                                 gearman_task_st *task,
00559                                                 void *context,
00560                                                 const char *job_handle,
00561                                                 gearman_return_t *ret_ptr)
00562 {
00563   const void *args[1];
00564   size_t args_size[1];
00565 
00566   task= gearman_task_create(client, task);
00567   if (task == NULL)
00568   {
00569     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00570     return NULL;
00571   }
00572 
00573   task->context= context;
00574   snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
00575 
00576   args[0]= job_handle;
00577   args_size[0]= strlen(job_handle);
00578   *ret_ptr= gearman_packet_create_args(&client->universal, &(task->send),
00579                                        GEARMAN_MAGIC_REQUEST,
00580                                        GEARMAN_COMMAND_GET_STATUS,
00581                                        args, args_size, 1);
00582   if (*ret_ptr == GEARMAN_SUCCESS)
00583   {
00584     client->new_tasks++;
00585     client->running_tasks++;
00586     task->options.send_in_use= true;
00587   }
00588 
00589   return task;
00590 }
00591 
00592 void gearman_client_set_workload_fn(gearman_client_st *client,
00593                                     gearman_workload_fn *function)
00594 {
00595   client->workload_fn= function;
00596 }
00597 
00598 void gearman_client_set_created_fn(gearman_client_st *client,
00599                                    gearman_created_fn *function)
00600 {
00601   client->created_fn= function;
00602 }
00603 
00604 void gearman_client_set_data_fn(gearman_client_st *client,
00605                                 gearman_data_fn *function)
00606 {
00607   client->data_fn= function;
00608 }
00609 
00610 void gearman_client_set_warning_fn(gearman_client_st *client,
00611                                    gearman_warning_fn *function)
00612 {
00613   client->warning_fn= function;
00614 }
00615 
00616 void gearman_client_set_status_fn(gearman_client_st *client,
00617                                   gearman_universal_status_fn *function)
00618 {
00619   client->status_fn= function;
00620 }
00621 
00622 void gearman_client_set_complete_fn(gearman_client_st *client,
00623                                     gearman_complete_fn *function)
00624 {
00625   client->complete_fn= function;
00626 }
00627 
00628 void gearman_client_set_exception_fn(gearman_client_st *client,
00629                                      gearman_exception_fn *function)
00630 {
00631   client->exception_fn= function;
00632 }
00633 
00634 void gearman_client_set_fail_fn(gearman_client_st *client,
00635                                 gearman_fail_fn *function)
00636 {
00637   client->fail_fn= function;
00638 }
00639 
00640 void gearman_client_clear_fn(gearman_client_st *client)
00641 {
00642   client->workload_fn= NULL;
00643   client->created_fn= NULL;
00644   client->data_fn= NULL;
00645   client->warning_fn= NULL;
00646   client->status_fn= NULL;
00647   client->complete_fn= NULL;
00648   client->exception_fn= NULL;
00649   client->fail_fn= NULL;
00650 }
00651 
00652 static inline void _push_non_blocking(gearman_client_st *client)
00653 {
00654   client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
00655   client->universal.options.non_blocking= true;
00656 }
00657 
00658 static inline void _pop_non_blocking(gearman_client_st *client)
00659 {
00660   client->universal.options.non_blocking= client->options.non_blocking;
00661   assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
00662 }
00663 
00664 static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
00665 {
00666   gearman_return_t ret;
00667 
00668   switch(client->state)
00669   {
00670   case GEARMAN_CLIENT_STATE_IDLE:
00671     while (1)
00672     {
00673       /* Start any new tasks. */
00674       if (client->new_tasks > 0 && ! (client->options.no_new))
00675       {
00676         for (client->task= client->task_list; client->task != NULL;
00677              client->task= client->task->next)
00678         {
00679           if (client->task->state != GEARMAN_TASK_STATE_NEW)
00680             continue;
00681 
00682   case GEARMAN_CLIENT_STATE_NEW:
00683           ret= _client_run_task(client, client->task);
00684           if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00685           {
00686             client->state= GEARMAN_CLIENT_STATE_NEW;
00687 
00688             return ret;
00689           }
00690         }
00691 
00692         if (client->new_tasks == 0)
00693         {
00694           ret= gearman_flush_all(&client->universal);
00695           if (ret != GEARMAN_SUCCESS)
00696           {
00697             return ret;
00698           }
00699         }
00700       }
00701 
00702       /* See if there are any connections ready for I/O. */
00703       while ((client->con= gearman_ready(&client->universal)) != NULL)
00704       {
00705         if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
00706         {
00707           /* Socket is ready for writing, continue submitting jobs. */
00708           for (client->task= client->task_list; client->task != NULL;
00709                client->task= client->task->next)
00710           {
00711             if (client->task->con != client->con ||
00712                 (client->task->state != GEARMAN_TASK_STATE_SUBMIT &&
00713                  client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
00714             {
00715               continue;
00716             }
00717 
00718   case GEARMAN_CLIENT_STATE_SUBMIT:
00719             ret= _client_run_task(client, client->task);
00720             if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00721             {
00722               client->state= GEARMAN_CLIENT_STATE_SUBMIT;
00723               return ret;
00724             }
00725           }
00726         }
00727 
00728         if (! (client->con->revents & POLLIN))
00729           continue;
00730 
00731         /* Socket is ready for reading. */
00732         while (1)
00733         {
00734           /* Read packet on connection and find which task it belongs to. */
00735           if (client->options.unbuffered_result)
00736           {
00737             /* If client is handling the data read, make sure it's complete. */
00738             if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
00739             {
00740               for (client->task= client->task_list; client->task != NULL;
00741                    client->task= client->task->next)
00742               {
00743                 if (client->task->con == client->con &&
00744                     (client->task->state == GEARMAN_TASK_STATE_DATA ||
00745                      client->task->state == GEARMAN_TASK_STATE_COMPLETE))
00746                 {
00747                   break;
00748                 }
00749               }
00750 
00751               assert(client->task != NULL);
00752             }
00753             else
00754             {
00755               /* Read the next packet, without buffering the data part. */
00756               client->task= NULL;
00757               (void)gearman_connection_recv(client->con, &(client->con->packet), &ret,
00758                                      false);
00759             }
00760           }
00761           else
00762           {
00763             /* Read the next packet, buffering the data part. */
00764             client->task= NULL;
00765             (void)gearman_connection_recv(client->con, &(client->con->packet), &ret, true);
00766           }
00767 
00768           if (client->task == NULL)
00769           {
00770             /* Check the return of the gearman_connection_recv() calls above. */
00771             if (ret != GEARMAN_SUCCESS)
00772             {
00773               if (ret == GEARMAN_IO_WAIT)
00774                 break;
00775 
00776               client->state= GEARMAN_CLIENT_STATE_IDLE;
00777               return ret;
00778             }
00779 
00780             client->con->options.packet_in_use= true;
00781 
00782             /* We have a packet, see which task it belongs to. */
00783             for (client->task= client->task_list; client->task != NULL;
00784                  client->task= client->task->next)
00785             {
00786               if (client->task->con != client->con)
00787                 continue;
00788 
00789               if (client->con->packet.command == GEARMAN_COMMAND_JOB_CREATED)
00790               {
00791                 if (client->task->created_id != client->con->created_id)
00792                   continue;
00793 
00794                 /* New job created, drop through below and notify task. */
00795                 client->con->created_id++;
00796               }
00797               else if (client->con->packet.command == GEARMAN_COMMAND_ERROR)
00798               {
00799                 gearman_universal_set_error(&client->universal, "gearman_client_run_tasks",
00800                                   "%s:%.*s",
00801                                   (char *)(client->con->packet.arg[0]),
00802                                   (int)(client->con->packet.arg_size[1]),
00803                                   (char *)(client->con->packet.arg[1]));
00804 
00805                 return GEARMAN_SERVER_ERROR;
00806               }
00807               else if (strncmp(client->task->job_handle,
00808                                (char *)(client->con->packet.arg[0]),
00809                                client->con->packet.arg_size[0]) ||
00810                        (client->con->packet.command != GEARMAN_COMMAND_WORK_FAIL &&
00811                         strlen(client->task->job_handle) != client->con->packet.arg_size[0] - 1) ||
00812                        (client->con->packet.command == GEARMAN_COMMAND_WORK_FAIL &&
00813                         strlen(client->task->job_handle) != client->con->packet.arg_size[0]))
00814               {
00815                 continue;
00816               }
00817 
00818               /* Else, we have a matching result packet of some kind. */
00819 
00820               break;
00821             }
00822 
00823             if (client->task == NULL)
00824             {
00825               /* The client has stopped waiting for the response, ignore it. */
00826               gearman_packet_free(&(client->con->packet));
00827               client->con->options.packet_in_use= false;
00828               continue;
00829             }
00830 
00831             client->task->recv= &(client->con->packet);
00832           }
00833 
00834   case GEARMAN_CLIENT_STATE_PACKET:
00835           /* Let task process job created or result packet. */
00836           ret= _client_run_task(client, client->task);
00837           if (ret == GEARMAN_IO_WAIT)
00838             break;
00839           if (ret != GEARMAN_SUCCESS)
00840           {
00841             client->state= GEARMAN_CLIENT_STATE_PACKET;
00842             return ret;
00843           }
00844 
00845           /* Clean up the packet. */
00846           gearman_packet_free(&(client->con->packet));
00847           client->con->options.packet_in_use= false;
00848 
00849           /* If all tasks are done, return. */
00850           if (client->running_tasks == 0)
00851             break;
00852         }
00853       }
00854 
00855       /* If all tasks are done, return. */
00856       if (client->running_tasks == 0)
00857         break;
00858 
00859       if (client->new_tasks > 0 && ! (client->options.no_new))
00860         continue;
00861 
00862       if (client->options.non_blocking)
00863       {
00864         /* Let the caller wait for activity. */
00865         client->state= GEARMAN_CLIENT_STATE_IDLE;
00866 
00867         return GEARMAN_IO_WAIT;
00868       }
00869 
00870       /* Wait for activity on one of the connections. */
00871       ret= gearman_wait(&client->universal);
00872       if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00873       {
00874         client->state= GEARMAN_CLIENT_STATE_IDLE;
00875 
00876         return ret;
00877       }
00878     }
00879 
00880     break;
00881 
00882   default:
00883     gearman_universal_set_error(&client->universal, "gearman_client_run_tasks",
00884                                 "unknown state: %u", client->state);
00885 
00886     return GEARMAN_UNKNOWN_STATE;
00887   }
00888 
00889   client->state= GEARMAN_CLIENT_STATE_IDLE;
00890 
00891   return GEARMAN_SUCCESS;
00892 }
00893 
00894 gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
00895 {
00896   gearman_return_t rc;
00897 
00898   _push_non_blocking(client);
00899 
00900   rc= _client_run_tasks(client);
00901 
00902   _pop_non_blocking(client);
00903 
00904   return rc;
00905 }
00906 
00907 /*
00908  * Static Definitions
00909  */
00910 
00911 static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
00912 {
00913   if (client == NULL)
00914   {
00915     client= malloc(sizeof(gearman_client_st));
00916     if (client == NULL)
00917       return NULL;
00918 
00919     client->options.allocated= true;
00920   }
00921   else
00922   {
00923     client->options.allocated= false;
00924   }
00925 
00926   client->options.non_blocking= false;
00927   client->options.task_in_use= false;
00928   client->options.unbuffered_result= false;
00929   client->options.no_new= false;
00930   client->options.free_tasks= false;
00931 
00932   client->state= 0;
00933   client->do_ret= 0;
00934   client->new_tasks= 0;
00935   client->running_tasks= 0;
00936   client->task_count= 0;
00937   client->do_data_size= 0;
00938   client->context= NULL;
00939   client->con= NULL;
00940   client->task= NULL;
00941   client->task_list= NULL;
00942   client->task_context_free_fn= NULL;
00943   client->do_data= NULL;
00944   client->workload_fn= NULL;
00945   client->created_fn= NULL;
00946   client->data_fn= NULL;
00947   client->warning_fn= NULL;
00948   client->status_fn= NULL;
00949   client->complete_fn= NULL;
00950   client->exception_fn= NULL;
00951   client->fail_fn= NULL;
00952 
00953   if (! is_clone)
00954   {
00955     gearman_universal_st *check;
00956 
00957     check= gearman_universal_create(&client->universal, NULL);
00958     if (check == NULL)
00959     {
00960       gearman_client_free(client);
00961       return NULL;
00962     }
00963   }
00964 
00965   return client;
00966 }
00967 
00968 static gearman_return_t _client_add_server(const char *host, in_port_t port,
00969                                            void *context)
00970 {
00971   return gearman_client_add_server((gearman_client_st *)context, host, port);
00972 }
00973 
00974 static gearman_task_st *_client_add_task(gearman_client_st *client,
00975                                          gearman_task_st *task,
00976                                          void *context,
00977                                          gearman_command_t command,
00978                                          const char *function_name,
00979                                          size_t function_name_length,
00980                                          const char *unique,
00981                                          size_t unique_length,
00982                                          const void *workload,
00983                                          size_t workload_size,
00984                                          gearman_return_t *ret_ptr)
00985 {
00986   uuid_t uuid;
00987   char uuid_string[37];
00988   const void *args[3];
00989   size_t args_size[3];
00990 
00991   task= gearman_task_create(client, task);
00992   if (task == NULL)
00993   {
00994     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00995     return NULL;
00996   }
00997 
00998   task->context= context;
00999 
01000   if (unique == NULL)
01001   {
01002     uuid_generate(uuid);
01003     uuid_unparse(uuid, uuid_string);
01004     uuid_string[36]= 0;
01005     unique= uuid_string;
01006     unique_length= 36; // @note This comes from uuid/uuid.h (which does not define a number)
01007   }
01008 
01013   args[0]= function_name;
01014   args_size[0]= function_name_length + 1;
01015   args[1]= unique;
01016   args_size[1]= unique_length + 1;
01017   args[2]= workload;
01018   args_size[2]= workload_size;
01019 
01020   *ret_ptr= gearman_packet_create_args(&client->universal, &(task->send),
01021                                        GEARMAN_MAGIC_REQUEST, command,
01022                                        args, args_size, 3);
01023   if (*ret_ptr == GEARMAN_SUCCESS)
01024   {
01025     client->new_tasks++;
01026     client->running_tasks++;
01027     task->options.send_in_use= true;
01028   }
01029 
01030   return task;
01031 }
01032 
01033 static gearman_return_t _client_run_task(gearman_client_st *client,
01034                                          gearman_task_st *task)
01035 {
01036   gearman_return_t ret;
01037   char status_buffer[11]; /* Max string size to hold a uint32_t. */
01038   uint8_t x;
01039 
01040   switch(task->state)
01041   {
01042   case GEARMAN_TASK_STATE_NEW:
01043     if (task->client->universal.con_list == NULL)
01044     {
01045       client->new_tasks--;
01046       client->running_tasks--;
01047       gearman_universal_set_error(&client->universal, "_client_run_task",
01048                         "no servers added");
01049       return GEARMAN_NO_SERVERS;
01050     }
01051 
01052     for (task->con= task->client->universal.con_list; task->con != NULL;
01053          task->con= task->con->next)
01054     {
01055       if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
01056         break;
01057     }
01058 
01059     if (task->con == NULL)
01060     {
01061       client->options.no_new= true;
01062       return GEARMAN_IO_WAIT;
01063     }
01064 
01065     client->new_tasks--;
01066 
01067     if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
01068     {
01069       task->created_id= task->con->created_id_next;
01070       task->con->created_id_next++;
01071     }
01072 
01073   case GEARMAN_TASK_STATE_SUBMIT:
01074     while (1)
01075     {
01076       ret= gearman_connection_send(task->con, &(task->send),
01077                             client->new_tasks == 0 ? true : false);
01078       if (ret == GEARMAN_SUCCESS)
01079         break;
01080       else if (ret == GEARMAN_IO_WAIT)
01081       {
01082         task->state= GEARMAN_TASK_STATE_SUBMIT;
01083         return GEARMAN_IO_WAIT;
01084       }
01085       else if (ret != GEARMAN_SUCCESS)
01086       {
01087         /* Increment this since the job submission failed. */
01088         task->con->created_id++;
01089 
01090         if (ret == GEARMAN_COULD_NOT_CONNECT)
01091         {
01092           for (task->con= task->con->next; task->con != NULL;
01093                task->con= task->con->next)
01094           {
01095             if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
01096               break;
01097           }
01098         }
01099         else
01100           task->con= NULL;
01101 
01102         if (task->con == NULL)
01103         {
01104           client->running_tasks--;
01105           return ret;
01106         }
01107 
01108         if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
01109         {
01110           task->created_id= task->con->created_id_next;
01111           task->con->created_id_next++;
01112         }
01113       }
01114     }
01115 
01116     if (task->send.data_size > 0 && task->send.data == NULL)
01117     {
01118       if (client->workload_fn == NULL)
01119       {
01120         gearman_universal_set_error(&client->universal, "_client_run_task",
01121                                     "workload size > 0, but no data pointer or workload_fn was given");
01122         return GEARMAN_NEED_WORKLOAD_FN;
01123       }
01124 
01125   case GEARMAN_TASK_STATE_WORKLOAD:
01126       ret= client->workload_fn(task);
01127       if (ret != GEARMAN_SUCCESS)
01128       {
01129         task->state= GEARMAN_TASK_STATE_WORKLOAD;
01130         return ret;
01131       }
01132     }
01133 
01134     client->options.no_new= false;
01135     task->state= GEARMAN_TASK_STATE_WORK;
01136     return gearman_connection_set_events(task->con, POLLIN);
01137 
01138   case GEARMAN_TASK_STATE_WORK:
01139     if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
01140     {
01141       snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
01142                (uint32_t)(task->recv->arg_size[0]),
01143                (char *)(task->recv->arg[0]));
01144 
01145   case GEARMAN_TASK_STATE_CREATED:
01146       if (client->created_fn != NULL)
01147       {
01148         ret= client->created_fn(task);
01149         if (ret != GEARMAN_SUCCESS)
01150         {
01151           task->state= GEARMAN_TASK_STATE_CREATED;
01152           return ret;
01153         }
01154       }
01155 
01156       if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
01157           task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
01158           task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
01159       {
01160         break;
01161       }
01162     }
01163     else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
01164     {
01165   case GEARMAN_TASK_STATE_DATA:
01166       if (client->data_fn != NULL)
01167       {
01168         ret= client->data_fn(task);
01169         if (ret != GEARMAN_SUCCESS)
01170         {
01171           task->state= GEARMAN_TASK_STATE_DATA;
01172           return ret;
01173         }
01174       }
01175     }
01176     else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
01177     {
01178   case GEARMAN_TASK_STATE_WARNING:
01179       if (client->warning_fn != NULL)
01180       {
01181         ret= client->warning_fn(task);
01182         if (ret != GEARMAN_SUCCESS)
01183         {
01184           task->state= GEARMAN_TASK_STATE_WARNING;
01185           return ret;
01186         }
01187       }
01188     }
01189     else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
01190              task->recv->command == GEARMAN_COMMAND_STATUS_RES)
01191     {
01192       if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
01193       {
01194         if (atoi((char *)task->recv->arg[1]) == 0)
01195           task->options.is_known= false;
01196         else
01197           task->options.is_known= true;
01198 
01199         if (atoi((char *)task->recv->arg[2]) == 0)
01200           task->options.is_running= false;
01201         else
01202           task->options.is_running= true;
01203 
01204         x= 3;
01205       }
01206       else
01207         x= 1;
01208 
01209       task->numerator= (uint32_t)atoi((char *)task->recv->arg[x]);
01210       snprintf(status_buffer, 11, "%.*s",
01211                (uint32_t)(task->recv->arg_size[x + 1]),
01212                (char *)(task->recv->arg[x + 1]));
01213       task->denominator= (uint32_t)atoi(status_buffer);
01214 
01215   case GEARMAN_TASK_STATE_STATUS:
01216       if (client->status_fn != NULL)
01217       {
01218         ret= client->status_fn(task);
01219         if (ret != GEARMAN_SUCCESS)
01220         {
01221           task->state= GEARMAN_TASK_STATE_STATUS;
01222           return ret;
01223         }
01224       }
01225 
01226       if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
01227         break;
01228     }
01229     else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
01230     {
01231   case GEARMAN_TASK_STATE_COMPLETE:
01232       if (client->complete_fn != NULL)
01233       {
01234         ret= client->complete_fn(task);
01235         if (ret != GEARMAN_SUCCESS)
01236         {
01237           task->state= GEARMAN_TASK_STATE_COMPLETE;
01238           return ret;
01239         }
01240       }
01241 
01242       break;
01243     }
01244     else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
01245     {
01246   case GEARMAN_TASK_STATE_EXCEPTION:
01247       if (client->exception_fn != NULL)
01248       {
01249         ret= client->exception_fn(task);
01250         if (ret != GEARMAN_SUCCESS)
01251         {
01252           task->state= GEARMAN_TASK_STATE_EXCEPTION;
01253           return ret;
01254         }
01255       }
01256     }
01257     else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
01258     {
01259   case GEARMAN_TASK_STATE_FAIL:
01260       if (client->fail_fn != NULL)
01261       {
01262         ret= client->fail_fn(task);
01263         if (ret != GEARMAN_SUCCESS)
01264         {
01265           task->state= GEARMAN_TASK_STATE_FAIL;
01266           return ret;
01267         }
01268       }
01269 
01270       break;
01271     }
01272 
01273     task->state= GEARMAN_TASK_STATE_WORK;
01274     return GEARMAN_SUCCESS;
01275 
01276   case GEARMAN_TASK_STATE_FINISHED:
01277     break;
01278 
01279   default:
01280     gearman_universal_set_error(&client->universal, "_client_run_task", "unknown state: %u",
01281                                 task->state);
01282     return GEARMAN_UNKNOWN_STATE;
01283   }
01284 
01285   client->running_tasks--;
01286   task->state= GEARMAN_TASK_STATE_FINISHED;
01287 
01288   if (client->options.free_tasks)
01289     gearman_task_free(task);
01290 
01291   return GEARMAN_SUCCESS;
01292 }
01293 
01294 static void *_client_do(gearman_client_st *client, gearman_command_t command,
01295                         const char *function_name, size_t function_name_length,
01296                         const char *unique, size_t unique_length,
01297                         const void *workload, size_t workload_size,
01298                         size_t *result_size, gearman_return_t *ret_ptr)
01299 {
01300   if (! client->options.task_in_use)
01301   {
01302     (void)_client_add_task(client, &(client->do_task), client, command,
01303                            function_name, function_name_length,
01304                            unique, unique_length,
01305                            workload, workload_size,
01306                            ret_ptr);
01307     if (*ret_ptr != GEARMAN_SUCCESS)
01308       return NULL;
01309 
01310     client->options.task_in_use= true;
01311   }
01312 
01313   client->workload_fn= NULL;
01314   client->created_fn= NULL;
01315   client->data_fn= _client_do_data;
01316   client->warning_fn= _client_do_data;
01317   client->status_fn= _client_do_status;
01318   client->complete_fn= _client_do_data;
01319   client->exception_fn= _client_do_data;
01320   client->fail_fn= _client_do_fail;
01321 
01322   *ret_ptr= gearman_client_run_tasks(client);
01323   if (*ret_ptr != GEARMAN_IO_WAIT && (*ret_ptr != GEARMAN_PAUSE ||
01324       client->do_ret == GEARMAN_WORK_FAIL))
01325   {
01326     gearman_task_free(&(client->do_task));
01327     client->options.task_in_use= false;
01328     client->new_tasks= 0;
01329     client->running_tasks= 0;
01330   }
01331 
01332   workload= NULL;
01333 
01334   if (*ret_ptr == GEARMAN_SUCCESS || *ret_ptr == GEARMAN_PAUSE)
01335   {
01336     *ret_ptr= client->do_ret;
01337     workload= client->do_data;
01338     *result_size= client->do_data_size;
01339     client->do_data= NULL;
01340     client->do_data_size= 0;
01341   }
01342 
01343   return (void *)workload;
01344 }
01345 
01346 static gearman_return_t _client_do_background(gearman_client_st *client,
01347                                               gearman_command_t command,
01348                                               const char *function_name,
01349                                               size_t function_name_length,
01350                                               const char *unique,
01351                                               size_t unique_length,
01352                                               const void *workload,
01353                                               size_t workload_size,
01354                                               char *job_handle)
01355 {
01356   gearman_return_t ret;
01357 
01358   if (! client->options.task_in_use)
01359   {
01360     (void)_client_add_task(client, &(client->do_task), client, command,
01361                            function_name, function_name_length,
01362                            unique, unique_length,
01363                            workload, workload_size,
01364                            &ret);
01365     if (ret != GEARMAN_SUCCESS)
01366       return ret;
01367 
01368     client->options.task_in_use= true;
01369   }
01370 
01371   gearman_client_clear_fn(client);
01372 
01373   ret= gearman_client_run_tasks(client);
01374   if (ret != GEARMAN_IO_WAIT)
01375   {
01376     if (job_handle)
01377     {
01378       strncpy(job_handle, client->do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
01379     }
01380 
01381     gearman_task_free(&(client->do_task));
01382     client->options.task_in_use= false;
01383     client->new_tasks= 0;
01384     client->running_tasks= 0;
01385   }
01386 
01387   return ret;
01388 }
01389 
01390 static gearman_return_t _client_do_data(gearman_task_st *task)
01391 {
01392   gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01393 
01394   if (client->do_ret != GEARMAN_SUCCESS)
01395   {
01396     client->do_ret= GEARMAN_SUCCESS;
01397     return GEARMAN_SUCCESS;
01398   }
01399 
01400   client->do_data= gearman_task_take_data(task, &(client->do_data_size));
01401 
01402   if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
01403     client->do_ret= GEARMAN_WORK_DATA;
01404   else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
01405     client->do_ret= GEARMAN_WORK_WARNING;
01406   else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
01407     client->do_ret= GEARMAN_WORK_EXCEPTION;
01408   else
01409     return GEARMAN_SUCCESS;
01410 
01411   return GEARMAN_PAUSE;
01412 }
01413 
01414 static gearman_return_t _client_do_status(gearman_task_st *task)
01415 {
01416   gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01417 
01418   if (client->do_ret != GEARMAN_SUCCESS)
01419   {
01420     client->do_ret= GEARMAN_SUCCESS;
01421     return GEARMAN_SUCCESS;
01422   }
01423 
01424   client->do_ret= GEARMAN_WORK_STATUS;
01425   return GEARMAN_PAUSE;
01426 }
01427 
01428 static gearman_return_t _client_do_fail(gearman_task_st *task)
01429 {
01430   gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01431 
01432   client->do_ret= GEARMAN_WORK_FAIL;
01433   return GEARMAN_SUCCESS;
01434 }