Gearman Developer Documentation

libgearman-server/gearmand.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 #include "gearmand.h"
00016 
00017 /*
00018  * Private declarations
00019  */
00020 
00027 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00028 
00029 static gearman_return_t _listen_init(gearmand_st *gearmand);
00030 static void _listen_close(gearmand_st *gearmand);
00031 static gearman_return_t _listen_watch(gearmand_st *gearmand);
00032 static void _listen_clear(gearmand_st *gearmand);
00033 static void _listen_event(int fd, short events, void *arg);
00034 
00035 static gearman_return_t _wakeup_init(gearmand_st *gearmand);
00036 static void _wakeup_close(gearmand_st *gearmand);
00037 static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
00038 static void _wakeup_clear(gearmand_st *gearmand);
00039 static void _wakeup_event(int fd, short events, void *arg);
00040 
00041 static gearman_return_t _watch_events(gearmand_st *gearmand);
00042 static void _clear_events(gearmand_st *gearmand);
00043 static void _close_events(gearmand_st *gearmand);
00044 
00047 /*
00048  * Public definitions
00049  */
00050 
00051 gearmand_st *gearmand_create(const char *host, in_port_t port)
00052 {
00053   gearmand_st *gearmand;
00054 
00055   gearmand= (gearmand_st *)malloc(sizeof(gearmand_st));
00056   if (gearmand == NULL)
00057     return NULL;
00058 
00059   if (gearman_server_create(&(gearmand->server)) == NULL)
00060   {
00061     free(gearmand);
00062     return NULL;
00063   }
00064 
00065   gearmand->is_listen_event= false;
00066   gearmand->is_wakeup_event= false;
00067   gearmand->verbose= 0;
00068   gearmand->ret= 0;
00069   gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
00070   gearmand->threads= 0;
00071   gearmand->port_count= 0;
00072   gearmand->thread_count= 0;
00073   gearmand->free_dcon_count= 0;
00074   gearmand->max_thread_free_dcon_count= 0;
00075   gearmand->wakeup_fd[0]= -1;
00076   gearmand->wakeup_fd[1]= -1;
00077   gearmand->host= host;
00078   gearmand->log_fn= NULL;
00079   gearmand->log_context= NULL;
00080   gearmand->base= NULL;
00081   gearmand->port_list= NULL;
00082   gearmand->thread_list= NULL;
00083   gearmand->thread_add_next= NULL;
00084   gearmand->free_dcon_list= NULL;
00085 
00086   if (port == 0)
00087     port= GEARMAN_DEFAULT_TCP_PORT;
00088 
00089   if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
00090   {
00091     gearmand_free(gearmand);
00092     return NULL;
00093   }
00094 
00095   return gearmand;
00096 }
00097 
00098 void gearmand_free(gearmand_st *gearmand)
00099 {
00100   gearmand_con_st *dcon;
00101   uint32_t x;
00102 
00103   _close_events(gearmand);
00104 
00105   if (gearmand->threads > 0)
00106     gearmand_log_info(gearmand, "Shutting down all threads");
00107 
00108   while (gearmand->thread_list != NULL)
00109     gearmand_thread_free(gearmand->thread_list);
00110 
00111   while (gearmand->free_dcon_list != NULL)
00112   {
00113     dcon= gearmand->free_dcon_list;
00114     gearmand->free_dcon_list= dcon->next;
00115     free(dcon);
00116   }
00117 
00118   if (gearmand->base != NULL)
00119     event_base_free(gearmand->base);
00120 
00121   gearman_server_free(&(gearmand->server));
00122 
00123   for (x= 0; x < gearmand->port_count; x++)
00124   {
00125     if (gearmand->port_list[x].listen_fd != NULL)
00126       free(gearmand->port_list[x].listen_fd);
00127 
00128     if (gearmand->port_list[x].listen_event != NULL)
00129       free(gearmand->port_list[x].listen_event);
00130   }
00131 
00132   if (gearmand->port_list != NULL)
00133     free(gearmand->port_list);
00134 
00135   gearmand_log_info(gearmand, "Shutdown complete");
00136 
00137   free(gearmand);
00138 }
00139 
00140 void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
00141 {
00142   gearmand->backlog= backlog;
00143 }
00144 
00145 void gearmand_set_job_retries(gearmand_st *gearmand, uint8_t job_retries)
00146 {
00147   gearman_server_set_job_retries(&(gearmand->server), job_retries);
00148 }
00149 
00150 void gearmand_set_worker_wakeup(gearmand_st *gearmand, uint8_t worker_wakeup)
00151 {
00152   gearman_server_set_worker_wakeup(&(gearmand->server), worker_wakeup);
00153 }
00154 
00155 void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
00156 {
00157   gearmand->threads= threads;
00158 }
00159 
00160 void gearmand_set_log_fn(gearmand_st *gearmand, gearman_log_fn *function,
00161                          void *context, gearman_verbose_t verbose)
00162 {
00163   gearman_server_set_log_fn(&(gearmand->server), _log, gearmand, verbose);
00164   gearmand->log_fn= function;
00165   gearmand->log_context= context;
00166   gearmand->verbose= verbose;
00167 }
00168 
00169 gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
00170                                    gearman_connection_add_fn *function)
00171 {
00172   gearmand_port_st *port_list;
00173 
00174   port_list= (gearmand_port_st *)realloc(gearmand->port_list,
00175                                          sizeof(gearmand_port_st) * (gearmand->port_count + 1));
00176   if (port_list == NULL)
00177   {
00178     gearmand_log_fatal(gearmand, "gearmand_port_add:realloc:NULL");
00179     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00180   }
00181 
00182   port_list[gearmand->port_count].port= port;
00183   port_list[gearmand->port_count].listen_count= 0;
00184   port_list[gearmand->port_count].gearmand= gearmand;
00185   port_list[gearmand->port_count].add_fn= function;
00186   port_list[gearmand->port_count].listen_fd= NULL;
00187   port_list[gearmand->port_count].listen_event= NULL;
00188 
00189   gearmand->port_list= port_list;
00190   gearmand->port_count++;
00191 
00192   return GEARMAN_SUCCESS;
00193 }
00194 
00195 gearman_return_t gearmand_run(gearmand_st *gearmand)
00196 {
00197   uint32_t x;
00198 
00199   /* Initialize server components. */
00200   if (gearmand->base == NULL)
00201   {
00202     gearmand_log_info(gearmand, "Starting up");
00203 
00204     if (gearmand->threads > 0)
00205     {
00206 #ifndef HAVE_EVENT_BASE_NEW
00207       gearmand_log_fatal(gearmand, "Multi-threaded gearmand requires libevent 1.4 or later, libevent 1.3 does not provided a "
00208                          "thread-safe interface.");
00209       return GEARMAN_EVENT;
00210 #else
00211       /* Set the number of free connection structures each thread should keep
00212          around before the main thread is forced to take them. We compute this
00213          here so we don't need to on every new connection. */
00214       gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
00215                                               gearmand->threads) / 2);
00216 #endif
00217     }
00218 
00219     gearmand_log_debug(gearmand, "Initializing libevent for main thread");
00220 
00221     gearmand->base= event_base_new();
00222     if (gearmand->base == NULL)
00223     {
00224       gearmand_log_fatal(gearmand, "gearmand_run:event_base_new:NULL");
00225       return GEARMAN_EVENT;
00226     }
00227 
00228     gearmand_log_debug(gearmand, "Method for libevent: %s",
00229                        event_base_get_method(gearmand->base));
00230 
00231     gearmand->ret= _listen_init(gearmand);
00232     if (gearmand->ret != GEARMAN_SUCCESS)
00233       return gearmand->ret;
00234 
00235     gearmand->ret= _wakeup_init(gearmand);
00236     if (gearmand->ret != GEARMAN_SUCCESS)
00237       return gearmand->ret;
00238 
00239     gearmand_log_debug(gearmand, "Creating %u threads", gearmand->threads);
00240 
00241     /* If we have 0 threads we still need to create a fake one for context. */
00242     x= 0;
00243     do
00244     {
00245       gearmand->ret= gearmand_thread_create(gearmand);
00246       if (gearmand->ret != GEARMAN_SUCCESS)
00247         return gearmand->ret;
00248       x++;
00249     }
00250     while (x < gearmand->threads);
00251 
00252     gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
00253     if (gearmand->ret != GEARMAN_SUCCESS)
00254       return gearmand->ret;
00255   }
00256 
00257   gearmand->ret= _watch_events(gearmand);
00258   if (gearmand->ret != GEARMAN_SUCCESS)
00259     return gearmand->ret;
00260 
00261   gearmand_log_info(gearmand, "Entering main event loop");
00262 
00263   if (event_base_loop(gearmand->base, 0) == -1)
00264   {
00265     gearmand_log_fatal(gearmand, "gearmand_run:event_base_loop:-1");
00266     return GEARMAN_EVENT;
00267   }
00268 
00269   gearmand_log_info(gearmand, "Exited main event loop");
00270 
00271   return gearmand->ret;
00272 }
00273 
00274 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
00275 {
00276   uint8_t buffer= wakeup;
00277 
00278   /* If this fails, there is not much we can really do. This should never fail
00279      though if the main gearmand thread is still active. */
00280   if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
00281     gearmand_log_error(gearmand, "gearmand_wakeup:write:%d", errno);
00282 }
00283 
00284 void gearmand_set_round_robin(gearmand_st *gearmand, bool round_robin)
00285 {
00286   gearmand->server.flags.round_robin= round_robin;
00287 }
00288 
00289 
00290 /*
00291  * Private definitions
00292  */
00293 
00294 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00295 {
00296   gearmand_st *gearmand= (gearmand_st *)context;
00297   (*gearmand->log_fn)(line, verbose, (void *)gearmand->log_context);
00298 }
00299 
00300 static gearman_return_t _listen_init(gearmand_st *gearmand)
00301 {
00302   for (uint32_t x= 0; x < gearmand->port_count; x++)
00303   {
00304     int ret;
00305     struct gearmand_port_st *port;
00306     char port_str[NI_MAXSERV];
00307     struct addrinfo ai;
00308     struct addrinfo *addrinfo;
00309 
00310     port= &gearmand->port_list[x];
00311 
00312     snprintf(port_str, NI_MAXSERV, "%u", (uint32_t)(port->port));
00313 
00314     memset(&ai, 0, sizeof(struct addrinfo));
00315     ai.ai_flags  = AI_PASSIVE;
00316     ai.ai_family = AF_UNSPEC;
00317     ai.ai_socktype = SOCK_STREAM;
00318     ai.ai_protocol= IPPROTO_TCP;
00319 
00320     ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
00321     if (ret != 0)
00322     {
00323       gearmand_log_fatal(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret));
00324       return GEARMAN_ERRNO;
00325     }
00326 
00327     for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
00328          addrinfo_next= addrinfo_next->ai_next)
00329     {
00330       int opt;
00331       int fd;
00332       char host[NI_MAXHOST];
00333 
00334       ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
00335                        NI_MAXHOST, port_str, NI_MAXSERV,
00336                        NI_NUMERICHOST | NI_NUMERICSERV);
00337       if (ret != 0)
00338       {
00339         gearmand_log_error(gearmand, "_listen_init:getnameinfo:%s", gai_strerror(ret));
00340         strcpy(host, "-");
00341         strcpy(port_str, "-");
00342       }
00343 
00344       gearmand_log_debug(gearmand, "Trying to listen on %s:%s", host, port_str);
00345 
00346       /* Call to socket() can fail for some getaddrinfo results, try another. */
00347       fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
00348                  addrinfo_next->ai_protocol);
00349       if (fd == -1)
00350       {
00351         gearmand_log_error(gearmand, "Failed to listen on %s:%s", host, port_str);
00352         continue;
00353       }
00354 
00355       opt= 1;
00356       ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
00357       if (ret == -1)
00358       {
00359         close(fd);
00360         gearmand_log_fatal(gearmand, "_listen_init:setsockopt:%d", errno);
00361         return GEARMAN_ERRNO;
00362       }
00363 
00364       ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
00365       if (ret == -1)
00366       {
00367         close(fd);
00368         if (errno == EADDRINUSE)
00369         {
00370           if (port->listen_fd == NULL)
00371           {
00372             gearmand_log_error(gearmand, "Address already in use %s:%s", host, port_str);
00373           }
00374 
00375           continue;
00376         }
00377 
00378         gearmand_log_fatal(gearmand, "_listen_init:bind:%d", errno);
00379         return GEARMAN_ERRNO;
00380       }
00381 
00382       if (listen(fd, gearmand->backlog) == -1)
00383       {
00384         close(fd);
00385         gearmand_log_fatal(gearmand, "_listen_init:listen:%d", errno);
00386         return GEARMAN_ERRNO;
00387       }
00388 
00389       // Scoping note for eventual transformation
00390       {
00391         int *fd_list;
00392 
00393         fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
00394         if (fd_list == NULL)
00395         {
00396           close(fd);
00397           gearmand_log_fatal(gearmand, "_listen_init:realloc:%d", errno);
00398             return GEARMAN_ERRNO;
00399         }
00400 
00401         port->listen_fd= fd_list;
00402       }
00403 
00404       port->listen_fd[port->listen_count]= fd;
00405       port->listen_count++;
00406 
00407       gearmand_log_info(gearmand, "Listening on %s:%s (%d)", host, port_str, fd);
00408     }
00409 
00410     freeaddrinfo(addrinfo);
00411 
00412     /* Report last socket() error if we couldn't find an address to bind. */
00413     if (port->listen_fd == NULL)
00414     {
00415       gearmand_log_fatal(gearmand, "_listen_init:Could not bind/listen to any addresses");
00416       return GEARMAN_ERRNO;
00417     }
00418 
00419     port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
00420     if (port->listen_event == NULL)
00421     {
00422       gearmand_log_fatal(gearmand, "_listen_init:malloc:%d", errno);
00423       return GEARMAN_ERRNO;
00424     }
00425 
00426     for (uint32_t y= 0; y < port->listen_count; y++)
00427     {
00428       event_set(&(port->listen_event[y]), port->listen_fd[y],
00429                 EV_READ | EV_PERSIST, _listen_event, port);
00430       event_base_set(gearmand->base, &(port->listen_event[y]));
00431     }
00432   }
00433 
00434   return GEARMAN_SUCCESS;
00435 }
00436 
00437 static void _listen_close(gearmand_st *gearmand)
00438 {
00439   _listen_clear(gearmand);
00440 
00441   for (uint32_t x= 0; x < gearmand->port_count; x++)
00442   {
00443     for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00444     {
00445       if (gearmand->port_list[x].listen_fd[y] >= 0)
00446       {
00447         gearmand_log_info(gearmand, "Closing listening socket (%d)", gearmand->port_list[x].listen_fd[y]);
00448         close(gearmand->port_list[x].listen_fd[y]);
00449         gearmand->port_list[x].listen_fd[y]= -1;
00450       }
00451     }
00452   }
00453 }
00454 
00455 static gearman_return_t _listen_watch(gearmand_st *gearmand)
00456 {
00457   if (gearmand->is_listen_event)
00458     return GEARMAN_SUCCESS;
00459 
00460   for (uint32_t x= 0; x < gearmand->port_count; x++)
00461   {
00462     for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00463     {
00464       gearmand_log_info(gearmand, "Adding event for listening socket (%d)",
00465                         gearmand->port_list[x].listen_fd[y]);
00466 
00467         if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
00468       {
00469         gearmand_log_fatal(gearmand, "_listen_watch:event_add:-1");
00470         return GEARMAN_EVENT;
00471       }
00472     }
00473   }
00474 
00475   gearmand->is_listen_event= true;
00476   return GEARMAN_SUCCESS;
00477 }
00478 
00479 static void _listen_clear(gearmand_st *gearmand)
00480 {
00481   if (! (gearmand->is_listen_event))
00482     return;
00483 
00484   int del_ret= 0;
00485   for (uint32_t x= 0; x < gearmand->port_count; x++)
00486   {
00487     for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00488     {
00489       gearmand_log_info(gearmand, "Clearing event for listening socket (%d)",
00490                         gearmand->port_list[x].listen_fd[y]);
00491         del_ret= event_del(&(gearmand->port_list[x].listen_event[y]));
00492       assert(del_ret == 0);
00493     }
00494   }
00495 
00496   gearmand->is_listen_event= false;
00497 }
00498 
00499 static void _listen_event(int fd, short events __attribute__ ((unused)),
00500                           void *arg)
00501 {
00502   gearmand_port_st *port= (gearmand_port_st *)arg;
00503   struct sockaddr_storage sa;
00504   socklen_t sa_len;
00505   char host[NI_MAXHOST];
00506   char port_str[NI_MAXSERV];
00507   int ret;
00508 
00509   sa_len= sizeof(sa);
00510   fd= accept(fd, (struct sockaddr *)&sa, &sa_len);
00511   if (fd == -1)
00512   {
00513     if (errno == EINTR)
00514       return;
00515     else if (errno == EMFILE)
00516     {
00517       gearmand_log_error(port->gearmand, "_listen_event:accept:too many open files");
00518       return;
00519     }
00520 
00521     _clear_events(port->gearmand);
00522     gearmand_log_fatal(port->gearmand, "_listen_event:accept:%d", errno);
00523     port->gearmand->ret= GEARMAN_ERRNO;
00524     return;
00525   }
00526 
00527   /* Since this is numeric, it should never fail. Even if it did we don't want
00528      to really error from it. */
00529   ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
00530                    NI_NUMERICHOST | NI_NUMERICSERV);
00531   if (ret != 0)
00532   {
00533     gearmand_log_error(port->gearmand, "_listen_event:getnameinfo:%s", gai_strerror(ret));
00534     strcpy(host, "-");
00535     strcpy(port_str, "-");
00536   }
00537 
00538   gearmand_log_info(port->gearmand, "Accepted connection from %s:%s", host, port_str);
00539 
00540   port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
00541                                            port->add_fn);
00542   if (port->gearmand->ret != GEARMAN_SUCCESS)
00543     _clear_events(port->gearmand);
00544 }
00545 
00546 static gearman_return_t _wakeup_init(gearmand_st *gearmand)
00547 {
00548   int ret;
00549 
00550   gearmand_log_info(gearmand, "Creating wakeup pipe");
00551 
00552   ret= pipe(gearmand->wakeup_fd);
00553   if (ret == -1)
00554   {
00555     gearmand_log_fatal(gearmand, "_wakeup_init:pipe:%d", errno);
00556     return GEARMAN_ERRNO;
00557   }
00558 
00559   ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
00560   if (ret == -1)
00561   {
00562     gearmand_log_fatal(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
00563     return GEARMAN_ERRNO;
00564   }
00565 
00566   ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00567   if (ret == -1)
00568   {
00569     gearmand_log_fatal(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
00570     return GEARMAN_ERRNO;
00571   }
00572 
00573   event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
00574             EV_READ | EV_PERSIST, _wakeup_event, gearmand);
00575   event_base_set(gearmand->base, &(gearmand->wakeup_event));
00576 
00577   return GEARMAN_SUCCESS;
00578 }
00579 
00580 static void _wakeup_close(gearmand_st *gearmand)
00581 {
00582   _wakeup_clear(gearmand);
00583 
00584   if (gearmand->wakeup_fd[0] >= 0)
00585   {
00586     gearmand_log_info(gearmand, "Closing wakeup pipe");
00587     close(gearmand->wakeup_fd[0]);
00588     gearmand->wakeup_fd[0]= -1;
00589     close(gearmand->wakeup_fd[1]);
00590     gearmand->wakeup_fd[1]= -1;
00591   }
00592 }
00593 
00594 static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
00595 {
00596   if (gearmand->is_wakeup_event)
00597     return GEARMAN_SUCCESS;
00598 
00599   gearmand_log_info(gearmand, "Adding event for wakeup pipe");
00600 
00601   if (event_add(&(gearmand->wakeup_event), NULL) == -1)
00602   {
00603     gearmand_log_fatal(gearmand, "_wakeup_watch:event_add:-1");
00604     return GEARMAN_EVENT;
00605   }
00606 
00607   gearmand->is_wakeup_event= true;
00608   return GEARMAN_SUCCESS;
00609 }
00610 
00611 static void _wakeup_clear(gearmand_st *gearmand)
00612 {
00613   if (gearmand->is_wakeup_event)
00614   {
00615     gearmand_log_info(gearmand, "Clearing event for wakeup pipe");
00616     int del_ret= event_del(&(gearmand->wakeup_event));
00617     assert(del_ret == 0);
00618     gearmand->is_wakeup_event= false;
00619   }
00620 }
00621 
00622 static void _wakeup_event(int fd, short events __attribute__ ((unused)),
00623                           void *arg)
00624 {
00625   gearmand_st *gearmand= (gearmand_st *)arg;
00626   uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00627   ssize_t ret;
00628   gearmand_thread_st *thread;
00629 
00630   while (1)
00631   {
00632     ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00633     if (ret == 0)
00634     {
00635       _clear_events(gearmand);
00636       gearmand_log_fatal(gearmand, "_wakeup_event:read:EOF");
00637       gearmand->ret= GEARMAN_PIPE_EOF;
00638       return;
00639     }
00640     else if (ret == -1)
00641     {
00642       if (errno == EINTR)
00643         continue;
00644 
00645       if (errno == EAGAIN)
00646         break;
00647 
00648       _clear_events(gearmand);
00649       gearmand_log_fatal(gearmand, "_wakeup_event:read:%d", errno);
00650       gearmand->ret= GEARMAN_ERRNO;
00651       return;
00652     }
00653 
00654     for (ssize_t x= 0; x < ret; x++)
00655     {
00656       switch ((gearmand_wakeup_t)buffer[x])
00657       {
00658       case GEARMAND_WAKEUP_PAUSE:
00659         gearmand_log_info(gearmand, "Received PAUSE wakeup event");
00660         _clear_events(gearmand);
00661         gearmand->ret= GEARMAN_PAUSE;
00662         break;
00663 
00664       case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00665         gearmand_log_info(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event");
00666         _listen_close(gearmand);
00667 
00668         for (thread= gearmand->thread_list; thread != NULL;
00669              thread= thread->next)
00670         {
00671           gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
00672         }
00673 
00674         gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
00675         break;
00676 
00677       case GEARMAND_WAKEUP_SHUTDOWN:
00678         gearmand_log_info(gearmand, "Received SHUTDOWN wakeup event");
00679         _clear_events(gearmand);
00680         gearmand->ret= GEARMAN_SHUTDOWN;
00681         break;
00682 
00683       case GEARMAND_WAKEUP_CON:
00684       case GEARMAND_WAKEUP_RUN:
00685       default:
00686         gearmand_log_fatal(gearmand, "Received unknown wakeup event (%u)", buffer[x]);
00687         _clear_events(gearmand);
00688         gearmand->ret= GEARMAN_UNKNOWN_STATE;
00689         break;
00690       }
00691     }
00692   }
00693 }
00694 
00695 static gearman_return_t _watch_events(gearmand_st *gearmand)
00696 {
00697   gearman_return_t ret;
00698 
00699   ret= _listen_watch(gearmand);
00700   if (ret != GEARMAN_SUCCESS)
00701     return ret;
00702 
00703   ret= _wakeup_watch(gearmand);
00704   if (ret != GEARMAN_SUCCESS)
00705     return ret;
00706 
00707   return GEARMAN_SUCCESS;
00708 }
00709 
00710 static void _clear_events(gearmand_st *gearmand)
00711 {
00712   _listen_clear(gearmand);
00713   _wakeup_clear(gearmand);
00714 
00715   /* If we are not threaded, tell the fake thread to shutdown now to clear
00716      connections. Otherwise we will never exit the libevent loop. */
00717   if (gearmand->threads == 0 && gearmand->thread_list != NULL)
00718     gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
00719 }
00720 
00721 static void _close_events(gearmand_st *gearmand)
00722 {
00723   _listen_close(gearmand);
00724   _wakeup_close(gearmand);
00725 }