Gearman Developer Documentation

libgearman-server/thread.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 /*
00017  * Private declarations
00018  */
00019 
/* Read packets from a connection until the receive reports GEARMAN_IO_WAIT.
   Each complete packet is queued for the processing thread when the server is
   threaded, otherwise its command is run immediately. */
00029 gearman_return_t _thread_packet_read(gearman_server_con_st *con);
00030 
/* Send the packets on con->io_packet_list. Returns GEARMAN_IO_WAIT without
   sending when POLLOUT is already set in the connection's event mask. */
00034 static gearman_return_t _thread_packet_flush(gearman_server_con_st *con);
00035 
/* Initialize proc_lock and proc_cond, create the thread that runs _proc(),
   and set server->flags.threaded. */
00039 static gearman_return_t _proc_thread_start(gearman_server_st *server);
00040 
/* Ask the processing thread to exit, join it, and destroy proc_cond and
   proc_lock. Does nothing when the server is not threaded or the shutdown
   was already requested. */
00044 static void _proc_thread_kill(gearman_server_st *server);
00045 
/* Entry point of the processing thread; data is the gearman_server_st. */
00049 static void *_proc(void *data);
00050 
/* Log callback handed to gearman_set_log_fn(); context is the
   gearman_server_thread_st whose log_fn receives the line. */
00054 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00055 
00058 /*
00059  * Public definitions
00060  */
00061 
00062 gearman_server_thread_st *
00063 gearman_server_thread_create(gearman_server_st *server,
00064                              gearman_server_thread_st *thread)
00065 {
00066   if (server->thread_count == 1)
00067   {
00068     /* The server is going to be multi-threaded, start processing thread. */
00069     if (_proc_thread_start(server) != GEARMAN_SUCCESS)
00070       return NULL;
00071   }
00072 
00073   if (thread == NULL)
00074   {
00075     thread= (gearman_server_thread_st *)malloc(sizeof(gearman_server_thread_st));
00076     if (thread == NULL)
00077     {
00078       _proc_thread_kill(server);
00079       return NULL;
00080     }
00081 
00082     thread->options.allocated= true;
00083   }
00084   else
00085   {
00086     thread->options.allocated= false;
00087   }
00088 
00089   thread->con_count= 0;
00090   thread->io_count= 0;
00091   thread->proc_count= 0;
00092   thread->free_con_count= 0;
00093   thread->free_packet_count= 0;
00094   thread->server= server;
00095   thread->log_fn= NULL;
00096   thread->log_context= NULL;
00097   thread->run_fn= NULL;
00098   thread->run_fn_arg= NULL;
00099   thread->con_list= NULL;
00100   thread->io_list= NULL;
00101   thread->proc_list= NULL;
00102   thread->free_con_list= NULL;
00103   thread->free_packet_list= NULL;
00104 
00105   if (pthread_mutex_init(&(thread->lock), NULL) != 0)
00106   {
00107     if (thread->options.allocated)
00108       free(thread);
00109 
00110     return NULL;
00111   }
00112 
00113   GEARMAN_LIST_ADD(server->thread, thread,);
00114 
00115   gearman_universal_options_t options[]= { GEARMAN_NON_BLOCKING, GEARMAN_DONT_TRACK_PACKETS, GEARMAN_MAX};
00116   thread->gearman= gearman_universal_create(&(thread->gearman_universal_static), options);
00117   if (thread->gearman == NULL)
00118   {
00119     gearman_server_thread_free(thread);
00120     return NULL;
00121   }
00122 
00123   return thread;
00124 }
00125 
00126 void gearman_server_thread_free(gearman_server_thread_st *thread)
00127 {
00128   gearman_server_con_st *con;
00129   gearman_server_packet_st *packet;
00130 
00131   _proc_thread_kill(thread->server);
00132 
00133   while (thread->con_list != NULL)
00134     gearman_server_con_free(thread->con_list);
00135 
00136   while (thread->free_con_list != NULL)
00137   {
00138     con= thread->free_con_list;
00139     thread->free_con_list= con->next;
00140     free(con);
00141   }
00142 
00143   while (thread->free_packet_list != NULL)
00144   {
00145     packet= thread->free_packet_list;
00146     thread->free_packet_list= packet->next;
00147     free(packet);
00148   }
00149 
00150   if (thread->gearman != NULL)
00151     gearman_universal_free(thread->gearman);
00152 
00153   pthread_mutex_destroy(&(thread->lock));
00154 
00155   GEARMAN_LIST_DEL(thread->server->thread, thread,)
00156 
00157   if (thread->options.allocated)
00158     free(thread);
00159 }
00160 
00161 const char *gearman_server_thread_error(gearman_server_thread_st *thread)
00162 {
00163   return gearman_universal_error(thread->gearman);
00164 }
00165 
00166 int gearman_server_thread_errno(gearman_server_thread_st *thread)
00167 {
00168   return gearman_universal_errno(thread->gearman);
00169 }
00170 
00171 void gearman_server_thread_set_event_watch(gearman_server_thread_st *thread,
00172                                            gearman_event_watch_fn *event_watch,
00173                                            void *event_watch_arg)
00174 {
00175   gearman_set_event_watch_fn(thread->gearman, event_watch, event_watch_arg);
00176 }
00177 
00178 void gearman_server_thread_set_run(gearman_server_thread_st *thread,
00179                                    gearman_server_thread_run_fn *run_fn,
00180                                    void *run_fn_arg)
00181 {
00182   thread->run_fn= run_fn;
00183   thread->run_fn_arg= run_fn_arg;
00184 }
00185 
00186 void gearman_server_thread_set_log_fn(gearman_server_thread_st *thread,
00187                                       gearman_log_fn *function,
00188                                       void *context,
00189                                       gearman_verbose_t verbose)
00190 {
00191   thread->log_fn= function;
00192   thread->log_context= context;
00193   gearman_set_log_fn(thread->gearman, _log, thread, verbose);
00194 }
00195 
00196 gearman_server_con_st *
00197 gearman_server_thread_run(gearman_server_thread_st *thread,
00198                           gearman_return_t *ret_ptr)
00199 {
00200   gearman_connection_st *con;
00201   gearman_server_con_st *server_con;
00202 
00203   /* If we are multi-threaded, we may have packets to flush or connections that
00204      should start reading again. */
00205   if (thread->server->flags.threaded)
00206   {
00207     while ((server_con= gearman_server_con_io_next(thread)) != NULL)
00208     {
00209       if (server_con->is_dead)
00210       {
00211         if (server_con->proc_removed)
00212           gearman_server_con_free(server_con);
00213 
00214         continue;
00215       }
00216 
00217       if (server_con->ret != GEARMAN_SUCCESS)
00218       {
00219         *ret_ptr= server_con->ret;
00220         return server_con;
00221       }
00222 
00223       /* See if any outgoing packets were queued. */
00224       *ret_ptr= _thread_packet_flush(server_con);
00225       if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00226         return server_con;
00227     }
00228   }
00229 
00230   /* Check for new activity on connections. */
00231   while ((con= gearman_ready(thread->gearman)) != NULL)
00232   {
00233     /* Inherited classes anyone? Some people would call this a hack, I call
00234        it clean (avoids extra ptrs). Brian, I'll give you your C99 0-byte
00235        arrays at the ends of structs for this. :) */
00236     server_con= (gearman_server_con_st *)con;
00237 
00238     /* Try to read new packets. */
00239     if (con->revents & POLLIN)
00240     {
00241       *ret_ptr= _thread_packet_read(server_con);
00242       if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00243         return server_con;
00244     }
00245 
00246     /* Flush existing outgoing packets. */
00247     if (con->revents & POLLOUT)
00248     {
00249       *ret_ptr= _thread_packet_flush(server_con);
00250       if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00251         return server_con;
00252     }
00253   }
00254 
00255   /* Start flushing new outgoing packets if we are single threaded. */
00256   if (! (thread->server->flags.threaded))
00257   {
00258     while ((server_con= gearman_server_con_io_next(thread)) != NULL)
00259     {
00260       *ret_ptr= _thread_packet_flush(server_con);
00261       if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00262         return server_con;
00263     }
00264   }
00265 
00266   /* Check for the two shutdown modes. */
00267   if (thread->server->shutdown)
00268     *ret_ptr= GEARMAN_SHUTDOWN;
00269   else if (thread->server->shutdown_graceful)
00270   {
00271     if (thread->server->job_count == 0)
00272       *ret_ptr= GEARMAN_SHUTDOWN;
00273     else
00274       *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
00275   }
00276   else
00277     *ret_ptr= GEARMAN_SUCCESS;
00278 
00279   return NULL;
00280 }
00281 
00282 /*
00283  * Private definitions
00284  */
00285 
00286 gearman_return_t _thread_packet_read(gearman_server_con_st *con)
00287 {
00288   gearman_return_t ret;
00289 
00290   while (1)
00291   {
00292     if (con->packet == NULL)
00293     {
00294       con->packet= gearman_server_packet_create(con->thread, true);
00295       if (con->packet == NULL)
00296         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00297     }
00298 
00299     (void)gearman_connection_recv(&(con->con), &(con->packet->packet), &ret, true);
00300     if (ret != GEARMAN_SUCCESS)
00301     {
00302       if (ret == GEARMAN_IO_WAIT)
00303         break;
00304 
00305       gearman_server_packet_free(con->packet, con->thread, true);
00306       con->packet= NULL;
00307       return ret;
00308     }
00309 
00310     gearman_log_debug(con->thread->gearman, "%15s:%5s Received  %s",
00311                       con->host == NULL ? "-" : con->host,
00312                       con->port == NULL ? "-" : con->port,
00313                       gearman_command_info_list[con->packet->packet.command].name);
00314 
00315     /* We read a complete packet. */
00316     if (con->thread->server->flags.threaded)
00317     {
00318       /* Multi-threaded, queue for the processing thread to run. */
00319       gearman_server_proc_packet_add(con, con->packet);
00320       con->packet= NULL;
00321     }
00322     else
00323     {
00324       /* Single threaded, run the command here. */
00325       ret= gearman_server_run_command(con, &(con->packet->packet));
00326       gearman_packet_free(&(con->packet->packet));
00327       gearman_server_packet_free(con->packet, con->thread, true);
00328       con->packet= NULL;
00329       if (ret != GEARMAN_SUCCESS)
00330         return ret;
00331     }
00332   }
00333 
00334   return GEARMAN_SUCCESS;
00335 }
00336 
00337 static gearman_return_t _thread_packet_flush(gearman_server_con_st *con)
00338 {
00339   gearman_return_t ret;
00340 
00341   /* Check to see if we've already tried to avoid excessive system calls. */
00342   if (con->con.events & POLLOUT)
00343     return GEARMAN_IO_WAIT;
00344 
00345   while (con->io_packet_list != NULL)
00346   {
00347     ret= gearman_connection_send(&(con->con), &(con->io_packet_list->packet),
00348                           con->io_packet_list->next == NULL ? true : false);
00349     if (ret != GEARMAN_SUCCESS)
00350       return ret;
00351 
00352     gearman_log_debug(con->thread->gearman, "%15s:%5s Sent      %s",
00353                       con->host == NULL ? "-" : con->host,
00354                       con->port == NULL ? "-" : con->port,
00355                       gearman_command_info_list[con->io_packet_list->packet.command].name);
00356 
00357     gearman_server_io_packet_remove(con);
00358   }
00359 
00360   /* Clear the POLLOUT flag. */
00361   return gearman_connection_set_events(&(con->con), POLLIN);
00362 }
00363 
00364 static gearman_return_t _proc_thread_start(gearman_server_st *server)
00365 {
00366   pthread_attr_t attr;
00367 
00368   if (pthread_mutex_init(&(server->proc_lock), NULL) != 0)
00369     return GEARMAN_PTHREAD;
00370 
00371   if (pthread_cond_init(&(server->proc_cond), NULL) != 0)
00372     return GEARMAN_PTHREAD;
00373 
00374   if (pthread_attr_init(&attr) != 0)
00375     return GEARMAN_PTHREAD;
00376 
00377   if (pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0)
00378     return GEARMAN_PTHREAD;
00379 
00380   if (pthread_create(&(server->proc_id), &attr, _proc, server) != 0)
00381     return GEARMAN_PTHREAD;
00382 
00383   (void) pthread_attr_destroy(&attr);
00384 
00385   server->flags.threaded= true;
00386 
00387   return GEARMAN_SUCCESS;
00388 }
00389 
00390 static void _proc_thread_kill(gearman_server_st *server)
00391 {
00392   if (! (server->flags.threaded) || server->proc_shutdown)
00393     return;
00394 
00395   server->proc_shutdown= true;
00396 
00397   /* Signal proc thread to shutdown. */
00398   (void) pthread_mutex_lock(&(server->proc_lock));
00399   (void) pthread_cond_signal(&(server->proc_cond));
00400   (void) pthread_mutex_unlock(&(server->proc_lock));
00401 
00402   /* Wait for the proc thread to exit and then cleanup. */
00403   (void) pthread_join(server->proc_id, NULL);
00404   (void) pthread_cond_destroy(&(server->proc_cond));
00405   (void) pthread_mutex_destroy(&(server->proc_lock));
00406 }
00407 
00408 static void *_proc(void *data)
00409 {
00410   gearman_server_st *server= (gearman_server_st *)data;
00411   gearman_server_thread_st *thread;
00412   gearman_server_con_st *con;
00413   gearman_server_packet_st *packet;
00414 
00415   while (1)
00416   {
00417     (void) pthread_mutex_lock(&(server->proc_lock));
00418     while (server->proc_wakeup == false)
00419     {
00420       if (server->proc_shutdown)
00421       {
00422         (void) pthread_mutex_unlock(&(server->proc_lock));
00423         return NULL;
00424       }
00425 
00426       (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
00427     }
00428     server->proc_wakeup= false;
00429     (void) pthread_mutex_unlock(&(server->proc_lock));
00430 
00431     for (thread= server->thread_list; thread != NULL; thread= thread->next)
00432     {
00433       while ((con= gearman_server_con_proc_next(thread)) != NULL)
00434       {
00435         if (con->is_dead)
00436         {
00437           gearman_server_con_free_workers(con);
00438 
00439           while (con->client_list != NULL)
00440             gearman_server_client_free(con->client_list);
00441 
00442           con->proc_removed= true;
00443           gearman_server_con_io_add(con);
00444           continue;
00445         }
00446 
00447         while (1)
00448         {
00449           packet= gearman_server_proc_packet_remove(con);
00450           if (packet == NULL)
00451             break;
00452 
00453           con->ret= gearman_server_run_command(con, &(packet->packet));
00454           gearman_packet_free(&(packet->packet));
00455           gearman_server_packet_free(packet, con->thread, false);
00456         }
00457       }
00458     }
00459   }
00460 }
00461 
00462 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00463 {
00464   gearman_server_thread_st *thread= (gearman_server_thread_st *)context;
00465   (*(thread->log_fn))(line, verbose, (void *)thread->log_context);
00466 }