Gearman Developer Documentation

libgearman-server/gearmand_thread.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 #include "gearmand.h"
00016 
00017 /*
00018  * Private declarations
00019  */
00020 
00027 static void *_thread(void *data);
00028 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00029 static void _run(gearman_server_thread_st *thread, void *fn_arg);
00030 
00031 static gearman_return_t _wakeup_init(gearmand_thread_st *thread);
00032 static void _wakeup_close(gearmand_thread_st *thread);
00033 static void _wakeup_clear(gearmand_thread_st *thread);
00034 static void _wakeup_event(int fd, short events, void *arg);
00035 static void _clear_events(gearmand_thread_st *thread);
00036 
00039 /*
00040  * Public definitions
00041  */
00042 
00043 gearman_return_t gearmand_thread_create(gearmand_st *gearmand)
00044 {
00045   gearmand_thread_st *thread;
00046   gearman_return_t ret;
00047   int pthread_ret;
00048 
00049   thread= (gearmand_thread_st *)malloc(sizeof(gearmand_thread_st));
00050   if (thread == NULL)
00051   {
00052     gearmand_log_fatal(gearmand, "gearmand_thread_create:malloc");
00053     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00054   }
00055 
00056   if (gearman_server_thread_create(&(gearmand->server),
00057                                    &(thread->server_thread)) == NULL)
00058   {
00059     free(thread);
00060     gearmand_log_fatal(gearmand, "gearmand_thread_create:gearman_server_thread_create:NULL");
00061     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00062   }
00063 
00064   gearman_server_thread_set_log_fn(&(thread->server_thread), _log, thread,
00065                                    gearmand->verbose);
00066   gearman_server_thread_set_event_watch(&(thread->server_thread),
00067                                         gearmand_connection_watch, NULL);
00068 
00069   thread->is_thread_lock= false;
00070   thread->is_wakeup_event= false;
00071   thread->count= 0;
00072   thread->dcon_count= 0;
00073   thread->dcon_add_count= 0;
00074   thread->free_dcon_count= 0;
00075   thread->wakeup_fd[0]= -1;
00076   thread->wakeup_fd[1]= -1;
00077   GEARMAN_LIST_ADD(gearmand->thread, thread,)
00078   thread->gearmand= gearmand;
00079   thread->dcon_list= NULL;
00080   thread->dcon_add_list= NULL;
00081   thread->free_dcon_list= NULL;
00082 
00083   /* If we have no threads, we still create a fake thread that uses the main
00084      libevent instance. Otherwise create a libevent instance for each thread. */
00085   if (gearmand->threads == 0)
00086     thread->base= gearmand->base;
00087   else
00088   {
00089     gearmand_log_info(gearmand, "Initializing libevent for IO thread");
00090 
00091     thread->base= event_base_new();
00092     if (thread->base == NULL)
00093     {
00094       gearmand_thread_free(thread);
00095       gearmand_log_fatal(gearmand, "gearmand_thread_create:event_base_new:NULL");
00096       return GEARMAN_EVENT;
00097     }
00098   }
00099 
00100   ret= _wakeup_init(thread);
00101   if (ret != GEARMAN_SUCCESS)
00102   {
00103     gearmand_thread_free(thread);
00104     return ret;
00105   }
00106 
00107   /* If we are not running multi-threaded, just return the thread context. */
00108   if (gearmand->threads == 0)
00109     return GEARMAN_SUCCESS;
00110 
00111   thread->count= gearmand->thread_count;
00112 
00113   pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
00114   if (pthread_ret != 0)
00115   {
00116     thread->count= 0;
00117     gearmand_thread_free(thread);
00118     gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_mutex_init:%d", pthread_ret);
00119     return GEARMAN_PTHREAD;
00120   }
00121 
00122   thread->is_thread_lock= true;
00123 
00124   gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
00125 
00126   pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
00127   if (pthread_ret != 0)
00128   {
00129     thread->count= 0;
00130     gearmand_thread_free(thread);
00131     gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_create:%d", pthread_ret);
00132 
00133     return GEARMAN_PTHREAD;
00134   }
00135 
00136   gearmand_log_info(gearmand, "Thread %u created", thread->count);
00137 
00138   return GEARMAN_SUCCESS;
00139 }
00140 
00141 void gearmand_thread_free(gearmand_thread_st *thread)
00142 {
00143   gearmand_con_st *dcon;
00144 
00145   if (thread->gearmand->threads && thread->count > 0)
00146   {
00147     gearmand_log_info(thread->gearmand, "Shutting down thread %u", thread->count);
00148 
00149     gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
00150     (void) pthread_join(thread->id, NULL);
00151   }
00152 
00153   if (thread->is_thread_lock)
00154     (void) pthread_mutex_destroy(&(thread->lock));
00155 
00156   _wakeup_close(thread);
00157 
00158   while (thread->dcon_list != NULL)
00159     gearmand_con_free(thread->dcon_list);
00160 
00161   while (thread->dcon_add_list != NULL)
00162   {
00163     dcon= thread->dcon_add_list;
00164     thread->dcon_add_list= dcon->next;
00165     close(dcon->fd);
00166     free(dcon);
00167   }
00168 
00169   while (thread->free_dcon_list != NULL)
00170   {
00171     dcon= thread->free_dcon_list;
00172     thread->free_dcon_list= dcon->next;
00173     free(dcon);
00174   }
00175 
00176   gearman_server_thread_free(&(thread->server_thread));
00177 
00178   GEARMAN_LIST_DEL(thread->gearmand->thread, thread,)
00179 
00180   if (thread->gearmand->threads > 0)
00181   {
00182     if (thread->base != NULL)
00183       event_base_free(thread->base);
00184 
00185     gearmand_log_info(thread->gearmand, "Thread %u shutdown complete", thread->count);
00186   }
00187 
00188   free(thread);
00189 }
00190 
00191 void gearmand_thread_wakeup(gearmand_thread_st *thread,
00192                             gearmand_wakeup_t wakeup)
00193 {
00194   uint8_t buffer= wakeup;
00195 
00196   /* If this fails, there is not much we can really do. This should never fail
00197      though if the thread is still active. */
00198   if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
00199     gearmand_log_error(thread->gearmand, "gearmand_thread_wakeup:write:%d", errno);
00200 }
00201 
00202 void gearmand_thread_run(gearmand_thread_st *thread)
00203 {
00204   gearman_server_con_st *server_con;
00205   gearman_return_t ret;
00206   gearmand_con_st *dcon;
00207 
00208   while (1)
00209   {
00210     server_con= gearman_server_thread_run(&(thread->server_thread), &ret);
00211     if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
00212         ret == GEARMAN_SHUTDOWN_GRACEFUL)
00213     {
00214       return;
00215     }
00216 
00217     if (server_con == NULL)
00218     {
00219       /* We either got a GEARMAN_SHUTDOWN or some other fatal internal error.
00220          Either way, we want to shut the server down. */
00221       gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00222       return;
00223     }
00224 
00225     dcon= (gearmand_con_st *)gearman_server_con_data(server_con);
00226 
00227     gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Disconnected", thread->count, dcon->host, dcon->port);
00228 
00229     gearmand_con_free(dcon);
00230   }
00231 }
00232 
00233 /*
00234  * Private definitions
00235  */
00236 
00237 static void *_thread(void *data)
00238 {
00239   gearmand_thread_st *thread= (gearmand_thread_st *)data;
00240 
00241   gearmand_log_info(thread->gearmand, "[%4u] Entering thread event loop", thread->count);
00242 
00243   if (event_base_loop(thread->base, 0) == -1)
00244   {
00245     gearmand_log_fatal(thread->gearmand, "_io_thread:event_base_loop:-1");
00246     thread->gearmand->ret= GEARMAN_EVENT;
00247   }
00248 
00249   gearmand_log_info(thread->gearmand, "[%4u] Exiting thread event loop", thread->count);
00250 
00251   return NULL;
00252 }
00253 
00254 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00255 {
00256   gearmand_thread_st *dthread= (gearmand_thread_st *)context;
00257   char buffer[GEARMAN_MAX_ERROR_SIZE];
00258 
00259   snprintf(buffer, GEARMAN_MAX_ERROR_SIZE, "[%4u] %s", dthread->count, line);
00260   (*dthread->gearmand->log_fn)(buffer, verbose,
00261                                (void *)dthread->gearmand->log_context);
00262 }
00263 
00264 static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
00265                  void *fn_arg)
00266 {
00267   gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
00268   gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
00269 }
00270 
00271 static gearman_return_t _wakeup_init(gearmand_thread_st *thread)
00272 {
00273   int ret;
00274 
00275   gearmand_log_info(thread->gearmand, "Creating IO thread wakeup pipe");
00276 
00277   ret= pipe(thread->wakeup_fd);
00278   if (ret == -1)
00279   {
00280     gearmand_log_fatal(thread->gearmand, "_wakeup_init:pipe:%d", errno);
00281     return GEARMAN_ERRNO;
00282   }
00283 
00284   ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
00285   if (ret == -1)
00286   {
00287     gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
00288     return GEARMAN_ERRNO;
00289   }
00290 
00291   ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00292   if (ret == -1)
00293   {
00294     gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
00295     return GEARMAN_ERRNO;
00296   }
00297 
00298   event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
00299             _wakeup_event, thread);
00300   event_base_set(thread->base, &(thread->wakeup_event));
00301 
00302   if (event_add(&(thread->wakeup_event), NULL) == -1)
00303   {
00304     gearmand_log_fatal(thread->gearmand, "_wakeup_init:event_add:-1");
00305     return GEARMAN_EVENT;
00306   }
00307 
00308   thread->is_wakeup_event= true;
00309 
00310   return GEARMAN_SUCCESS;
00311 }
00312 
00313 static void _wakeup_close(gearmand_thread_st *thread)
00314 {
00315   _wakeup_clear(thread);
00316 
00317   if (thread->wakeup_fd[0] >= 0)
00318   {
00319     gearmand_log_info(thread->gearmand, "Closing IO thread wakeup pipe");
00320     close(thread->wakeup_fd[0]);
00321     thread->wakeup_fd[0]= -1;
00322     close(thread->wakeup_fd[1]);
00323     thread->wakeup_fd[1]= -1;
00324   }
00325 }
00326 
00327 static void _wakeup_clear(gearmand_thread_st *thread)
00328 {
00329   if (thread->is_wakeup_event)
00330   {
00331     gearmand_log_info(thread->gearmand, "[%4u] Clearing event for IO thread wakeup pipe", thread->count);
00332     int del_ret= event_del(&(thread->wakeup_event));
00333     assert(del_ret == 0);
00334     thread->is_wakeup_event= false;
00335   }
00336 }
00337 
00338 static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
00339 {
00340   gearmand_thread_st *thread= (gearmand_thread_st *)arg;
00341   uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00342   ssize_t ret;
00343   ssize_t x;
00344 
00345   while (1)
00346   {
00347     ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00348     if (ret == 0)
00349     {
00350       _clear_events(thread);
00351       gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:EOF");
00352       thread->gearmand->ret= GEARMAN_PIPE_EOF;
00353       return;
00354     }
00355     else if (ret == -1)
00356     {
00357       if (errno == EINTR)
00358         continue;
00359 
00360       if (errno == EAGAIN)
00361         break;
00362 
00363       _clear_events(thread);
00364       gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:%d", errno);
00365       thread->gearmand->ret= GEARMAN_ERRNO;
00366       return;
00367     }
00368 
00369     for (x= 0; x < ret; x++)
00370     {
00371       switch ((gearmand_wakeup_t)buffer[x])
00372       {
00373       case GEARMAND_WAKEUP_PAUSE:
00374         gearmand_log_info(thread->gearmand, "[%4u] Received PAUSE wakeup event", thread->count);
00375         break;
00376 
00377       case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00378         gearmand_log_info(thread->gearmand,
00379                           "[%4u] Received SHUTDOWN_GRACEFUL wakeup event",
00380                           thread->count);
00381         if (gearman_server_shutdown_graceful(&(thread->gearmand->server)) == GEARMAN_SHUTDOWN)
00382         {
00383           gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00384         }
00385         break;
00386 
00387       case GEARMAND_WAKEUP_SHUTDOWN:
00388         gearmand_log_info(thread->gearmand, "[%4u] Received SHUTDOWN wakeup event", thread->count);
00389         _clear_events(thread);
00390         break;
00391 
00392       case GEARMAND_WAKEUP_CON:
00393         gearmand_log_info(thread->gearmand, "[%4u] Received CON wakeup event", thread->count);
00394         gearmand_con_check_queue(thread);
00395         break;
00396 
00397       case GEARMAND_WAKEUP_RUN:
00398         gearmand_log_debug(thread->gearmand, "[%4u] Received RUN wakeup event", thread->count);
00399         gearmand_thread_run(thread);
00400         break;
00401 
00402       default:
00403         gearmand_log_fatal(thread->gearmand, "[%4u] Received unknown wakeup event (%u)", thread->count, buffer[x]);
00404         _clear_events(thread);
00405         thread->gearmand->ret= GEARMAN_UNKNOWN_STATE;
00406         break;
00407       }
00408     }
00409   }
00410 }
00411 
00412 static void _clear_events(gearmand_thread_st *thread)
00413 {
00414   _wakeup_clear(thread);
00415 
00416   while (thread->dcon_list != NULL)
00417     gearmand_con_free(thread->dcon_list);
00418 }