Gearman Developer Documentation

libgearman-server/gearmand_con.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 #include "gearmand.h"
00016 
00017 /*
00018  * Private declarations
00019  */
00020 
00027 static void _con_ready(int fd, short events, void *arg);
00028 
00029 static gearman_return_t _con_add(gearmand_thread_st *thread,
00030                                  gearmand_con_st *con);
00031 
00034 /*
00035  * Public definitions
00036  */
00037 
00038 gearman_return_t gearmand_con_create(gearmand_st *gearmand, int fd,
00039                                      const char *host, const char *port,
00040                                      gearman_connection_add_fn *add_fn)
00041 {
00042   gearmand_con_st *dcon;
00043   gearmand_con_st *free_dcon_list;
00044   uint32_t free_dcon_count;
00045 
00046   if (gearmand->free_dcon_count > 0)
00047   {
00048     dcon= gearmand->free_dcon_list;
00049     GEARMAN_LIST_DEL(gearmand->free_dcon, dcon,)
00050   }
00051   else
00052   {
00053     dcon= (gearmand_con_st *)malloc(sizeof(gearmand_con_st));
00054     if (dcon == NULL)
00055     {
00056       close(fd);
00057       gearmand_log_fatal(gearmand, "gearmand_con_create:malloc");
00058       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00059     }
00060   }
00061 
00062   dcon->last_events= 0;
00063   dcon->fd= fd;
00064   dcon->next= NULL;
00065   dcon->prev= NULL;
00066   dcon->server_con= NULL;
00067   dcon->con= NULL;
00068   dcon->add_fn= NULL;
00069   strncpy(dcon->host, host, NI_MAXHOST - 1);
00070   strncpy(dcon->port, port, NI_MAXSERV - 1);
00071   dcon->add_fn= add_fn;
00072 
00073   /* If we are not threaded, just add the connection now. */
00074   if (gearmand->threads == 0)
00075   {
00076     dcon->thread= gearmand->thread_list;
00077     return _con_add(gearmand->thread_list, dcon);
00078   }
00079 
00080   /* We do a simple round-robin connection queue algorithm here. */
00081   if (gearmand->thread_add_next == NULL)
00082     gearmand->thread_add_next= gearmand->thread_list;
00083 
00084   dcon->thread= gearmand->thread_add_next;
00085 
00086   /* We don't need to lock if the list is empty. */
00087   if (dcon->thread->dcon_add_count == 0 &&
00088       dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
00089   {
00090     GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
00091     gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
00092   }
00093   else
00094   {
00095     (void ) pthread_mutex_lock(&(dcon->thread->lock));
00096 
00097     GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
00098 
00099     /* Take the free connection structures back to reuse. */
00100     free_dcon_list= dcon->thread->free_dcon_list;
00101     free_dcon_count= dcon->thread->free_dcon_count;
00102     dcon->thread->free_dcon_list= NULL;
00103     dcon->thread->free_dcon_count= 0;
00104 
00105     (void ) pthread_mutex_unlock(&(dcon->thread->lock));
00106 
00107     /* Only wakeup the thread if this is the first in the queue. We don't need
00108        to lock around the count check, worst case it was already picked up and
00109        we send an extra byte. */
00110     if (dcon->thread->dcon_add_count == 1)
00111       gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
00112 
00113     /* Put the free connection structures we grabbed on the main list. */
00114     while (free_dcon_list != NULL)
00115     {
00116       dcon= free_dcon_list;
00117       GEARMAN_LIST_DEL(free_dcon, dcon,)
00118       GEARMAN_LIST_ADD(gearmand->free_dcon, dcon,)
00119     }
00120   }
00121 
00122   gearmand->thread_add_next= gearmand->thread_add_next->next;
00123 
00124   return GEARMAN_SUCCESS;
00125 }
00126 
00127 void gearmand_con_free(gearmand_con_st *dcon)
00128 {
00129   int del_ret= event_del(&(dcon->event));  
00130   assert(del_ret == 0);
00131 
00132   /* This gets around a libevent bug when both POLLIN and POLLOUT are set. */
00133   event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
00134   event_base_set(dcon->thread->base, &(dcon->event));
00135   event_add(&(dcon->event), NULL);
00136   del_ret= event_del(&(dcon->event));
00137   assert(del_ret == 0);
00138 
00139   gearman_server_con_free(dcon->server_con);
00140   GEARMAN_LIST_DEL(dcon->thread->dcon, dcon,)
00141 
00142   close(dcon->fd);
00143 
00144   if (dcon->thread->gearmand->free_dcon_count < GEARMAN_MAX_FREE_SERVER_CON)
00145   {
00146     if (dcon->thread->gearmand->threads == 0)
00147       GEARMAN_LIST_ADD(dcon->thread->gearmand->free_dcon, dcon,)
00148     else
00149     {
00150       /* Lock here because the main thread may be emptying this. */
00151       (void ) pthread_mutex_lock(&(dcon->thread->lock));
00152       GEARMAN_LIST_ADD(dcon->thread->free_dcon, dcon,)
00153       (void ) pthread_mutex_unlock(&(dcon->thread->lock));
00154     }
00155   }
00156   else
00157     free(dcon);
00158 }
00159 
00160 void gearmand_con_check_queue(gearmand_thread_st *thread)
00161 {
00162   gearmand_con_st *dcon;
00163 
00164   /* Dirty check is fine here, wakeup is always sent after add completes. */
00165   if (thread->dcon_add_count == 0)
00166     return;
00167 
00168   /* We want to add new connections inside the lock because other threads may
00169      walk the thread's dcon_list while holding the lock. */
00170   while (thread->dcon_add_list != NULL)
00171   {
00172     (void ) pthread_mutex_lock(&(thread->lock));
00173     dcon= thread->dcon_add_list;
00174     GEARMAN_LIST_DEL(thread->dcon_add, dcon,)
00175     (void ) pthread_mutex_unlock(&(thread->lock));
00176 
00177     if (_con_add(thread, dcon) != GEARMAN_SUCCESS)
00178       gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00179   }
00180 }
00181 
00182 gearman_return_t gearmand_connection_watch(gearman_connection_st *con, short events,
00183                                     void *context __attribute__ ((unused)))
00184 {
00185   (void) context;
00186   gearmand_con_st *dcon;
00187   short set_events= 0;
00188 
00189   dcon= (gearmand_con_st *)gearman_connection_context(con);
00190   dcon->con= con;
00191 
00192   if (events & POLLIN)
00193     set_events|= EV_READ;
00194   if (events & POLLOUT)
00195     set_events|= EV_WRITE;
00196 
00197   if (dcon->last_events != set_events)
00198   {
00199     if (dcon->last_events != 0)
00200     {
00201       int del_ret= event_del(&(dcon->event));
00202       assert(del_ret == 0);
00203     }
00204     event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready,
00205               dcon);
00206     event_base_set(dcon->thread->base, &(dcon->event));
00207 
00208     if (event_add(&(dcon->event), NULL) == -1)
00209     {
00210       gearmand_log_fatal(dcon->thread->gearmand, "_con_watch:event_add:-1");
00211       return GEARMAN_EVENT;
00212     }
00213 
00214     dcon->last_events= set_events;
00215   }
00216 
00217   gearmand_log_crazy(dcon->thread->gearmand, "[%4u] %15s:%5s Watching  %6s %s",
00218                      dcon->thread->count, dcon->host, dcon->port,
00219                      events & POLLIN ? "POLLIN" : "",
00220                      events & POLLOUT ? "POLLOUT" : "");
00221 
00222   return GEARMAN_SUCCESS;
00223 }
00224 
00225 /*
00226  * Private definitions
00227  */
00228 
00229 static void _con_ready(int fd __attribute__ ((unused)), short events,
00230                        void *arg)
00231 {
00232   gearmand_con_st *dcon= (gearmand_con_st *)arg;
00233   short revents= 0;
00234   gearman_return_t ret;
00235 
00236   if (events & EV_READ)
00237     revents|= POLLIN;
00238   if (events & EV_WRITE)
00239     revents|= POLLOUT;
00240 
00241   ret= gearman_connection_set_revents(dcon->con, revents);
00242   if (ret != GEARMAN_SUCCESS)
00243   {
00244     gearmand_con_free(dcon);
00245     return;
00246   }
00247 
00248   gearmand_log_crazy(dcon->thread->gearmand, "[%4u] %15s:%5s Ready     %6s %s",
00249                      dcon->thread->count, dcon->host, dcon->port,
00250                      revents & POLLIN ? "POLLIN" : "",
00251                      revents & POLLOUT ? "POLLOUT" : "");
00252 
00253   gearmand_thread_run(dcon->thread);
00254 }
00255 
00256 static gearman_return_t _con_add(gearmand_thread_st *thread,
00257                                  gearmand_con_st *dcon)
00258 {
00259   gearman_return_t ret;
00260 
00261   dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon->fd,
00262                                            dcon);
00263   if (dcon->server_con == NULL)
00264   {
00265     close(dcon->fd);
00266     free(dcon);
00267     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00268   }
00269 
00270   gearman_server_con_set_host(dcon->server_con, dcon->host);
00271   gearman_server_con_set_port(dcon->server_con, dcon->port);
00272 
00273   if (dcon->add_fn != NULL)
00274   {
00275     ret= (*dcon->add_fn)(gearman_server_con_con(dcon->server_con));
00276     if (ret != GEARMAN_SUCCESS)
00277     {
00278       gearman_server_con_free(dcon->server_con);
00279       close(dcon->fd);
00280       free(dcon);
00281       return ret;
00282     }
00283   }
00284 
00285   gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Connected", thread->count, dcon->host, dcon->port);
00286 
00287   GEARMAN_LIST_ADD(thread->dcon, dcon,)
00288 
00289   return GEARMAN_SUCCESS;
00290 }