Gearman Developer Documentation

libgearman-server/job.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 /*
00017  * Private declarations
00018  */
00019 
00029 static uint32_t _server_job_hash(const char *key, size_t key_size);
00030 
00034 static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
00035                                                   gearman_server_worker_st *worker);
00036 
00041 static gearman_server_job_st *
00042 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00043                        gearman_server_function_st *server_function,
00044                        const char *unique, size_t data_size);
00045 
00048 /*
00049  * Public definitions
00050  */
00051 
00052 gearman_server_job_st *
00053 gearman_server_job_add(gearman_server_st *server, const char *function_name,
00054                        size_t function_name_size, const char *unique,
00055                        size_t unique_size, const void *data, size_t data_size,
00056                        gearman_job_priority_t priority,
00057                        gearman_server_client_st *server_client,
00058                        gearman_return_t *ret_ptr)
00059 {
00060   gearman_server_job_st *server_job;
00061   gearman_server_function_st *server_function;
00062   uint32_t key;
00063 
00064   server_function= gearman_server_function_get(server, function_name,
00065                                                function_name_size);
00066   if (server_function == NULL)
00067   {
00068     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00069     return NULL;
00070   }
00071 
00072   if (unique_size == 0)
00073   {
00074     server_job= NULL;
00075     key= 0;
00076   }
00077   else
00078   {
00079     if (unique_size == 1 && *unique ==  '-')
00080     {
00081       if (data_size == 0)
00082       {
00083         key= 0;
00084         server_job= NULL;
00085       }
00086       else
00087       {
00088         /* Look up job via unique data when unique = '-'. */
00089         key= _server_job_hash(data, data_size);
00090         server_job= _server_job_get_unique(server, key, server_function, data,
00091                                            data_size);
00092       }
00093     }
00094     else
00095     {
00096       /* Look up job via unique ID first to make sure it's not a duplicate. */
00097       key= _server_job_hash(unique, unique_size);
00098       server_job= _server_job_get_unique(server, key, server_function, unique,
00099                                          0);
00100     }
00101   }
00102 
00103   if (server_job == NULL)
00104   {
00105     if (server_function->max_queue_size > 0 &&
00106         server_function->job_total >= server_function->max_queue_size)
00107     {
00108       *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
00109       return NULL;
00110     }
00111 
00112     server_job= gearman_server_job_create(server, NULL);
00113     if (server_job == NULL)
00114     {
00115       *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00116       return NULL;
00117     }
00118 
00119     server_job->priority= priority;
00120 
00121     server_job->function= server_function;
00122     server_function->job_total++;
00123 
00124     snprintf(server_job->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s:%u",
00125              server->job_handle_prefix, server->job_handle_count);
00126     snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
00127              (uint32_t)unique_size, unique);
00128     server->job_handle_count++;
00129     server_job->data= data;
00130     server_job->data_size= data_size;
00131 
00132     server_job->unique_key= key;
00133     key= key % GEARMAN_JOB_HASH_SIZE;
00134     GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
00135 
00136     key= _server_job_hash(server_job->job_handle,
00137                           strlen(server_job->job_handle));
00138     server_job->job_handle_key= key;
00139     key= key % GEARMAN_JOB_HASH_SIZE;
00140     GEARMAN_HASH_ADD(server->job, key, server_job,);
00141 
00142     if (server->state.queue_startup)
00143     {
00144       server_job->job_queued= true;
00145     }
00146     else if (server_client == NULL && server->queue_add_fn != NULL)
00147     {
00148       *ret_ptr= (*(server->queue_add_fn))(server,
00149                                           (void *)server->queue_context,
00150                                           server_job->unique,
00151                                           unique_size,
00152                                           function_name,
00153                                           function_name_size,
00154                                           data, data_size, priority);
00155       if (*ret_ptr != GEARMAN_SUCCESS)
00156       {
00157         server_job->data= NULL;
00158         gearman_server_job_free(server_job);
00159         return NULL;
00160       }
00161 
00162       if (server->queue_flush_fn != NULL)
00163       {
00164         *ret_ptr= (*(server->queue_flush_fn))(server,
00165                                               (void *)server->queue_context);
00166         if (*ret_ptr != GEARMAN_SUCCESS)
00167         {
00168           server_job->data= NULL;
00169           gearman_server_job_free(server_job);
00170           return NULL;
00171         }
00172       }
00173 
00174       server_job->job_queued= true;
00175     }
00176 
00177     *ret_ptr= gearman_server_job_queue(server_job);
00178     if (*ret_ptr != GEARMAN_SUCCESS)
00179     {
00180       if (server_client == NULL && server->queue_done_fn != NULL)
00181       {
00182         /* Do our best to remove the job from the queue. */
00183         (void)(*(server->queue_done_fn))(server,
00184                                       (void *)server->queue_context,
00185                                       server_job->unique, unique_size,
00186                                       server_job->function->function_name,
00187                                       server_job->function->function_name_size);
00188       }
00189 
00190       gearman_server_job_free(server_job);
00191       return NULL;
00192     }
00193   }
00194   else
00195     *ret_ptr= GEARMAN_JOB_EXISTS;
00196 
00197   if (server_client != NULL)
00198   {
00199     server_client->job= server_job;
00200     GEARMAN_LIST_ADD(server_job->client, server_client, job_)
00201   }
00202 
00203   return server_job;
00204 }
00205 
00206 gearman_server_job_st *
00207 gearman_server_job_create(gearman_server_st *server,
00208                           gearman_server_job_st *server_job)
00209 {
00210   if (server_job == NULL)
00211   {
00212     if (server->free_job_count > 0)
00213     {
00214       server_job= server->free_job_list;
00215       GEARMAN_LIST_DEL(server->free_job, server_job,)
00216     }
00217     else
00218     {
00219       server_job= (gearman_server_job_st *)malloc(sizeof(gearman_server_job_st));
00220       if (server_job == NULL)
00221         return NULL;
00222     }
00223 
00224     server_job->options.allocated= true;
00225   }
00226   else
00227     server_job->options.allocated= false;
00228 
00229   server_job->ignore_job= false;
00230   server_job->job_queued= false;
00231   server_job->retries= 0;
00232   server_job->priority= 0;
00233   server_job->job_handle_key= 0;
00234   server_job->unique_key= 0;
00235   server_job->client_count= 0;
00236   server_job->numerator= 0;
00237   server_job->denominator= 0;
00238   server_job->data_size= 0;
00239   server_job->server= server;
00240   server_job->next= NULL;
00241   server_job->prev= NULL;
00242   server_job->unique_next= NULL;
00243   server_job->unique_prev= NULL;
00244   server_job->worker_next= NULL;
00245   server_job->worker_prev= NULL;
00246   server_job->function= NULL;
00247   server_job->function_next= NULL;
00248   server_job->data= NULL;
00249   server_job->client_list= NULL;
00250   server_job->worker= NULL;
00251   server_job->job_handle[0]= 0;
00252   server_job->unique[0]= 0;
00253 
00254   return server_job;
00255 }
00256 
00257 void gearman_server_job_free(gearman_server_job_st *server_job)
00258 {
00259   uint32_t key;
00260 
00261   if (server_job->worker != NULL)
00262     server_job->function->job_running--;
00263 
00264   server_job->function->job_total--;
00265 
00266   if (server_job->data != NULL)
00267     free((void *)(server_job->data));
00268 
00269   while (server_job->client_list != NULL)
00270     gearman_server_client_free(server_job->client_list);
00271 
00272   if (server_job->worker != NULL)
00273     GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
00274 
00275   key= server_job->unique_key % GEARMAN_JOB_HASH_SIZE;
00276   GEARMAN_HASH_DEL(server_job->server->unique, key, server_job, unique_);
00277 
00278   key= server_job->job_handle_key % GEARMAN_JOB_HASH_SIZE;
00279   GEARMAN_HASH_DEL(server_job->server->job, key, server_job,);
00280 
00281   if (server_job->options.allocated)
00282   {
00283     if (server_job->server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
00284       GEARMAN_LIST_ADD(server_job->server->free_job, server_job,)
00285     else
00286       free(server_job);
00287   }
00288 }
00289 
00290 gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
00291                                               const char *job_handle,
00292                                               gearman_server_con_st *worker_con)
00293 {
00294   uint32_t key;
00295 
00296   key= _server_job_hash(job_handle, strlen(job_handle));
00297 
00298   for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAN_JOB_HASH_SIZE];
00299        server_job != NULL; server_job= server_job->next)
00300   {
00301     if (server_job->job_handle_key == key &&
00302         !strcmp(server_job->job_handle, job_handle))
00303     {
00304       /* Check to make sure the worker asking for the job still owns the job. */
00305       if (worker_con != NULL &&
00306           (server_job->worker == NULL || server_job->worker->con != worker_con))
00307       {
00308         return NULL;
00309       }
00310 
00311       return server_job;
00312     }
00313   }
00314 
00315   return NULL;
00316 }
00317 
00318 gearman_server_job_st *
00319 gearman_server_job_peek(gearman_server_con_st *server_con)
00320 {
00321   gearman_server_worker_st *server_worker;
00322   gearman_job_priority_t priority;
00323 
00324   for (server_worker= server_con->worker_list; server_worker != NULL;
00325        server_worker= server_worker->con_next)
00326   {
00327     if (server_worker->function->job_count != 0)
00328     {
00329       for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00330            priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00331       {
00332         if (server_worker->function->job_list[priority] != NULL)
00333         {
00334           if (server_worker->function->job_list[priority]->ignore_job)
00335           {
00336             /* This is only happens when a client disconnects from a foreground
00337               job. We do this because we don't want to run the job anymore. */
00338             server_worker->function->job_list[priority]->ignore_job= false;
00339 
00340             gearman_server_job_free(gearman_server_job_take(server_con));
00341 
00342             return gearman_server_job_peek(server_con);
00343           }
00344           return server_worker->function->job_list[priority];
00345         }
00346       }
00347     }
00348   }
00349 
00350   return NULL;
00351 }
00352 
00353 static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
00354                                                   gearman_server_worker_st *worker)
00355 {
00356   worker->con_prev= NULL;
00357   worker->con_next= list;
00358   while (worker->con_next != NULL)
00359   {
00360     worker->con_prev= worker->con_next;
00361     worker->con_next= worker->con_next->con_next;
00362   }
00363   if (worker->con_prev)
00364     worker->con_prev->con_next= worker;
00365 }
00366 
00367 gearman_server_job_st *
00368 gearman_server_job_take(gearman_server_con_st *server_con)
00369 {
00370   gearman_server_worker_st *server_worker;
00371   gearman_server_job_st *server_job;
00372   gearman_job_priority_t priority;
00373 
00374   for (server_worker= server_con->worker_list; server_worker != NULL;
00375        server_worker= server_worker->con_next)
00376   {
00377     if (server_worker->function->job_count != 0)
00378       break;
00379   }
00380 
00381   if (server_worker == NULL)
00382     return NULL;
00383 
00384   if (server_con->thread->server->flags.round_robin)
00385   {
00386     GEARMAN_LIST_DEL(server_con->worker, server_worker, con_)
00387     _server_con_worker_list_append(server_con->worker_list, server_worker);
00388     ++server_con->worker_count;
00389     if (server_con->worker_list == NULL) {
00390       server_con->worker_list= server_worker;
00391     }
00392   }
00393 
00394   for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00395        priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00396   {
00397     if (server_worker->function->job_list[priority] != NULL)
00398       break;
00399   }
00400 
00401   server_job= server_worker->function->job_list[priority];
00402   server_job->function->job_list[priority]= server_job->function_next;
00403   if (server_job->function->job_end[priority] == server_job)
00404     server_job->function->job_end[priority]= NULL;
00405   server_job->function->job_count--;
00406 
00407   server_job->worker= server_worker;
00408   GEARMAN_LIST_ADD(server_worker->job, server_job, worker_)
00409   server_job->function->job_running++;
00410 
00411   if (server_job->ignore_job)
00412   {
00413     gearman_server_job_free(server_job);
00414     return gearman_server_job_take(server_con);
00415   }
00416 
00417   return server_job;
00418 }
00419 
00420 gearman_return_t gearman_server_job_queue(gearman_server_job_st *job)
00421 {
00422   gearman_server_client_st *client;
00423   gearman_server_worker_st *worker;
00424   uint32_t noop_sent;
00425   gearman_return_t ret;
00426 
00427   if (job->worker != NULL)
00428   {
00429     job->retries++;
00430     if (job->server->job_retries == job->retries)
00431     {
00432        gearman_log_error(job->server->gearman,
00433                             "Dropped job due to max retry count: %s %s",
00434                             job->job_handle, job->unique);
00435        for (client= job->client_list; client != NULL; client= client->job_next)
00436        {
00437          ret= gearman_server_io_packet_add(client->con, false,
00438                                            GEARMAN_MAGIC_RESPONSE,
00439                                            GEARMAN_COMMAND_WORK_FAIL,
00440                                            job->job_handle,
00441                                            (size_t)strlen(job->job_handle),
00442                                            NULL);
00443          if (ret != GEARMAN_SUCCESS)
00444            return ret;
00445       }
00446 
00447       /* Remove from persistent queue if one exists. */
00448       if (job->job_queued && job->server->queue_done_fn != NULL)
00449       {
00450         ret= (*(job->server->queue_done_fn))(job->server,
00451                                              (void *)job->server->queue_context,
00452                                              job->unique,
00453                                              (size_t)strlen(job->unique),
00454                                              job->function->function_name,
00455                                              job->function->function_name_size);
00456         if (ret != GEARMAN_SUCCESS)
00457           return ret;
00458       }
00459 
00460       gearman_server_job_free(job);
00461       return GEARMAN_SUCCESS;
00462     }
00463 
00464     GEARMAN_LIST_DEL(job->worker->job, job, worker_)
00465     job->worker= NULL;
00466     job->function->job_running--;
00467     job->function_next= NULL;
00468     job->numerator= 0;
00469     job->denominator= 0;
00470   }
00471 
00472   /* Queue NOOP for possible sleeping workers. */
00473   if (job->function->worker_list != NULL)
00474   {
00475     worker= job->function->worker_list;
00476     noop_sent= 0;
00477     do
00478     {
00479       if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
00480       {
00481         ret= gearman_server_io_packet_add(worker->con, false,
00482                                           GEARMAN_MAGIC_RESPONSE,
00483                                           GEARMAN_COMMAND_NOOP, NULL);
00484         if (ret != GEARMAN_SUCCESS)
00485           return ret;
00486 
00487         worker->con->is_noop_sent= true;
00488         noop_sent++;
00489       }
00490 
00491       worker= worker->function_next;
00492     }
00493     while (worker != job->function->worker_list &&
00494            (job->server->worker_wakeup == 0 ||
00495            noop_sent < job->server->worker_wakeup));
00496 
00497     job->function->worker_list= worker;
00498   }
00499 
00500   /* Queue the job to be run. */
00501   if (job->function->job_list[job->priority] == NULL)
00502     job->function->job_list[job->priority]= job;
00503   else
00504     job->function->job_end[job->priority]->function_next= job;
00505   job->function->job_end[job->priority]= job;
00506   job->function->job_count++;
00507 
00508   return GEARMAN_SUCCESS;
00509 }
00510 
00511 /*
00512  * Private definitions
00513  */
00514 
/*
 * Hash key_size bytes starting at key into a 32-bit value using the Jenkins
 * one-at-a-time mixing steps.
 *
 * All arithmetic is done in uint32_t and bytes are read as unsigned char:
 * unsigned wrap-around and shifts are fully defined, whereas the same steps
 * on a signed 32-bit value overflow and shift negative numbers, and plain
 * char may be signed or unsigned depending on the platform.
 *
 * A result of 0 is mapped to 1, so the function never returns 0. key is not
 * dereferenced when key_size is 0.
 */
static uint32_t _server_job_hash(const char *key, size_t key_size)
{
  const unsigned char *bytes= (const unsigned char *)key;
  uint32_t value= 0;

  for (size_t i= 0; i < key_size; i++)
  {
    value+= bytes[i];
    value+= value << 10;
    value^= value >> 6;
  }

  /* Final avalanche. */
  value+= value << 3;
  value^= value >> 11;
  value+= value << 15;

  return value == 0 ? 1 : value;
}
00532 
00533 static gearman_server_job_st *
00534 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00535                        gearman_server_function_st *server_function,
00536                        const char *unique, size_t data_size)
00537 {
00538   gearman_server_job_st *server_job;
00539 
00540   for (server_job= server->unique_hash[unique_key % GEARMAN_JOB_HASH_SIZE];
00541        server_job != NULL; server_job= server_job->unique_next)
00542   {
00543     if (data_size == 0)
00544     {
00545       if (server_job->function == server_function &&
00546           server_job->unique_key == unique_key &&
00547           !strcmp(server_job->unique, unique))
00548       {
00549         return server_job;
00550       }
00551     }
00552     else
00553     {
00554       if (server_job->function == server_function &&
00555           server_job->unique_key == unique_key &&
00556           server_job->data_size == data_size &&
00557           !memcmp(server_job->data, unique, data_size))
00558       {
00559         return server_job;
00560       }
00561     }
00562   }
00563 
00564   return NULL;
00565 }