Gearman Developer Documentation

libgearman/job.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00025 static gearman_return_t _job_send(gearman_job_st *job);
00026 
00029 /*
00030  * Public Definitions
00031  */
00032 
00033 gearman_job_st *gearman_job_create(gearman_worker_st *worker,
00034                                    gearman_job_st *job)
00035 {
00036   if (job == NULL)
00037   {
00038     job= malloc(sizeof(gearman_job_st));
00039     if (job == NULL)
00040     {
00041       gearman_universal_set_error((&worker->universal), "_job_create", "malloc");
00042       return NULL;
00043     }
00044 
00045     job->options.allocated= true;
00046   }
00047   else
00048   {
00049     job->options.allocated= false;
00050   }
00051 
00052   job->options.assigned_in_use= false;
00053   job->options.work_in_use= false;
00054   job->options.finished= false;
00055 
00056   job->worker= worker;
00057 
00058   if (worker->job_list != NULL)
00059     worker->job_list->prev= job;
00060   job->next= worker->job_list;
00061   job->prev= NULL;
00062   worker->job_list= job;
00063   worker->job_count++;
00064 
00065   job->con= NULL;
00066 
00067   return job;
00068 }
00069 
00070 
00071 gearman_return_t gearman_job_send_data(gearman_job_st *job, const void *data,
00072                                        size_t data_size)
00073 {
00074   gearman_return_t ret;
00075   const void *args[2];
00076   size_t args_size[2];
00077 
00078   if (! (job->options.work_in_use))
00079   {
00080     args[0]= job->assigned.arg[0];
00081     args_size[0]= job->assigned.arg_size[0];
00082     args[1]= data;
00083     args_size[1]= data_size;
00084     ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00085                                     GEARMAN_MAGIC_REQUEST,
00086                                     GEARMAN_COMMAND_WORK_DATA,
00087                                     args, args_size, 2);
00088     if (ret != GEARMAN_SUCCESS)
00089       return ret;
00090 
00091     job->options.work_in_use= true;
00092   }
00093 
00094   return _job_send(job);
00095 }
00096 
00097 gearman_return_t gearman_job_send_warning(gearman_job_st *job,
00098                                           const void *warning,
00099                                           size_t warning_size)
00100 {
00101   gearman_return_t ret;
00102   const void *args[2];
00103   size_t args_size[2];
00104 
00105   if (! (job->options.work_in_use))
00106   {
00107     args[0]= job->assigned.arg[0];
00108     args_size[0]= job->assigned.arg_size[0];
00109     args[1]= warning;
00110     args_size[1]= warning_size;
00111     ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00112                                     GEARMAN_MAGIC_REQUEST,
00113                                     GEARMAN_COMMAND_WORK_WARNING,
00114                                     args, args_size, 2);
00115     if (ret != GEARMAN_SUCCESS)
00116       return ret;
00117 
00118     job->options.work_in_use= true;
00119   }
00120 
00121   return _job_send(job);
00122 }
00123 
00124 gearman_return_t gearman_job_send_status(gearman_job_st *job,
00125                                          uint32_t numerator,
00126                                          uint32_t denominator)
00127 {
00128   gearman_return_t ret;
00129   char numerator_string[12];
00130   char denominator_string[12];
00131   const void *args[3];
00132   size_t args_size[3];
00133 
00134   if (! (job->options.work_in_use))
00135   {
00136     snprintf(numerator_string, 12, "%u", numerator);
00137     snprintf(denominator_string, 12, "%u", denominator);
00138 
00139     args[0]= job->assigned.arg[0];
00140     args_size[0]= job->assigned.arg_size[0];
00141     args[1]= numerator_string;
00142     args_size[1]= strlen(numerator_string) + 1;
00143     args[2]= denominator_string;
00144     args_size[2]= strlen(denominator_string);
00145     ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00146                                     GEARMAN_MAGIC_REQUEST,
00147                                     GEARMAN_COMMAND_WORK_STATUS,
00148                                     args, args_size, 3);
00149     if (ret != GEARMAN_SUCCESS)
00150       return ret;
00151 
00152     job->options.work_in_use= true;
00153   }
00154 
00155   return _job_send(job);
00156 }
00157 
00158 gearman_return_t gearman_job_send_complete(gearman_job_st *job,
00159                                            const void *result,
00160                                            size_t result_size)
00161 {
00162   gearman_return_t ret;
00163   const void *args[2];
00164   size_t args_size[2];
00165 
00166   if (job->options.finished)
00167     return GEARMAN_SUCCESS;
00168 
00169   if (! (job->options.work_in_use))
00170   {
00171     args[0]= job->assigned.arg[0];
00172     args_size[0]= job->assigned.arg_size[0];
00173     args[1]= result;
00174     args_size[1]= result_size;
00175     ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00176                                  GEARMAN_MAGIC_REQUEST,
00177                                  GEARMAN_COMMAND_WORK_COMPLETE,
00178                                  args, args_size, 2);
00179     if (ret != GEARMAN_SUCCESS)
00180       return ret;
00181 
00182     job->options.work_in_use= true;
00183   }
00184 
00185   ret= _job_send(job);
00186   if (ret != GEARMAN_SUCCESS)
00187     return ret;
00188 
00189   job->options.finished= true;
00190 
00191   return GEARMAN_SUCCESS;
00192 }
00193 
00194 gearman_return_t gearman_job_send_exception(gearman_job_st *job,
00195                                             const void *exception,
00196                                             size_t exception_size)
00197 {
00198   gearman_return_t ret;
00199   const void *args[2];
00200   size_t args_size[2];
00201 
00202   if (! (job->options.work_in_use))
00203   {
00204     args[0]= job->assigned.arg[0];
00205     args_size[0]= job->assigned.arg_size[0];
00206     args[1]= exception;
00207     args_size[1]= exception_size;
00208     ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00209                                     GEARMAN_MAGIC_REQUEST,
00210                                     GEARMAN_COMMAND_WORK_EXCEPTION,
00211                                     args, args_size, 2);
00212     if (ret != GEARMAN_SUCCESS)
00213       return ret;
00214 
00215     job->options.work_in_use= true;
00216   }
00217 
00218   return _job_send(job);
00219 }
00220 
00221 gearman_return_t gearman_job_send_fail(gearman_job_st *job)
00222 {
00223   gearman_return_t ret;
00224   const void *args[1];
00225   size_t args_size[1];
00226 
00227   if (job->options.finished)
00228     return GEARMAN_SUCCESS;
00229 
00230   if (! (job->options.work_in_use))
00231   {
00232     args[0]= job->assigned.arg[0];
00233     args_size[0]= job->assigned.arg_size[0] - 1;
00234     ret= gearman_packet_create_args(&(job->worker->universal), &(job->work),
00235                                     GEARMAN_MAGIC_REQUEST,
00236                                     GEARMAN_COMMAND_WORK_FAIL,
00237                                     args, args_size, 1);
00238     if (ret != GEARMAN_SUCCESS)
00239       return ret;
00240 
00241     job->options.work_in_use= true;
00242   }
00243 
00244   ret= _job_send(job);
00245   if (ret != GEARMAN_SUCCESS)
00246     return ret;
00247 
00248   job->options.finished= true;
00249   return GEARMAN_SUCCESS;
00250 }
00251 
00252 const char *gearman_job_handle(const gearman_job_st *job)
00253 {
00254   return (const char *)job->assigned.arg[0];
00255 }
00256 
00257 const char *gearman_job_function_name(const gearman_job_st *job)
00258 {
00259   return (const char *)job->assigned.arg[1];
00260 }
00261 
00262 const char *gearman_job_unique(const gearman_job_st *job)
00263 {
00264   if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
00265     return (const char *)job->assigned.arg[2];
00266   return "";
00267 }
00268 
00269 const void *gearman_job_workload(const gearman_job_st *job)
00270 {
00271   return job->assigned.data;
00272 }
00273 
00274 size_t gearman_job_workload_size(const gearman_job_st *job)
00275 {
00276   return job->assigned.data_size;
00277 }
00278 
00279 void *gearman_job_take_workload(gearman_job_st *job, size_t *data_size)
00280 {
00281   return gearman_packet_take_data(&(job->assigned), data_size);
00282 }
00283 
00284 /*
00285  * Static Definitions
00286  */
00287 
00288 static gearman_return_t _job_send(gearman_job_st *job)
00289 {
00290   gearman_return_t ret;
00291 
00292   ret= gearman_connection_send(job->con, &(job->work), true);
00293   if (ret != GEARMAN_SUCCESS)
00294     return ret;
00295 
00296   gearman_packet_free(&(job->work));
00297   job->options.work_in_use= false;
00298 
00299   return GEARMAN_SUCCESS;
00300 }