00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00020 gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread,
00021 int fd, void *data)
00022 {
00023 gearman_server_con_st *con;
00024 gearman_return_t ret;
00025
00026 con= gearman_server_con_create(thread);
00027 if (con == NULL)
00028 return NULL;
00029
00030 if (gearman_connection_set_fd(&(con->con), fd) != GEARMAN_SUCCESS)
00031 {
00032 gearman_server_con_free(con);
00033 return NULL;
00034 }
00035
00036 con->con.context= data;
00037
00038 ret= gearman_connection_set_events(&(con->con), POLLIN);
00039 if (ret != GEARMAN_SUCCESS)
00040 {
00041 gearman_server_con_free(con);
00042 return NULL;
00043 }
00044
00045 return con;
00046 }
00047
00048 gearman_server_con_st *
00049 gearman_server_con_create(gearman_server_thread_st *thread)
00050 {
00051 gearman_server_con_st *con;
00052
00053 if (thread->free_con_count > 0)
00054 {
00055 con= thread->free_con_list;
00056 GEARMAN_LIST_DEL(thread->free_con, con,)
00057 }
00058 else
00059 {
00060 con= (gearman_server_con_st *)malloc(sizeof(gearman_server_con_st));
00061 if (con == NULL)
00062 {
00063 gearman_log_error(thread->gearman, "gearman_server_con_create", "malloc");
00064 return NULL;
00065 }
00066 }
00067
00068 gearman_connection_options_t options[] = { GEARMAN_CON_IGNORE_LOST_CONNECTION, GEARMAN_CON_MAX };
00069 if (gearman_connection_create(thread->gearman, &(con->con), options) == NULL)
00070 {
00071 free(con);
00072 return NULL;
00073 }
00074
00075 con->is_sleeping= false;
00076 con->is_exceptions= false;
00077 con->is_dead= false;
00078 con->is_noop_sent= false;
00079
00080 con->ret= 0;
00081 con->io_list= false;
00082 con->proc_list= false;
00083 con->proc_removed= false;
00084 con->io_packet_count= 0;
00085 con->proc_packet_count= 0;
00086 con->worker_count= 0;
00087 con->client_count= 0;
00088 con->thread= thread;
00089 con->packet= NULL;
00090 con->io_packet_list= NULL;
00091 con->io_packet_end= NULL;
00092 con->proc_packet_list= NULL;
00093 con->proc_packet_end= NULL;
00094 con->io_next= NULL;
00095 con->io_prev= NULL;
00096 con->proc_next= NULL;
00097 con->proc_prev= NULL;
00098 con->worker_list= NULL;
00099 con->client_list= NULL;
00100 con->host= NULL;
00101 con->port= NULL;
00102 strcpy(con->id, "-");
00103
00104 (void) pthread_mutex_lock(&thread->lock);
00105 GEARMAN_LIST_ADD(thread->con, con,)
00106 (void) pthread_mutex_unlock(&thread->lock);
00107
00108 return con;
00109 }
00110
00111 void gearman_server_con_free(gearman_server_con_st *con)
00112 {
00113 gearman_server_thread_st *thread= con->thread;
00114 gearman_server_packet_st *packet;
00115
00116 con->host= NULL;
00117 con->port= NULL;
00118
00119 if (thread->server->flags.threaded &&
00120 !(con->proc_removed) && !(thread->server->proc_shutdown))
00121 {
00122 con->is_dead= true;
00123 con->is_sleeping= false;
00124 con->is_exceptions= false;
00125 con->is_noop_sent= false;
00126 gearman_server_con_proc_add(con);
00127 return;
00128 }
00129
00130 gearman_connection_free(&(con->con));
00131
00132 if (con->proc_list)
00133 gearman_server_con_proc_remove(con);
00134
00135 if (con->io_list)
00136 gearman_server_con_io_remove(con);
00137
00138 if (con->packet != NULL)
00139 {
00140 if (&(con->packet->packet) != con->con.recv_packet)
00141 gearman_packet_free(&(con->packet->packet));
00142 gearman_server_packet_free(con->packet, con->thread, true);
00143 }
00144
00145 while (con->io_packet_list != NULL)
00146 gearman_server_io_packet_remove(con);
00147
00148 while (con->proc_packet_list != NULL)
00149 {
00150 packet= gearman_server_proc_packet_remove(con);
00151 gearman_packet_free(&(packet->packet));
00152 gearman_server_packet_free(packet, con->thread, true);
00153 }
00154
00155 gearman_server_con_free_workers(con);
00156
00157 while (con->client_list != NULL)
00158 gearman_server_client_free(con->client_list);
00159
00160 (void) pthread_mutex_lock(&thread->lock);
00161 GEARMAN_LIST_DEL(con->thread->con, con,)
00162 (void) pthread_mutex_unlock(&thread->lock);
00163
00164 if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
00165 GEARMAN_LIST_ADD(thread->free_con, con,)
00166 else
00167 free(con);
00168 }
00169
00170 gearman_connection_st *gearman_server_con_con(gearman_server_con_st *con)
00171 {
00172 return &con->con;
00173 }
00174
00175 const void *gearman_server_con_data(const gearman_server_con_st *con)
00176 {
00177 return gearman_connection_context(&(con->con));
00178 }
00179
00180 void gearman_server_con_set_data(gearman_server_con_st *con, void *data)
00181 {
00182 gearman_connection_set_context(&(con->con), data);
00183 }
00184
00185 const char *gearman_server_con_host(gearman_server_con_st *con)
00186 {
00187 return con->host;
00188 }
00189
00190 void gearman_server_con_set_host(gearman_server_con_st *con, const char *host)
00191 {
00192 con->host= host;
00193 }
00194
00195 const char *gearman_server_con_port(gearman_server_con_st *con)
00196 {
00197 return con->port;
00198 }
00199
00200 void gearman_server_con_set_port(gearman_server_con_st *con, const char *port)
00201 {
00202 con->port= port;
00203 }
00204
00205 const char *gearman_server_con_id(gearman_server_con_st *con)
00206 {
00207 return con->id;
00208 }
00209
00210 void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
00211 size_t size)
00212 {
00213 if (size >= GEARMAN_SERVER_CON_ID_SIZE)
00214 size= GEARMAN_SERVER_CON_ID_SIZE - 1;
00215
00216 memcpy(con->id, id, size);
00217 con->id[size]= 0;
00218 }
00219
00220 void gearman_server_con_free_worker(gearman_server_con_st *con,
00221 char *function_name,
00222 size_t function_name_size)
00223 {
00224 gearman_server_worker_st *worker= con->worker_list;
00225 gearman_server_worker_st *prev_worker= NULL;
00226
00227 while (worker != NULL)
00228 {
00229 if (worker->function->function_name_size == function_name_size &&
00230 !memcmp(worker->function->function_name, function_name,
00231 function_name_size))
00232 {
00233 gearman_server_worker_free(worker);
00234
00235
00236 if (prev_worker == NULL)
00237 worker= con->worker_list;
00238 else
00239 worker= prev_worker;
00240 }
00241 else
00242 {
00243
00244 prev_worker= worker;
00245 worker= worker->con_next;
00246 }
00247 }
00248 }
00249
00250 void gearman_server_con_free_workers(gearman_server_con_st *con)
00251 {
00252 while (con->worker_list != NULL)
00253 gearman_server_worker_free(con->worker_list);
00254 }
00255
00256 void gearman_server_con_io_add(gearman_server_con_st *con)
00257 {
00258 if (con->io_list)
00259 return;
00260
00261 (void) pthread_mutex_lock(&con->thread->lock);
00262
00263 GEARMAN_LIST_ADD(con->thread->io, con, io_)
00264 con->io_list= true;
00265
00266
00267 if (con->thread->io_count == 1 && con->thread->run_fn)
00268 {
00269 (void) pthread_mutex_unlock(&con->thread->lock);
00270 (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
00271 }
00272 else
00273 {
00274 (void) pthread_mutex_unlock(&con->thread->lock);
00275 }
00276 }
00277
00278 void gearman_server_con_io_remove(gearman_server_con_st *con)
00279 {
00280 (void) pthread_mutex_lock(&con->thread->lock);
00281 if (con->io_list)
00282 {
00283 GEARMAN_LIST_DEL(con->thread->io, con, io_)
00284 con->io_list= false;
00285 }
00286 (void) pthread_mutex_unlock(&con->thread->lock);
00287 }
00288
00289 gearman_server_con_st *
00290 gearman_server_con_io_next(gearman_server_thread_st *thread)
00291 {
00292 gearman_server_con_st *con= thread->io_list;
00293
00294 if (con == NULL)
00295 return NULL;
00296
00297 gearman_server_con_io_remove(con);
00298
00299 return con;
00300 }
00301
00302 void gearman_server_con_proc_add(gearman_server_con_st *con)
00303 {
00304 if (con->proc_list)
00305 return;
00306
00307 (void) pthread_mutex_lock(&con->thread->lock);
00308 GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
00309 con->proc_list= true;
00310 (void) pthread_mutex_unlock(&con->thread->lock);
00311
00312 if (! (con->thread->server->proc_shutdown) &&
00313 !(con->thread->server->proc_wakeup))
00314 {
00315 (void) pthread_mutex_lock(&(con->thread->server->proc_lock));
00316 con->thread->server->proc_wakeup= true;
00317 (void) pthread_cond_signal(&(con->thread->server->proc_cond));
00318 (void) pthread_mutex_unlock(&(con->thread->server->proc_lock));
00319 }
00320 }
00321
00322 void gearman_server_con_proc_remove(gearman_server_con_st *con)
00323 {
00324 (void) pthread_mutex_lock(&con->thread->lock);
00325
00326 if (con->proc_list)
00327 {
00328 GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
00329 con->proc_list= false;
00330 }
00331 (void) pthread_mutex_unlock(&con->thread->lock);
00332 }
00333
00334 gearman_server_con_st *
00335 gearman_server_con_proc_next(gearman_server_thread_st *thread)
00336 {
00337 gearman_server_con_st *con;
00338
00339 if (thread->proc_list == NULL)
00340 return NULL;
00341
00342 (void) pthread_mutex_lock(&thread->lock);
00343
00344 con= thread->proc_list;
00345 while (con != NULL)
00346 {
00347 GEARMAN_LIST_DEL(thread->proc, con, proc_)
00348 con->proc_list= false;
00349 if (!(con->proc_removed))
00350 break;
00351 con= thread->proc_list;
00352 }
00353
00354 (void) pthread_mutex_unlock(&thread->lock);
00355
00356 return con;
00357 }