00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00020 gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread,
00021 int fd, void *data)
00022 {
00023 gearman_server_con_st *con;
00024 gearman_return_t ret;
00025
00026 con= gearman_server_con_create(thread);
00027 if (con == NULL)
00028 return NULL;
00029
00030 if (gearman_con_set_fd(&(con->con), fd) != GEARMAN_SUCCESS)
00031 {
00032 gearman_server_con_free(con);
00033 return NULL;
00034 }
00035
00036 con->con.context= data;
00037
00038 ret= gearman_con_set_events(&(con->con), POLLIN);
00039 if (ret != GEARMAN_SUCCESS)
00040 {
00041 gearman_server_con_free(con);
00042 return NULL;
00043 }
00044
00045 return con;
00046 }
00047
00048 gearman_server_con_st *
00049 gearman_server_con_create(gearman_server_thread_st *thread)
00050 {
00051 gearman_server_con_st *con;
00052
00053 if (thread->free_con_count > 0)
00054 {
00055 con= thread->free_con_list;
00056 GEARMAN_LIST_DEL(thread->free_con, con,)
00057 }
00058 else
00059 {
00060 con= malloc(sizeof(gearman_server_con_st));
00061 if (con == NULL)
00062 {
00063 GEARMAN_ERROR_SET(thread->gearman, "gearman_server_con_create",
00064 "malloc")
00065 return NULL;
00066 }
00067 }
00068
00069 if (gearman_con_create(thread->gearman, &(con->con)) == NULL)
00070 {
00071 free(con);
00072 return NULL;
00073 }
00074
00075 gearman_con_add_options(&(con->con), GEARMAN_CON_IGNORE_LOST_CONNECTION);
00076
00077 con->options= 0;
00078 con->ret= 0;
00079 con->io_list= false;
00080 con->proc_list= false;
00081 con->proc_removed= false;
00082 con->io_packet_count= 0;
00083 con->proc_packet_count= 0;
00084 con->worker_count= 0;
00085 con->client_count= 0;
00086 con->thread= thread;
00087 con->packet= NULL;
00088 con->io_packet_list= NULL;
00089 con->io_packet_end= NULL;
00090 con->proc_packet_list= NULL;
00091 con->proc_packet_end= NULL;
00092 con->io_next= NULL;
00093 con->io_prev= NULL;
00094 con->proc_next= NULL;
00095 con->proc_prev= NULL;
00096 con->worker_list= NULL;
00097 con->client_list= NULL;
00098 con->host= NULL;
00099 con->port= NULL;
00100 strcpy(con->id, "-");
00101
00102 GEARMAN_SERVER_THREAD_LOCK(thread)
00103 GEARMAN_LIST_ADD(thread->con, con,)
00104 GEARMAN_SERVER_THREAD_UNLOCK(thread)
00105
00106 return con;
00107 }
00108
00109 void gearman_server_con_free(gearman_server_con_st *con)
00110 {
00111 gearman_server_thread_st *thread= con->thread;
00112 gearman_server_packet_st *packet;
00113
00114 con->host= NULL;
00115 con->port= NULL;
00116
00117 if (thread->server->options & GEARMAN_SERVER_PROC_THREAD &&
00118 !(con->proc_removed) && !(thread->server->proc_shutdown))
00119 {
00120 con->options= GEARMAN_SERVER_CON_DEAD;
00121 gearman_server_con_proc_add(con);
00122 return;
00123 }
00124
00125 gearman_con_free(&(con->con));
00126
00127 if (con->proc_list)
00128 gearman_server_con_proc_remove(con);
00129
00130 if (con->io_list)
00131 gearman_server_con_io_remove(con);
00132
00133 if (con->packet != NULL)
00134 {
00135 if (&(con->packet->packet) != con->con.recv_packet)
00136 gearman_packet_free(&(con->packet->packet));
00137 gearman_server_packet_free(con->packet, con->thread, true);
00138 }
00139
00140 while (con->io_packet_list != NULL)
00141 gearman_server_io_packet_remove(con);
00142
00143 while (con->proc_packet_list != NULL)
00144 {
00145 packet= gearman_server_proc_packet_remove(con);
00146 gearman_packet_free(&(packet->packet));
00147 gearman_server_packet_free(packet, con->thread, true);
00148 }
00149
00150 gearman_server_con_free_workers(con);
00151
00152 while (con->client_list != NULL)
00153 gearman_server_client_free(con->client_list);
00154
00155 GEARMAN_SERVER_THREAD_LOCK(thread)
00156 GEARMAN_LIST_DEL(con->thread->con, con,)
00157 GEARMAN_SERVER_THREAD_UNLOCK(thread)
00158
00159 if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
00160 GEARMAN_LIST_ADD(thread->free_con, con,)
00161 else
00162 free(con);
00163 }
00164
00165 gearman_con_st *gearman_server_con_con(gearman_server_con_st *con)
00166 {
00167 return &con->con;
00168 }
00169
00170 void *gearman_server_con_data(gearman_server_con_st *con)
00171 {
00172 return gearman_con_context(&(con->con));
00173 }
00174
00175 void gearman_server_con_set_data(gearman_server_con_st *con, void *data)
00176 {
00177 gearman_con_set_context(&(con->con), data);
00178 }
00179
00180 const char *gearman_server_con_host(gearman_server_con_st *con)
00181 {
00182 return con->host;
00183 }
00184
00185 void gearman_server_con_set_host(gearman_server_con_st *con, const char *host)
00186 {
00187 con->host= host;
00188 }
00189
00190 const char *gearman_server_con_port(gearman_server_con_st *con)
00191 {
00192 return con->port;
00193 }
00194
00195 void gearman_server_con_set_port(gearman_server_con_st *con, const char *port)
00196 {
00197 con->port= port;
00198 }
00199
00200 const char *gearman_server_con_id(gearman_server_con_st *con)
00201 {
00202 return con->id;
00203 }
00204
00205 void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
00206 size_t size)
00207 {
00208 if (size >= GEARMAN_SERVER_CON_ID_SIZE)
00209 size= GEARMAN_SERVER_CON_ID_SIZE - 1;
00210
00211 memcpy(con->id, id, size);
00212 con->id[size]= 0;
00213 }
00214
00215 void gearman_server_con_free_worker(gearman_server_con_st *con,
00216 char *function_name,
00217 size_t function_name_size)
00218 {
00219 gearman_server_worker_st *worker= con->worker_list;
00220 gearman_server_worker_st *prev_worker= NULL;
00221
00222 while (worker != NULL)
00223 {
00224 if (worker->function->function_name_size == function_name_size &&
00225 !memcmp(worker->function->function_name, function_name,
00226 function_name_size))
00227 {
00228 gearman_server_worker_free(worker);
00229
00230
00231 if (prev_worker == NULL)
00232 worker= con->worker_list;
00233 else
00234 worker= prev_worker;
00235 }
00236 else
00237 {
00238
00239 prev_worker= worker;
00240 worker= worker->con_next;
00241 }
00242 }
00243 }
00244
00245 void gearman_server_con_free_workers(gearman_server_con_st *con)
00246 {
00247 while (con->worker_list != NULL)
00248 gearman_server_worker_free(con->worker_list);
00249 }
00250
00251 void gearman_server_con_io_add(gearman_server_con_st *con)
00252 {
00253 if (con->io_list)
00254 return;
00255
00256 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00257
00258 GEARMAN_LIST_ADD(con->thread->io, con, io_)
00259 con->io_list= true;
00260
00261
00262 if (con->thread->io_count == 1 && con->thread->run_fn)
00263 {
00264 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00265 (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
00266 }
00267 else
00268 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00269 }
00270
00271 void gearman_server_con_io_remove(gearman_server_con_st *con)
00272 {
00273 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00274 if (con->io_list)
00275 {
00276 GEARMAN_LIST_DEL(con->thread->io, con, io_)
00277 con->io_list= false;
00278 }
00279 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00280 }
00281
00282 gearman_server_con_st *
00283 gearman_server_con_io_next(gearman_server_thread_st *thread)
00284 {
00285 gearman_server_con_st *con= thread->io_list;
00286
00287 if (con == NULL)
00288 return NULL;
00289
00290 gearman_server_con_io_remove(con);
00291
00292 return con;
00293 }
00294
00295 void gearman_server_con_proc_add(gearman_server_con_st *con)
00296 {
00297 if (con->proc_list)
00298 return;
00299
00300 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00301 GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
00302 con->proc_list= true;
00303 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00304
00305 if (!(con->thread->server->proc_shutdown) &&
00306 !(con->thread->server->proc_wakeup))
00307 {
00308 (void) pthread_mutex_lock(&(con->thread->server->proc_lock));
00309 con->thread->server->proc_wakeup= true;
00310 (void) pthread_cond_signal(&(con->thread->server->proc_cond));
00311 (void) pthread_mutex_unlock(&(con->thread->server->proc_lock));
00312 }
00313 }
00314
00315 void gearman_server_con_proc_remove(gearman_server_con_st *con)
00316 {
00317 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00318 if (con->proc_list)
00319 {
00320 GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
00321 con->proc_list= false;
00322 }
00323 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00324 }
00325
00326 gearman_server_con_st *
00327 gearman_server_con_proc_next(gearman_server_thread_st *thread)
00328 {
00329 gearman_server_con_st *con;
00330
00331 if (thread->proc_list == NULL)
00332 return NULL;
00333
00334 GEARMAN_SERVER_THREAD_LOCK(thread)
00335
00336 con= thread->proc_list;
00337 while (con != NULL)
00338 {
00339 GEARMAN_LIST_DEL(thread->proc, con, proc_)
00340 con->proc_list= false;
00341 if (!(con->proc_removed))
00342 break;
00343 con= thread->proc_list;
00344 }
00345
00346 GEARMAN_SERVER_THREAD_UNLOCK(thread)
00347
00348 return con;
00349 }