Gearman Developer Documentation

libgearman-server/server.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 /*
00017  * Private declarations
00018  */
00019 
00029 gearman_return_t _queue_replay_add(gearman_server_st *server, void *context,
00030                                    const void *unique, size_t unique_size,
00031                                    const void *function_name,
00032                                    size_t function_name_size, const void *data,
00033                                    size_t data_size,
00034                                    gearman_job_priority_t priority);
00035 
00039 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00040                                              const char *error_code,
00041                                              const char *error_string);
00042 
00046 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00047                                          gearman_packet_st *packet);
00048 
00052 static gearman_return_t
00053 _server_queue_work_data(gearman_server_job_st *server_job,
00054                         gearman_packet_st *packet, gearman_command_t command);
00055 
00059 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00060 
00063 /*
00064  * Public definitions
00065  */
00066 
00067 gearman_server_st *gearman_server_create(gearman_server_st *server)
00068 {
00069   struct utsname un;
00070 
00071   if (server == NULL)
00072   {
00073     server= (gearman_server_st *)malloc(sizeof(gearman_server_st));
00074     if (server == NULL)
00075       return NULL;
00076 
00077     server->options.allocated= true;
00078   }
00079   else
00080     server->options.allocated= false;
00081 
00082   server->state.queue_startup= false;
00083   server->flags.round_robin= false;
00084   server->flags.threaded= false;
00085   server->shutdown= false;
00086   server->shutdown_graceful= false;
00087   server->proc_wakeup= false;
00088   server->proc_shutdown= false;
00089   server->job_retries= 0;
00090   server->worker_wakeup= 0;
00091   server->thread_count= 0;
00092   server->free_packet_count= 0;
00093   server->function_count= 0;
00094   server->job_count= 0;
00095   server->unique_count= 0;
00096   server->free_job_count= 0;
00097   server->free_client_count= 0;
00098   server->free_worker_count= 0;
00099   server->thread_list= NULL;
00100   server->free_packet_list= NULL;
00101   server->function_list= NULL;
00102   server->free_job_list= NULL;
00103   server->free_client_list= NULL;
00104   server->free_worker_list= NULL;
00105   server->log_fn= NULL;
00106   server->log_context= NULL;
00107   server->queue_context= NULL;
00108   server->queue_add_fn= NULL;
00109   server->queue_flush_fn= NULL;
00110   server->queue_done_fn= NULL;
00111   server->queue_replay_fn= NULL;
00112   memset(server->job_hash, 0,
00113          sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00114   memset(server->unique_hash, 0,
00115          sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00116 
00117   server->gearman= gearman_universal_create(&(server->gearman_universal_static), NULL);
00118   if (server->gearman == NULL)
00119   {
00120     gearman_server_free(server);
00121     return NULL;
00122   }
00123 
00124   if (uname(&un) == -1)
00125   {
00126     gearman_server_free(server);
00127     return NULL;
00128   }
00129 
00130   snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
00131            un.nodename);
00132   server->job_handle_count= 1;
00133 
00134   return server;
00135 }
00136 
00137 void gearman_server_free(gearman_server_st *server)
00138 {
00139   uint32_t key;
00140   gearman_server_packet_st *packet;
00141   gearman_server_job_st *job;
00142   gearman_server_client_st *client;
00143   gearman_server_worker_st *worker;
00144 
00145   /* All threads should be cleaned up before calling this. */
00146   assert(server->thread_list == NULL);
00147 
00148   for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
00149   {
00150     while (server->job_hash[key] != NULL)
00151       gearman_server_job_free(server->job_hash[key]);
00152   }
00153 
00154   while (server->function_list != NULL)
00155     gearman_server_function_free(server->function_list);
00156 
00157   while (server->free_packet_list != NULL)
00158   {
00159     packet= server->free_packet_list;
00160     server->free_packet_list= packet->next;
00161     free(packet);
00162   }
00163 
00164   while (server->free_job_list != NULL)
00165   {
00166     job= server->free_job_list;
00167     server->free_job_list= job->next;
00168     free(job);
00169   }
00170 
00171   while (server->free_client_list != NULL)
00172   {
00173     client= server->free_client_list;
00174     server->free_client_list= client->con_next;
00175     free(client);
00176   }
00177 
00178   while (server->free_worker_list != NULL)
00179   {
00180     worker= server->free_worker_list;
00181     server->free_worker_list= worker->con_next;
00182     free(worker);
00183   }
00184 
00185   if (server->gearman != NULL)
00186     gearman_universal_free(server->gearman);
00187 
00188   if (server->options.allocated)
00189     free(server);
00190 }
00191 
00192 void gearman_server_set_job_retries(gearman_server_st *server,
00193                                     uint8_t job_retries)
00194 {
00195   server->job_retries= job_retries;
00196 }
00197 
00198 void gearman_server_set_worker_wakeup(gearman_server_st *server,
00199                                       uint8_t worker_wakeup)
00200 {
00201   server->worker_wakeup= worker_wakeup;
00202 }
00203 
00204 void gearman_server_set_log_fn(gearman_server_st *server,
00205                                gearman_log_fn *function,
00206                                void *context, gearman_verbose_t verbose)
00207 {
00208   server->log_fn= function;
00209   server->log_context= context;
00210   gearman_set_log_fn(server->gearman, _log, server, verbose);
00211 }
00212 
00213 gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
00214                                             gearman_packet_st *packet)
00215 {
00216   gearman_return_t ret;
00217   gearman_server_job_st *server_job;
00218   char job_handle[GEARMAN_JOB_HANDLE_SIZE];
00219   char option[GEARMAN_OPTION_SIZE];
00220   gearman_server_client_st *server_client;
00221   char numerator_buffer[11]; /* Max string size to hold a uint32_t. */
00222   char denominator_buffer[11]; /* Max string size to hold a uint32_t. */
00223   gearman_job_priority_t priority;
00224   gearman_server_st *server= server_con->thread->server;
00225 
00226   if (packet->magic == GEARMAN_MAGIC_RESPONSE)
00227   {
00228     return _server_error_packet(server_con, "bad_magic",
00229                                 "Request magic expected");
00230   }
00231 
00232   switch (packet->command)
00233   {
00234   /* Client/worker requests. */
00235   case GEARMAN_COMMAND_ECHO_REQ:
00236     /* Reuse the data buffer and just shove the data back. */
00237     ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
00238                                       GEARMAN_COMMAND_ECHO_RES, packet->data,
00239                                       packet->data_size, NULL);
00240     if (ret != GEARMAN_SUCCESS)
00241       return ret;
00242 
00243     packet->options.free_data= false;
00244 
00245     break;
00246 
00247   /* Client requests. */
00248   case GEARMAN_COMMAND_SUBMIT_JOB:
00249   case GEARMAN_COMMAND_SUBMIT_JOB_BG:
00250   case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
00251   case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
00252   case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
00253   case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
00254 
00255     if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
00256         packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG)
00257     {
00258       priority= GEARMAN_JOB_PRIORITY_NORMAL;
00259     }
00260     else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
00261              packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
00262     {
00263       priority= GEARMAN_JOB_PRIORITY_HIGH;
00264     }
00265     else
00266       priority= GEARMAN_JOB_PRIORITY_LOW;
00267 
00268     if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
00269         packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
00270         packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
00271     {
00272       server_client= NULL;
00273     }
00274     else
00275     {
00276       server_client= gearman_server_client_add(server_con);
00277       if (server_client == NULL)
00278         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00279     }
00280 
00281     /* Create a job. */
00282     server_job= gearman_server_job_add(server_con->thread->server,
00283                                        (char *)(packet->arg[0]),
00284                                        packet->arg_size[0] - 1,
00285                                        (char *)(packet->arg[1]),
00286                                        packet->arg_size[1] - 1, packet->data,
00287                                        packet->data_size, priority,
00288                                        server_client, &ret);
00289     if (ret == GEARMAN_SUCCESS)
00290     {
00291       packet->options.free_data= false;
00292     }
00293     else if (ret == GEARMAN_JOB_QUEUE_FULL)
00294     {
00295       return _server_error_packet(server_con, "queue_full",
00296                                   "Job queue is full");
00297     }
00298     else if (ret != GEARMAN_JOB_EXISTS)
00299       return ret;
00300 
00301     /* Queue the job created packet. */
00302     ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00303                                       GEARMAN_COMMAND_JOB_CREATED,
00304                                       server_job->job_handle,
00305                                       (size_t)strlen(server_job->job_handle),
00306                                       NULL);
00307     if (ret != GEARMAN_SUCCESS)
00308       return ret;
00309 
00310     break;
00311 
00312   case GEARMAN_COMMAND_GET_STATUS:
00313     /* This may not be NULL terminated, so copy to make sure it is. */
00314     snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00315              (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00316 
00317     server_job= gearman_server_job_get(server_con->thread->server, job_handle, NULL);
00318 
00319     /* Queue status result packet. */
00320     if (server_job == NULL)
00321     {
00322       ret= gearman_server_io_packet_add(server_con, false,
00323                                         GEARMAN_MAGIC_RESPONSE,
00324                                         GEARMAN_COMMAND_STATUS_RES, job_handle,
00325                                         (size_t)(strlen(job_handle) + 1),
00326                                         "0", (size_t)2, "0", (size_t)2, "0",
00327                                         (size_t)2, "0", (size_t)1, NULL);
00328     }
00329     else
00330     {
00331       snprintf(numerator_buffer, 11, "%u", server_job->numerator);
00332       snprintf(denominator_buffer, 11, "%u", server_job->denominator);
00333 
00334       ret= gearman_server_io_packet_add(server_con, false,
00335                                         GEARMAN_MAGIC_RESPONSE,
00336                                         GEARMAN_COMMAND_STATUS_RES, job_handle,
00337                                         (size_t)(strlen(job_handle) + 1),
00338                                         "1", (size_t)2,
00339                                         server_job->worker == NULL ? "0" : "1",
00340                                         (size_t)2, numerator_buffer,
00341                                         (size_t)(strlen(numerator_buffer) + 1),
00342                                         denominator_buffer,
00343                                         (size_t)strlen(denominator_buffer),
00344                                         NULL);
00345     }
00346 
00347     if (ret != GEARMAN_SUCCESS)
00348       return ret;
00349 
00350     break;
00351 
00352   case GEARMAN_COMMAND_OPTION_REQ:
00353     /* This may not be NULL terminated, so copy to make sure it is. */
00354     snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
00355              (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00356 
00357     if (!strcasecmp(option, "exceptions"))
00358     {
00359       server_con->is_exceptions= true;
00360     }
00361     else
00362     {
00363       return _server_error_packet(server_con, "unknown_option",
00364                                   "Server does not recognize given option");
00365     }
00366 
00367     ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00368                                       GEARMAN_COMMAND_OPTION_RES,
00369                                       packet->arg[0], packet->arg_size[0],
00370                                       NULL);
00371     if (ret != GEARMAN_SUCCESS)
00372       return ret;
00373 
00374     break;
00375 
00376   /* Worker requests. */
00377   case GEARMAN_COMMAND_CAN_DO:
00378     if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00379                                   packet->arg_size[0], 0) == NULL)
00380     {
00381       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00382     }
00383 
00384     break;
00385 
00386   case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
00387     if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00388                                   packet->arg_size[0] - 1,
00389                                   (in_port_t)atoi((char *)(packet->arg[1])))
00390          == NULL)
00391     {
00392       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00393     }
00394 
00395     break;
00396 
00397   case GEARMAN_COMMAND_CANT_DO:
00398     gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
00399                                    packet->arg_size[0]);
00400     break;
00401 
00402   case GEARMAN_COMMAND_RESET_ABILITIES:
00403     gearman_server_con_free_workers(server_con);
00404     break;
00405 
00406   case GEARMAN_COMMAND_PRE_SLEEP:
00407     server_job= gearman_server_job_peek(server_con);
00408     if (server_job == NULL)
00409     {
00410       server_con->is_sleeping= true;
00411     }
00412     else
00413     {
00414       /* If there are jobs that could be run, queue a NOOP packet to wake the
00415          worker up. This could be the result of a race codition. */
00416       ret= gearman_server_io_packet_add(server_con, false,
00417                                         GEARMAN_MAGIC_RESPONSE,
00418                                         GEARMAN_COMMAND_NOOP, NULL);
00419       if (ret != GEARMAN_SUCCESS)
00420         return ret;
00421     }
00422 
00423     break;
00424 
00425   case GEARMAN_COMMAND_GRAB_JOB:
00426   case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
00427     server_con->is_sleeping= false;
00428     server_con->is_noop_sent= false;
00429 
00430     server_job= gearman_server_job_take(server_con);
00431     if (server_job == NULL)
00432     {
00433       /* No jobs found, queue no job packet. */
00434       ret= gearman_server_io_packet_add(server_con, false,
00435                                         GEARMAN_MAGIC_RESPONSE,
00436                                         GEARMAN_COMMAND_NO_JOB, NULL);
00437     }
00438     else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
00439     {
00440       /* We found a runnable job, queue job assigned packet and take the job
00441          off the queue. */
00442       ret= gearman_server_io_packet_add(server_con, false,
00443                                    GEARMAN_MAGIC_RESPONSE,
00444                                    GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
00445                                    server_job->job_handle,
00446                                    (size_t)(strlen(server_job->job_handle) + 1),
00447                                    server_job->function->function_name,
00448                                    server_job->function->function_name_size + 1,
00449                                    server_job->unique,
00450                                    (size_t)(strlen(server_job->unique) + 1),
00451                                    server_job->data, server_job->data_size,
00452                                    NULL);
00453     }
00454     else
00455     {
00456       /* Same, but without unique ID. */
00457       ret= gearman_server_io_packet_add(server_con, false,
00458                                    GEARMAN_MAGIC_RESPONSE,
00459                                    GEARMAN_COMMAND_JOB_ASSIGN,
00460                                    server_job->job_handle,
00461                                    (size_t)(strlen(server_job->job_handle) + 1),
00462                                    server_job->function->function_name,
00463                                    server_job->function->function_name_size + 1,
00464                                    server_job->data, server_job->data_size,
00465                                    NULL);
00466     }
00467 
00468     if (ret != GEARMAN_SUCCESS)
00469     {
00470       if (server_job != NULL)
00471         return gearman_server_job_queue(server_job);
00472       return ret;
00473     }
00474 
00475     break;
00476 
00477   case GEARMAN_COMMAND_WORK_DATA:
00478   case GEARMAN_COMMAND_WORK_WARNING:
00479     server_job= gearman_server_job_get(server_con->thread->server,
00480                                        (char *)(packet->arg[0]),
00481                                        server_con);
00482     if (server_job == NULL)
00483     {
00484       return _server_error_packet(server_con, "job_not_found",
00485                                   "Job given in work result not found");
00486     }
00487 
00488     /* Queue the data/warning packet for all clients. */
00489     ret= _server_queue_work_data(server_job, packet, packet->command);
00490     if (ret != GEARMAN_SUCCESS)
00491       return ret;
00492 
00493     break;
00494 
00495   case GEARMAN_COMMAND_WORK_STATUS:
00496     server_job= gearman_server_job_get(server_con->thread->server,
00497                                        (char *)(packet->arg[0]),
00498                                        server_con);
00499     if (server_job == NULL)
00500     {
00501       return _server_error_packet(server_con, "job_not_found",
00502                                   "Job given in work result not found");
00503     }
00504 
00505     /* Update job status. */
00506     server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
00507 
00508     /* This may not be NULL terminated, so copy to make sure it is. */
00509     snprintf(denominator_buffer, 11, "%.*s", (uint32_t)(packet->arg_size[2]),
00510              (char *)(packet->arg[2]));
00511     server_job->denominator= (uint32_t)atoi(denominator_buffer);
00512 
00513     /* Queue the status packet for all clients. */
00514     for (server_client= server_job->client_list; server_client;
00515          server_client= server_client->job_next)
00516     {
00517       ret= gearman_server_io_packet_add(server_client->con, false,
00518                                         GEARMAN_MAGIC_RESPONSE,
00519                                         GEARMAN_COMMAND_WORK_STATUS,
00520                                         packet->arg[0], packet->arg_size[0],
00521                                         packet->arg[1], packet->arg_size[1],
00522                                         packet->arg[2], packet->arg_size[2],
00523                                         NULL);
00524       if (ret != GEARMAN_SUCCESS)
00525         return ret;
00526     }
00527 
00528     break;
00529 
00530   case GEARMAN_COMMAND_WORK_COMPLETE:
00531     server_job= gearman_server_job_get(server_con->thread->server,
00532                                        (char *)(packet->arg[0]),
00533                                        server_con);
00534     if (server_job == NULL)
00535     {
00536       return _server_error_packet(server_con, "job_not_found",
00537                                   "Job given in work result not found");
00538     }
00539 
00540     /* Queue the complete packet for all clients. */
00541     ret= _server_queue_work_data(server_job, packet,
00542                                  GEARMAN_COMMAND_WORK_COMPLETE);
00543     if (ret != GEARMAN_SUCCESS)
00544       return ret;
00545 
00546     /* Remove from persistent queue if one exists. */
00547     if (server_job->job_queued && server->queue_done_fn != NULL)
00548     {
00549       ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00550                                       server_job->unique,
00551                                       (size_t)strlen(server_job->unique),
00552                                       server_job->function->function_name,
00553                                       server_job->function->function_name_size);
00554       if (ret != GEARMAN_SUCCESS)
00555         return ret;
00556     }
00557 
00558     /* Job is done, remove it. */
00559     gearman_server_job_free(server_job);
00560     break;
00561 
00562   case GEARMAN_COMMAND_WORK_EXCEPTION:
00563     server_job= gearman_server_job_get(server_con->thread->server,
00564                                        (char *)(packet->arg[0]),
00565                                        server_con);
00566     if (server_job == NULL)
00567     {
00568       return _server_error_packet(server_con, "job_not_found",
00569                                   "Job given in work result not found");
00570     }
00571 
00572     /* Queue the exception packet for all clients. */
00573     ret= _server_queue_work_data(server_job, packet,
00574                                  GEARMAN_COMMAND_WORK_EXCEPTION);
00575     if (ret != GEARMAN_SUCCESS)
00576       return ret;
00577     break;
00578 
00579   case GEARMAN_COMMAND_WORK_FAIL:
00580     /* This may not be NULL terminated, so copy to make sure it is. */
00581     snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00582              (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00583 
00584     server_job= gearman_server_job_get(server_con->thread->server, job_handle,
00585                                        server_con);
00586     if (server_job == NULL)
00587     {
00588       return _server_error_packet(server_con, "job_not_found",
00589                                   "Job given in work result not found");
00590     }
00591 
00592     /* Queue the fail packet for all clients. */
00593     for (server_client= server_job->client_list; server_client;
00594          server_client= server_client->job_next)
00595     {
00596       ret= gearman_server_io_packet_add(server_client->con, false,
00597                                         GEARMAN_MAGIC_RESPONSE,
00598                                         GEARMAN_COMMAND_WORK_FAIL,
00599                                         packet->arg[0], packet->arg_size[0],
00600                                         NULL);
00601       if (ret != GEARMAN_SUCCESS)
00602         return ret;
00603     }
00604 
00605     /* Remove from persistent queue if one exists. */
00606     if (server_job->job_queued && server->queue_done_fn != NULL)
00607     {
00608       ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00609                                       server_job->unique,
00610                                       (size_t)strlen(server_job->unique),
00611                                       server_job->function->function_name,
00612                                       server_job->function->function_name_size);
00613       if (ret != GEARMAN_SUCCESS)
00614         return ret;
00615     }
00616 
00617     /* Job is done, remove it. */
00618     gearman_server_job_free(server_job);
00619     break;
00620 
00621   case GEARMAN_COMMAND_SET_CLIENT_ID:
00622     gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
00623                               packet->arg_size[0]);
00624     break;
00625 
00626   case GEARMAN_COMMAND_TEXT:
00627     return _server_run_text(server_con, packet);
00628 
00629   case GEARMAN_COMMAND_UNUSED:
00630   case GEARMAN_COMMAND_NOOP:
00631   case GEARMAN_COMMAND_JOB_CREATED:
00632   case GEARMAN_COMMAND_NO_JOB:
00633   case GEARMAN_COMMAND_JOB_ASSIGN:
00634   case GEARMAN_COMMAND_ECHO_RES:
00635   case GEARMAN_COMMAND_ERROR:
00636   case GEARMAN_COMMAND_STATUS_RES:
00637   case GEARMAN_COMMAND_ALL_YOURS:
00638   case GEARMAN_COMMAND_OPTION_RES:
00639   case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
00640   case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
00641   case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
00642   case GEARMAN_COMMAND_MAX:
00643   default:
00644     return _server_error_packet(server_con, "bad_command",
00645                                 "Command not expected");
00646   }
00647 
00648   return GEARMAN_SUCCESS;
00649 }
00650 
00651 gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
00652 {
00653   server->shutdown_graceful= true;
00654 
00655   if (server->job_count == 0)
00656     return GEARMAN_SHUTDOWN;
00657 
00658   return GEARMAN_SHUTDOWN_GRACEFUL;
00659 }
00660 
00661 gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
00662 {
00663   gearman_return_t ret;
00664 
00665   if (server->queue_replay_fn == NULL)
00666     return GEARMAN_SUCCESS;
00667 
00668   server->state.queue_startup= true;
00669 
00670   ret= (*(server->queue_replay_fn))(server, (void *)server->queue_context,
00671                                     _queue_replay_add, server);
00672 
00673   server->state.queue_startup= false;
00674 
00675   return ret;
00676 }
00677 
00678 void *gearman_server_queue_context(const gearman_server_st *server)
00679 {
00680   return (void *)server->queue_context;
00681 }
00682 
00683 void gearman_server_set_queue_context(gearman_server_st *server,
00684                                       void *context)
00685 {
00686   server->queue_context= context;
00687 }
00688 
00689 void gearman_server_set_queue_add_fn(gearman_server_st *server,
00690                                      gearman_queue_add_fn *function)
00691 {
00692   server->queue_add_fn= function;
00693 }
00694 
00695 void gearman_server_set_queue_flush_fn(gearman_server_st *server,
00696                                        gearman_queue_flush_fn *function)
00697 {
00698   server->queue_flush_fn= function;
00699 }
00700 
00701 void gearman_server_set_queue_done_fn(gearman_server_st *server,
00702                                       gearman_queue_done_fn *function)
00703 {
00704   server->queue_done_fn= function;
00705 }
00706 
00707 void gearman_server_set_queue_replay_fn(gearman_server_st *server,
00708                                         gearman_queue_replay_fn *function)
00709 {
00710   server->queue_replay_fn= function;
00711 }
00712 
00713 /*
00714  * Private definitions
00715  */
00716 
00717 gearman_return_t _queue_replay_add(gearman_server_st *server,
00718                                    void *context __attribute__ ((unused)),
00719                                    const void *unique, size_t unique_size,
00720                                    const void *function_name,
00721                                    size_t function_name_size, const void *data,
00722                                    size_t data_size,
00723                                    gearman_job_priority_t priority)
00724 {
00725   gearman_return_t ret;
00726 
00727   (void)gearman_server_job_add(server, (char *)function_name,
00728                                function_name_size, (char *)unique, unique_size,
00729                                data, data_size, priority, NULL, &ret);
00730   return ret;
00731 }
00732 
00733 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00734                                              const char *error_code,
00735                                              const char *error_string)
00736 {
00737   return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00738                                       GEARMAN_COMMAND_ERROR, error_code,
00739                                       (size_t)(strlen(error_code) + 1),
00740                                       error_string,
00741                                       (size_t)strlen(error_string), NULL);
00742 }
00743 
00744 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00745                                          gearman_packet_st *packet)
00746 {
00747   char *data;
00748   char *new_data;
00749   size_t size;
00750   size_t total;
00751   int max_queue_size;
00752   gearman_server_thread_st *thread;
00753   gearman_server_con_st *con;
00754   gearman_server_worker_st *worker;
00755   gearman_server_function_st *function;
00756   gearman_server_packet_st *server_packet;
00757 
00758   data= (char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
00759   if (data == NULL)
00760   {
00761     gearman_log_error(packet->universal, "_server_run_text", "malloc");
00762     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00763   }
00764   total= GEARMAN_TEXT_RESPONSE_SIZE;
00765 
00766   if (packet->argc == 0)
00767   {
00768     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00769              "ERR unknown_command Unknown+server+command\n");
00770   }
00771   else if (!strcasecmp("workers", (char *)(packet->arg[0])))
00772   {
00773     size= 0;
00774 
00775     for (thread= server_con->thread->server->thread_list; thread != NULL;
00776          thread= thread->next)
00777     {
00778       (void) pthread_mutex_lock(&thread->lock);
00779 
00780       for (con= thread->con_list; con != NULL; con= con->next)
00781       {
00782         if (con->host == NULL)
00783           continue;
00784 
00785         if (size > total)
00786           size= total;
00787 
00788         /* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
00789         if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00790         {
00791           new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00792           if (new_data == NULL)
00793           {
00794             (void) pthread_mutex_unlock(&thread->lock);
00795             free(data);
00796             gearman_log_error(packet->universal, "_server_run_text", "malloc");
00797             return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00798           }
00799 
00800           data= new_data;
00801           total+= GEARMAN_TEXT_RESPONSE_SIZE;
00802         }
00803 
00804         size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
00805                                 con->con.fd, con->host, con->id);
00806         if (size > total)
00807           continue;
00808 
00809         for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
00810         {
00811           size+= (size_t)snprintf(data + size, total - size, " %.*s",
00812                                   (int)(worker->function->function_name_size),
00813                                   worker->function->function_name);
00814           if (size > total)
00815             break;
00816         }
00817 
00818         if (size > total)
00819           continue;
00820 
00821         size+= (size_t)snprintf(data + size, total - size, "\n");
00822       }
00823 
00824       (void) pthread_mutex_unlock(&thread->lock);
00825     }
00826 
00827     if (size < total)
00828       snprintf(data + size, total - size, ".\n");
00829   }
00830   else if (!strcasecmp("status", (char *)(packet->arg[0])))
00831   {
00832     size= 0;
00833 
00834     for (function= server_con->thread->server->function_list; function != NULL;
00835          function= function->next)
00836     {
00837       if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00838       {
00839         new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00840         if (new_data == NULL)
00841         {
00842           free(data);
00843           gearman_log_error(packet->universal, "_server_run_text", "malloc");
00844           return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00845         }
00846 
00847         data= new_data;
00848         total+= GEARMAN_TEXT_RESPONSE_SIZE;
00849       }
00850 
00851       size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
00852                               (int)(function->function_name_size),
00853                               function->function_name, function->job_total,
00854                               function->job_running, function->worker_count);
00855       if (size > total)
00856         size= total;
00857     }
00858 
00859     if (size < total)
00860       snprintf(data + size, total - size, ".\n");
00861   }
00862   else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
00863   {
00864     if (packet->argc == 1)
00865     {
00866       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
00867                "An+incomplete+set+of+arguments+was+sent+to+this+command\n");
00868     }
00869     else
00870     {
00871       if (packet->argc == 2)
00872         max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
00873       else
00874       {
00875         max_queue_size= atoi((char *)(packet->arg[2]));
00876         if (max_queue_size < 0)
00877           max_queue_size= 0;
00878       }
00879 
00880       for (function= server_con->thread->server->function_list;
00881            function != NULL; function= function->next)
00882       {
00883         if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
00884             !memcmp(packet->arg[1], function->function_name,
00885                     function->function_name_size))
00886         {
00887           function->max_queue_size= (uint32_t)max_queue_size;
00888         }
00889       }
00890 
00891       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00892     }
00893   }
00894   else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
00895   {
00896     if (packet->argc == 1)
00897     {
00898       server_con->thread->server->shutdown= true;
00899       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00900     }
00901     else if (packet->argc == 2 &&
00902              !strcasecmp("graceful", (char *)(packet->arg[1])))
00903     {
00904       server_con->thread->server->shutdown_graceful= true;
00905       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00906     }
00907     else
00908     {
00909       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00910                "ERR unknown_args Unknown+arguments+to+server+command\n");
00911     }
00912   }
00913   else if (!strcasecmp("version", (char *)(packet->arg[0])))
00914     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
00915   else
00916   {
00917     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00918              "ERR unknown_command Unknown+server+command\n");
00919   }
00920 
00921   server_packet= gearman_server_packet_create(server_con->thread, false);
00922   if (server_packet == NULL)
00923   {
00924     free(data);
00925     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00926   }
00927 
00928   if (gearman_packet_create(server_con->thread->gearman,
00929                             &(server_packet->packet)) == NULL)
00930   {
00931     free(data);
00932     gearman_server_packet_free(server_packet, server_con->thread, false);
00933     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00934   }
00935 
00936   server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
00937   server_packet->packet.command= GEARMAN_COMMAND_TEXT;
00938   server_packet->packet.options.complete= true;
00939   server_packet->packet.options.free_data= true;
00940 
00941   server_packet->packet.data= data;
00942   server_packet->packet.data_size= strlen(data);
00943 
00944   (void) pthread_mutex_lock(&server_con->thread->lock);
00945   GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
00946   (void) pthread_mutex_unlock(&server_con->thread->lock);
00947 
00948   gearman_server_con_io_add(server_con);
00949 
00950   return GEARMAN_SUCCESS;
00951 }
00952 
00953 static gearman_return_t
00954 _server_queue_work_data(gearman_server_job_st *server_job,
00955                         gearman_packet_st *packet, gearman_command_t command)
00956 {
00957   gearman_server_client_st *server_client;
00958   uint8_t *data;
00959   gearman_return_t ret;
00960 
00961   for (server_client= server_job->client_list; server_client;
00962        server_client= server_client->job_next)
00963   {
00964     if (command == GEARMAN_COMMAND_WORK_EXCEPTION && !(server_client->con->is_exceptions))
00965     {
00966       continue;
00967     }
00968 
00969     if (packet->data_size > 0)
00970     {
00971       if (packet->options.free_data &&
00972           server_client->job_next == NULL)
00973       {
00974         data= (uint8_t *)(packet->data);
00975         packet->options.free_data= false;
00976       }
00977       else
00978       {
00979         data= (uint8_t *)malloc(packet->data_size);
00980         if (data == NULL)
00981         {
00982           gearman_log_error(packet->universal, "_server_run_command", "malloc");
00983           return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00984         }
00985 
00986         memcpy(data, packet->data, packet->data_size);
00987       }
00988     }
00989     else
00990       data= NULL;
00991 
00992     ret= gearman_server_io_packet_add(server_client->con, true,
00993                                       GEARMAN_MAGIC_RESPONSE, command,
00994                                       packet->arg[0], packet->arg_size[0],
00995                                       data, packet->data_size, NULL);
00996     if (ret != GEARMAN_SUCCESS)
00997       return ret;
00998   }
00999 
01000   return GEARMAN_SUCCESS;
01001 }
01002 
01003 static void _log(const char *line, gearman_verbose_t verbose, void *context)
01004 {
01005   gearman_server_st *server= (gearman_server_st *)context;
01006   (*(server->log_fn))(line, verbose, (void *)server->log_context);
01007 }