Gearman Developer Documentation

libgearman-server/connection.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 /*
00017  * Public definitions
00018  */
00019 
00020 gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread,
00021                                               int fd, void *data)
00022 {
00023   gearman_server_con_st *con;
00024   gearman_return_t ret;
00025 
00026   con= gearman_server_con_create(thread);
00027   if (con == NULL)
00028     return NULL;
00029 
00030   if (gearman_connection_set_fd(&(con->con), fd) != GEARMAN_SUCCESS)
00031   {
00032     gearman_server_con_free(con);
00033     return NULL;
00034   }
00035 
00036   con->con.context= data;
00037 
00038   ret= gearman_connection_set_events(&(con->con), POLLIN);
00039   if (ret != GEARMAN_SUCCESS)
00040   {
00041     gearman_server_con_free(con);
00042     return NULL;
00043   }
00044 
00045   return con;
00046 }
00047 
00048 gearman_server_con_st *
00049 gearman_server_con_create(gearman_server_thread_st *thread)
00050 {
00051   gearman_server_con_st *con;
00052 
00053   if (thread->free_con_count > 0)
00054   {
00055     con= thread->free_con_list;
00056     GEARMAN_LIST_DEL(thread->free_con, con,)
00057   }
00058   else
00059   {
00060     con= (gearman_server_con_st *)malloc(sizeof(gearman_server_con_st));
00061     if (con == NULL)
00062     {
00063       gearman_log_error(thread->gearman, "gearman_server_con_create", "malloc");
00064       return NULL;
00065     }
00066   }
00067 
00068   gearman_connection_options_t options[] = { GEARMAN_CON_IGNORE_LOST_CONNECTION, GEARMAN_CON_MAX };
00069   if (gearman_connection_create(thread->gearman, &(con->con), options) == NULL)
00070   {
00071     free(con);
00072     return NULL;
00073   }
00074 
00075   con->is_sleeping= false;
00076   con->is_exceptions= false;
00077   con->is_dead= false;
00078   con->is_noop_sent= false;
00079 
00080   con->ret= 0;
00081   con->io_list= false;
00082   con->proc_list= false;
00083   con->proc_removed= false;
00084   con->io_packet_count= 0;
00085   con->proc_packet_count= 0;
00086   con->worker_count= 0;
00087   con->client_count= 0;
00088   con->thread= thread;
00089   con->packet= NULL;
00090   con->io_packet_list= NULL;
00091   con->io_packet_end= NULL;
00092   con->proc_packet_list= NULL;
00093   con->proc_packet_end= NULL;
00094   con->io_next= NULL;
00095   con->io_prev= NULL;
00096   con->proc_next= NULL;
00097   con->proc_prev= NULL;
00098   con->worker_list= NULL;
00099   con->client_list= NULL;
00100   con->host= NULL;
00101   con->port= NULL;
00102   strcpy(con->id, "-");
00103 
00104   (void) pthread_mutex_lock(&thread->lock);
00105   GEARMAN_LIST_ADD(thread->con, con,)
00106   (void) pthread_mutex_unlock(&thread->lock);
00107 
00108   return con;
00109 }
00110 
00111 void gearman_server_con_free(gearman_server_con_st *con)
00112 {
00113   gearman_server_thread_st *thread= con->thread;
00114   gearman_server_packet_st *packet;
00115 
00116   con->host= NULL;
00117   con->port= NULL;
00118 
00119   if (thread->server->flags.threaded &&
00120       !(con->proc_removed) && !(thread->server->proc_shutdown))
00121   {
00122     con->is_dead= true;
00123     con->is_sleeping= false;
00124     con->is_exceptions= false;
00125     con->is_noop_sent= false;
00126     gearman_server_con_proc_add(con);
00127     return;
00128   }
00129 
00130   gearman_connection_free(&(con->con));
00131 
00132   if (con->proc_list)
00133     gearman_server_con_proc_remove(con);
00134 
00135   if (con->io_list)
00136     gearman_server_con_io_remove(con);
00137 
00138   if (con->packet != NULL)
00139   {
00140     if (&(con->packet->packet) != con->con.recv_packet)
00141       gearman_packet_free(&(con->packet->packet));
00142     gearman_server_packet_free(con->packet, con->thread, true);
00143   }
00144 
00145   while (con->io_packet_list != NULL)
00146     gearman_server_io_packet_remove(con);
00147 
00148   while (con->proc_packet_list != NULL)
00149   {
00150     packet= gearman_server_proc_packet_remove(con);
00151     gearman_packet_free(&(packet->packet));
00152     gearman_server_packet_free(packet, con->thread, true);
00153   }
00154 
00155   gearman_server_con_free_workers(con);
00156 
00157   while (con->client_list != NULL)
00158     gearman_server_client_free(con->client_list);
00159 
00160   (void) pthread_mutex_lock(&thread->lock);
00161   GEARMAN_LIST_DEL(con->thread->con, con,)
00162   (void) pthread_mutex_unlock(&thread->lock);
00163 
00164   if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
00165     GEARMAN_LIST_ADD(thread->free_con, con,)
00166   else
00167     free(con);
00168 }
00169 
00170 gearman_connection_st *gearman_server_con_con(gearman_server_con_st *con)
00171 {
00172   return &con->con;
00173 }
00174 
00175 const void *gearman_server_con_data(const gearman_server_con_st *con)
00176 {
00177   return gearman_connection_context(&(con->con));
00178 }
00179 
00180 void gearman_server_con_set_data(gearman_server_con_st *con, void *data)
00181 {
00182   gearman_connection_set_context(&(con->con), data);
00183 }
00184 
00185 const char *gearman_server_con_host(gearman_server_con_st *con)
00186 {
00187   return con->host;
00188 }
00189 
00190 void gearman_server_con_set_host(gearman_server_con_st *con, const char *host)
00191 {
00192   con->host= host;
00193 }
00194 
00195 const char *gearman_server_con_port(gearman_server_con_st *con)
00196 {
00197   return con->port;
00198 }
00199 
00200 void gearman_server_con_set_port(gearman_server_con_st *con, const char *port)
00201 {
00202   con->port= port;
00203 }
00204 
00205 const char *gearman_server_con_id(gearman_server_con_st *con)
00206 {
00207   return con->id;
00208 }
00209 
00210 void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
00211                                size_t size)
00212 {
00213   if (size >= GEARMAN_SERVER_CON_ID_SIZE)
00214     size= GEARMAN_SERVER_CON_ID_SIZE - 1;
00215 
00216   memcpy(con->id, id, size);
00217   con->id[size]= 0;
00218 }
00219 
00220 void gearman_server_con_free_worker(gearman_server_con_st *con,
00221                                     char *function_name,
00222                                     size_t function_name_size)
00223 {
00224   gearman_server_worker_st *worker= con->worker_list;
00225   gearman_server_worker_st *prev_worker= NULL;
00226 
00227   while (worker != NULL)
00228   {
00229     if (worker->function->function_name_size == function_name_size &&
00230         !memcmp(worker->function->function_name, function_name,
00231                 function_name_size))
00232     {
00233       gearman_server_worker_free(worker);
00234 
00235       /* Set worker to the last kept worker, or the beginning of the list. */
00236       if (prev_worker == NULL)
00237         worker= con->worker_list;
00238       else
00239         worker= prev_worker;
00240     }
00241     else
00242     {
00243       /* Save this so we don't need to scan the list again if one is removed. */
00244       prev_worker= worker;
00245       worker= worker->con_next;
00246     }
00247   }
00248 }
00249 
00250 void gearman_server_con_free_workers(gearman_server_con_st *con)
00251 {
00252   while (con->worker_list != NULL)
00253     gearman_server_worker_free(con->worker_list);
00254 }
00255 
00256 void gearman_server_con_io_add(gearman_server_con_st *con)
00257 {
00258   if (con->io_list)
00259     return;
00260 
00261   (void) pthread_mutex_lock(&con->thread->lock);
00262 
00263   GEARMAN_LIST_ADD(con->thread->io, con, io_)
00264   con->io_list= true;
00265 
00266   /* Looks funny, but need to check io_count locked, but call run unlocked. */
00267   if (con->thread->io_count == 1 && con->thread->run_fn)
00268   {
00269     (void) pthread_mutex_unlock(&con->thread->lock);
00270     (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
00271   }
00272   else
00273   {
00274     (void) pthread_mutex_unlock(&con->thread->lock);
00275   }
00276 }
00277 
00278 void gearman_server_con_io_remove(gearman_server_con_st *con)
00279 {
00280   (void) pthread_mutex_lock(&con->thread->lock);
00281   if (con->io_list)
00282   {
00283     GEARMAN_LIST_DEL(con->thread->io, con, io_)
00284     con->io_list= false;
00285   }
00286   (void) pthread_mutex_unlock(&con->thread->lock);
00287 }
00288 
00289 gearman_server_con_st *
00290 gearman_server_con_io_next(gearman_server_thread_st *thread)
00291 {
00292   gearman_server_con_st *con= thread->io_list;
00293 
00294   if (con == NULL)
00295     return NULL;
00296 
00297   gearman_server_con_io_remove(con);
00298 
00299   return con;
00300 }
00301 
00302 void gearman_server_con_proc_add(gearman_server_con_st *con)
00303 {
00304   if (con->proc_list)
00305     return;
00306 
00307   (void) pthread_mutex_lock(&con->thread->lock);
00308   GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
00309   con->proc_list= true;
00310   (void) pthread_mutex_unlock(&con->thread->lock);
00311 
00312   if (! (con->thread->server->proc_shutdown) &&
00313       !(con->thread->server->proc_wakeup))
00314   {
00315     (void) pthread_mutex_lock(&(con->thread->server->proc_lock));
00316     con->thread->server->proc_wakeup= true;
00317     (void) pthread_cond_signal(&(con->thread->server->proc_cond));
00318     (void) pthread_mutex_unlock(&(con->thread->server->proc_lock));
00319   }
00320 }
00321 
00322 void gearman_server_con_proc_remove(gearman_server_con_st *con)
00323 {
00324   (void) pthread_mutex_lock(&con->thread->lock);
00325 
00326   if (con->proc_list)
00327   {
00328     GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
00329     con->proc_list= false;
00330   }
00331   (void) pthread_mutex_unlock(&con->thread->lock);
00332 }
00333 
00334 gearman_server_con_st *
00335 gearman_server_con_proc_next(gearman_server_thread_st *thread)
00336 {
00337   gearman_server_con_st *con;
00338 
00339   if (thread->proc_list == NULL)
00340     return NULL;
00341 
00342   (void) pthread_mutex_lock(&thread->lock);
00343 
00344   con= thread->proc_list;
00345   while (con != NULL)
00346   {
00347     GEARMAN_LIST_DEL(thread->proc, con, proc_)
00348     con->proc_list= false;
00349     if (!(con->proc_removed))
00350       break;
00351     con= thread->proc_list;
00352   }
00353 
00354   (void) pthread_mutex_unlock(&thread->lock);
00355 
00356   return con;
00357 }