00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 gearman_return_t _thread_packet_read(gearman_server_con_st *con);
00030
00034 static gearman_return_t _thread_packet_flush(gearman_server_con_st *con);
00035
00039 static gearman_return_t _proc_thread_start(gearman_server_st *server);
00040
00044 static void _proc_thread_kill(gearman_server_st *server);
00045
00049 static void *_proc(void *data);
00050
00054 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00055
00058
00059
00060
00061
00062 gearman_server_thread_st *
00063 gearman_server_thread_create(gearman_server_st *server,
00064 gearman_server_thread_st *thread)
00065 {
00066 if (server->thread_count == 1)
00067 {
00068
00069 if (_proc_thread_start(server) != GEARMAN_SUCCESS)
00070 return NULL;
00071 }
00072
00073 if (thread == NULL)
00074 {
00075 thread= malloc(sizeof(gearman_server_thread_st));
00076 if (thread == NULL)
00077 {
00078 _proc_thread_kill(server);
00079 return NULL;
00080 }
00081
00082 thread->options= GEARMAN_SERVER_THREAD_ALLOCATED;
00083 }
00084 else
00085 thread->options= 0;
00086
00087 thread->con_count= 0;
00088 thread->io_count= 0;
00089 thread->proc_count= 0;
00090 thread->free_con_count= 0;
00091 thread->free_packet_count= 0;
00092 thread->server= server;
00093 thread->log_fn= NULL;
00094 thread->log_context= NULL;
00095 thread->run_fn= NULL;
00096 thread->run_fn_arg= NULL;
00097 thread->con_list= NULL;
00098 thread->io_list= NULL;
00099 thread->proc_list= NULL;
00100 thread->free_con_list= NULL;
00101 thread->free_packet_list= NULL;
00102
00103 if (pthread_mutex_init(&(thread->lock), NULL) != 0)
00104 {
00105 if (thread->options & GEARMAN_SERVER_THREAD_ALLOCATED)
00106 free(thread);
00107 return NULL;
00108 }
00109
00110 GEARMAN_LIST_ADD(server->thread, thread,)
00111
00112 thread->gearman= gearman_create(&(thread->gearman_static));
00113 if (thread->gearman == NULL)
00114 {
00115 gearman_server_thread_free(thread);
00116 return NULL;
00117 }
00118
00119 gearman_set_options(thread->gearman,
00120 GEARMAN_NON_BLOCKING | GEARMAN_DONT_TRACK_PACKETS);
00121
00122 return thread;
00123 }
00124
00125 void gearman_server_thread_free(gearman_server_thread_st *thread)
00126 {
00127 gearman_server_con_st *con;
00128 gearman_server_packet_st *packet;
00129
00130 _proc_thread_kill(thread->server);
00131
00132 while (thread->con_list != NULL)
00133 gearman_server_con_free(thread->con_list);
00134
00135 while (thread->free_con_list != NULL)
00136 {
00137 con= thread->free_con_list;
00138 thread->free_con_list= con->next;
00139 free(con);
00140 }
00141
00142 while (thread->free_packet_list != NULL)
00143 {
00144 packet= thread->free_packet_list;
00145 thread->free_packet_list= packet->next;
00146 free(packet);
00147 }
00148
00149 if (thread->gearman != NULL)
00150 gearman_free(thread->gearman);
00151
00152 pthread_mutex_destroy(&(thread->lock));
00153
00154 GEARMAN_LIST_DEL(thread->server->thread, thread,)
00155
00156 if (thread->options & GEARMAN_SERVER_THREAD_ALLOCATED)
00157 free(thread);
00158 }
00159
00160 const char *gearman_server_thread_error(gearman_server_thread_st *thread)
00161 {
00162 return gearman_error(thread->gearman);
00163 }
00164
00165 int gearman_server_thread_errno(gearman_server_thread_st *thread)
00166 {
00167 return gearman_errno(thread->gearman);
00168 }
00169
00170 void gearman_server_thread_set_event_watch(gearman_server_thread_st *thread,
00171 gearman_event_watch_fn *event_watch,
00172 void *event_watch_arg)
00173 {
00174 gearman_set_event_watch_fn(thread->gearman, event_watch, event_watch_arg);
00175 }
00176
00177 void gearman_server_thread_set_run(gearman_server_thread_st *thread,
00178 gearman_server_thread_run_fn *run_fn,
00179 void *run_fn_arg)
00180 {
00181 thread->run_fn= run_fn;
00182 thread->run_fn_arg= run_fn_arg;
00183 }
00184
00185 void gearman_server_thread_set_log_fn(gearman_server_thread_st *thread,
00186 gearman_log_fn *function,
00187 const void *context,
00188 gearman_verbose_t verbose)
00189 {
00190 thread->log_fn= function;
00191 thread->log_context= context;
00192 gearman_set_log_fn(thread->gearman, _log, thread, verbose);
00193 }
00194
00195 gearman_server_con_st *
00196 gearman_server_thread_run(gearman_server_thread_st *thread,
00197 gearman_return_t *ret_ptr)
00198 {
00199 gearman_con_st *con;
00200 gearman_server_con_st *server_con;
00201
00202
00203
00204 if (thread->server->options & GEARMAN_SERVER_PROC_THREAD)
00205 {
00206 while ((server_con= gearman_server_con_io_next(thread)) != NULL)
00207 {
00208 if (server_con->options & GEARMAN_SERVER_CON_DEAD)
00209 {
00210 if (server_con->proc_removed)
00211 gearman_server_con_free(server_con);
00212
00213 continue;
00214 }
00215
00216 if (server_con->ret != GEARMAN_SUCCESS)
00217 {
00218 *ret_ptr= server_con->ret;
00219 return server_con;
00220 }
00221
00222
00223 *ret_ptr= _thread_packet_flush(server_con);
00224 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00225 return server_con;
00226 }
00227 }
00228
00229
00230 while ((con= gearman_con_ready(thread->gearman)) != NULL)
00231 {
00232
00233
00234
00235 server_con= (gearman_server_con_st *)con;
00236
00237
00238 if (con->revents & POLLIN)
00239 {
00240 *ret_ptr= _thread_packet_read(server_con);
00241 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00242 return server_con;
00243 }
00244
00245
00246 if (con->revents & POLLOUT)
00247 {
00248 *ret_ptr= _thread_packet_flush(server_con);
00249 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00250 return server_con;
00251 }
00252 }
00253
00254
00255 if (!(thread->server->options & GEARMAN_SERVER_PROC_THREAD))
00256 {
00257 while ((server_con= gearman_server_con_io_next(thread)) != NULL)
00258 {
00259 *ret_ptr= _thread_packet_flush(server_con);
00260 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00261 return server_con;
00262 }
00263 }
00264
00265
00266 if (thread->server->shutdown)
00267 *ret_ptr= GEARMAN_SHUTDOWN;
00268 else if (thread->server->shutdown_graceful)
00269 {
00270 if (thread->server->job_count == 0)
00271 *ret_ptr= GEARMAN_SHUTDOWN;
00272 else
00273 *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
00274 }
00275 else
00276 *ret_ptr= GEARMAN_SUCCESS;
00277
00278 return NULL;
00279 }
00280
00281
00282
00283
00284
00285 gearman_return_t _thread_packet_read(gearman_server_con_st *con)
00286 {
00287 gearman_return_t ret;
00288
00289 while (1)
00290 {
00291 if (con->packet == NULL)
00292 {
00293 con->packet= gearman_server_packet_create(con->thread, true);
00294 if (con->packet == NULL)
00295 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00296 }
00297
00298 (void)gearman_con_recv(&(con->con), &(con->packet->packet), &ret, true);
00299 if (ret != GEARMAN_SUCCESS)
00300 {
00301 if (ret == GEARMAN_IO_WAIT)
00302 break;
00303
00304 gearman_server_packet_free(con->packet, con->thread, true);
00305 con->packet= NULL;
00306 return ret;
00307 }
00308
00309 GEARMAN_DEBUG(con->thread->gearman, "%15s:%5s Received %s",
00310 con->host == NULL ? "-" : con->host,
00311 con->port == NULL ? "-" : con->port,
00312 gearman_command_info_list[con->packet->packet.command].name)
00313
00314
00315 if (con->thread->server->options & GEARMAN_SERVER_PROC_THREAD)
00316 {
00317
00318 gearman_server_proc_packet_add(con, con->packet);
00319 con->packet= NULL;
00320 }
00321 else
00322 {
00323
00324 ret= gearman_server_run_command(con, &(con->packet->packet));
00325 gearman_packet_free(&(con->packet->packet));
00326 gearman_server_packet_free(con->packet, con->thread, true);
00327 con->packet= NULL;
00328 if (ret != GEARMAN_SUCCESS)
00329 return ret;
00330 }
00331 }
00332
00333 return GEARMAN_SUCCESS;
00334 }
00335
00336 static gearman_return_t _thread_packet_flush(gearman_server_con_st *con)
00337 {
00338 gearman_return_t ret;
00339
00340
00341 if (con->con.events & POLLOUT)
00342 return GEARMAN_IO_WAIT;
00343
00344 while (con->io_packet_list != NULL)
00345 {
00346 ret= gearman_con_send(&(con->con), &(con->io_packet_list->packet),
00347 con->io_packet_list->next == NULL ? true : false);
00348 if (ret != GEARMAN_SUCCESS)
00349 return ret;
00350
00351 GEARMAN_DEBUG(con->thread->gearman, "%15s:%5s Sent %s",
00352 con->host == NULL ? "-" : con->host,
00353 con->port == NULL ? "-" : con->port,
00354 gearman_command_info_list[con->io_packet_list->packet.command].name)
00355
00356 gearman_server_io_packet_remove(con);
00357 }
00358
00359
00360 return gearman_con_set_events(&(con->con), POLLIN);
00361 }
00362
00363 static gearman_return_t _proc_thread_start(gearman_server_st *server)
00364 {
00365 pthread_attr_t attr;
00366
00367 if (pthread_mutex_init(&(server->proc_lock), NULL) != 0)
00368 return GEARMAN_PTHREAD;
00369
00370 if (pthread_cond_init(&(server->proc_cond), NULL) != 0)
00371 return GEARMAN_PTHREAD;
00372
00373 if (pthread_attr_init(&attr) != 0)
00374 return GEARMAN_PTHREAD;
00375
00376 if (pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0)
00377 return GEARMAN_PTHREAD;
00378
00379 if (pthread_create(&(server->proc_id), &attr, _proc, server) != 0)
00380 return GEARMAN_PTHREAD;
00381
00382 (void) pthread_attr_destroy(&attr);
00383
00384 server->options|= GEARMAN_SERVER_PROC_THREAD;
00385
00386 return GEARMAN_SUCCESS;
00387 }
00388
00389 static void _proc_thread_kill(gearman_server_st *server)
00390 {
00391 if (!(server->options & GEARMAN_SERVER_PROC_THREAD) || server->proc_shutdown)
00392 return;
00393
00394 server->proc_shutdown= true;
00395
00396
00397 (void) pthread_mutex_lock(&(server->proc_lock));
00398 (void) pthread_cond_signal(&(server->proc_cond));
00399 (void) pthread_mutex_unlock(&(server->proc_lock));
00400
00401
00402 (void) pthread_join(server->proc_id, NULL);
00403 (void) pthread_cond_destroy(&(server->proc_cond));
00404 (void) pthread_mutex_destroy(&(server->proc_lock));
00405 }
00406
00407 static void *_proc(void *data)
00408 {
00409 gearman_server_st *server= (gearman_server_st *)data;
00410 gearman_server_thread_st *thread;
00411 gearman_server_con_st *con;
00412 gearman_server_packet_st *packet;
00413
00414 while (1)
00415 {
00416 (void) pthread_mutex_lock(&(server->proc_lock));
00417 while (server->proc_wakeup == false)
00418 {
00419 if (server->proc_shutdown)
00420 {
00421 (void) pthread_mutex_unlock(&(server->proc_lock));
00422 return NULL;
00423 }
00424
00425 (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
00426 }
00427 server->proc_wakeup= false;
00428 (void) pthread_mutex_unlock(&(server->proc_lock));
00429
00430 for (thread= server->thread_list; thread != NULL; thread= thread->next)
00431 {
00432 while ((con= gearman_server_con_proc_next(thread)) != NULL)
00433 {
00434 if (con->options & GEARMAN_SERVER_CON_DEAD)
00435 {
00436 gearman_server_con_free_workers(con);
00437
00438 while (con->client_list != NULL)
00439 gearman_server_client_free(con->client_list);
00440
00441 con->proc_removed= true;
00442 gearman_server_con_io_add(con);
00443 continue;
00444 }
00445
00446 while (1)
00447 {
00448 packet= gearman_server_proc_packet_remove(con);
00449 if (packet == NULL)
00450 break;
00451
00452 con->ret= gearman_server_run_command(con, &(packet->packet));
00453 gearman_packet_free(&(packet->packet));
00454 gearman_server_packet_free(packet, con->thread, false);
00455 }
00456 }
00457 }
00458 }
00459 }
00460
00461 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00462 {
00463 gearman_server_thread_st *thread= (gearman_server_thread_st *)context;
00464 (*(thread->log_fn))(line, verbose, (void *)thread->log_context);
00465 }