00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 gearman_return_t _thread_packet_read(gearman_server_con_st *con);
00030
00034 static gearman_return_t _thread_packet_flush(gearman_server_con_st *con);
00035
00039 static gearman_return_t _proc_thread_start(gearman_server_st *server);
00040
00044 static void _proc_thread_kill(gearman_server_st *server);
00045
00049 static void *_proc(void *data);
00050
00054 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00055
00058
00059
00060
00061
00062 gearman_server_thread_st *
00063 gearman_server_thread_create(gearman_server_st *server,
00064 gearman_server_thread_st *thread)
00065 {
00066 if (server->thread_count == 1)
00067 {
00068
00069 if (_proc_thread_start(server) != GEARMAN_SUCCESS)
00070 return NULL;
00071 }
00072
00073 if (thread == NULL)
00074 {
00075 thread= (gearman_server_thread_st *)malloc(sizeof(gearman_server_thread_st));
00076 if (thread == NULL)
00077 {
00078 _proc_thread_kill(server);
00079 return NULL;
00080 }
00081
00082 thread->options.allocated= true;
00083 }
00084 else
00085 {
00086 thread->options.allocated= false;
00087 }
00088
00089 thread->con_count= 0;
00090 thread->io_count= 0;
00091 thread->proc_count= 0;
00092 thread->free_con_count= 0;
00093 thread->free_packet_count= 0;
00094 thread->server= server;
00095 thread->log_fn= NULL;
00096 thread->log_context= NULL;
00097 thread->run_fn= NULL;
00098 thread->run_fn_arg= NULL;
00099 thread->con_list= NULL;
00100 thread->io_list= NULL;
00101 thread->proc_list= NULL;
00102 thread->free_con_list= NULL;
00103 thread->free_packet_list= NULL;
00104
00105 if (pthread_mutex_init(&(thread->lock), NULL) != 0)
00106 {
00107 if (thread->options.allocated)
00108 free(thread);
00109
00110 return NULL;
00111 }
00112
00113 GEARMAN_LIST_ADD(server->thread, thread,);
00114
00115 gearman_universal_options_t options[]= { GEARMAN_NON_BLOCKING, GEARMAN_DONT_TRACK_PACKETS, GEARMAN_MAX};
00116 thread->gearman= gearman_universal_create(&(thread->gearman_universal_static), options);
00117 if (thread->gearman == NULL)
00118 {
00119 gearman_server_thread_free(thread);
00120 return NULL;
00121 }
00122
00123 return thread;
00124 }
00125
00126 void gearman_server_thread_free(gearman_server_thread_st *thread)
00127 {
00128 gearman_server_con_st *con;
00129 gearman_server_packet_st *packet;
00130
00131 _proc_thread_kill(thread->server);
00132
00133 while (thread->con_list != NULL)
00134 gearman_server_con_free(thread->con_list);
00135
00136 while (thread->free_con_list != NULL)
00137 {
00138 con= thread->free_con_list;
00139 thread->free_con_list= con->next;
00140 free(con);
00141 }
00142
00143 while (thread->free_packet_list != NULL)
00144 {
00145 packet= thread->free_packet_list;
00146 thread->free_packet_list= packet->next;
00147 free(packet);
00148 }
00149
00150 if (thread->gearman != NULL)
00151 gearman_universal_free(thread->gearman);
00152
00153 pthread_mutex_destroy(&(thread->lock));
00154
00155 GEARMAN_LIST_DEL(thread->server->thread, thread,)
00156
00157 if (thread->options.allocated)
00158 free(thread);
00159 }
00160
00161 const char *gearman_server_thread_error(gearman_server_thread_st *thread)
00162 {
00163 return gearman_universal_error(thread->gearman);
00164 }
00165
00166 int gearman_server_thread_errno(gearman_server_thread_st *thread)
00167 {
00168 return gearman_universal_errno(thread->gearman);
00169 }
00170
00171 void gearman_server_thread_set_event_watch(gearman_server_thread_st *thread,
00172 gearman_event_watch_fn *event_watch,
00173 void *event_watch_arg)
00174 {
00175 gearman_set_event_watch_fn(thread->gearman, event_watch, event_watch_arg);
00176 }
00177
00178 void gearman_server_thread_set_run(gearman_server_thread_st *thread,
00179 gearman_server_thread_run_fn *run_fn,
00180 void *run_fn_arg)
00181 {
00182 thread->run_fn= run_fn;
00183 thread->run_fn_arg= run_fn_arg;
00184 }
00185
00186 void gearman_server_thread_set_log_fn(gearman_server_thread_st *thread,
00187 gearman_log_fn *function,
00188 void *context,
00189 gearman_verbose_t verbose)
00190 {
00191 thread->log_fn= function;
00192 thread->log_context= context;
00193 gearman_set_log_fn(thread->gearman, _log, thread, verbose);
00194 }
00195
00196 gearman_server_con_st *
00197 gearman_server_thread_run(gearman_server_thread_st *thread,
00198 gearman_return_t *ret_ptr)
00199 {
00200 gearman_connection_st *con;
00201 gearman_server_con_st *server_con;
00202
00203
00204
00205 if (thread->server->flags.threaded)
00206 {
00207 while ((server_con= gearman_server_con_io_next(thread)) != NULL)
00208 {
00209 if (server_con->is_dead)
00210 {
00211 if (server_con->proc_removed)
00212 gearman_server_con_free(server_con);
00213
00214 continue;
00215 }
00216
00217 if (server_con->ret != GEARMAN_SUCCESS)
00218 {
00219 *ret_ptr= server_con->ret;
00220 return server_con;
00221 }
00222
00223
00224 *ret_ptr= _thread_packet_flush(server_con);
00225 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00226 return server_con;
00227 }
00228 }
00229
00230
00231 while ((con= gearman_ready(thread->gearman)) != NULL)
00232 {
00233
00234
00235
00236 server_con= (gearman_server_con_st *)con;
00237
00238
00239 if (con->revents & POLLIN)
00240 {
00241 *ret_ptr= _thread_packet_read(server_con);
00242 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00243 return server_con;
00244 }
00245
00246
00247 if (con->revents & POLLOUT)
00248 {
00249 *ret_ptr= _thread_packet_flush(server_con);
00250 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00251 return server_con;
00252 }
00253 }
00254
00255
00256 if (! (thread->server->flags.threaded))
00257 {
00258 while ((server_con= gearman_server_con_io_next(thread)) != NULL)
00259 {
00260 *ret_ptr= _thread_packet_flush(server_con);
00261 if (*ret_ptr != GEARMAN_SUCCESS && *ret_ptr != GEARMAN_IO_WAIT)
00262 return server_con;
00263 }
00264 }
00265
00266
00267 if (thread->server->shutdown)
00268 *ret_ptr= GEARMAN_SHUTDOWN;
00269 else if (thread->server->shutdown_graceful)
00270 {
00271 if (thread->server->job_count == 0)
00272 *ret_ptr= GEARMAN_SHUTDOWN;
00273 else
00274 *ret_ptr= GEARMAN_SHUTDOWN_GRACEFUL;
00275 }
00276 else
00277 *ret_ptr= GEARMAN_SUCCESS;
00278
00279 return NULL;
00280 }
00281
00282
00283
00284
00285
00286 gearman_return_t _thread_packet_read(gearman_server_con_st *con)
00287 {
00288 gearman_return_t ret;
00289
00290 while (1)
00291 {
00292 if (con->packet == NULL)
00293 {
00294 con->packet= gearman_server_packet_create(con->thread, true);
00295 if (con->packet == NULL)
00296 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00297 }
00298
00299 (void)gearman_connection_recv(&(con->con), &(con->packet->packet), &ret, true);
00300 if (ret != GEARMAN_SUCCESS)
00301 {
00302 if (ret == GEARMAN_IO_WAIT)
00303 break;
00304
00305 gearman_server_packet_free(con->packet, con->thread, true);
00306 con->packet= NULL;
00307 return ret;
00308 }
00309
00310 gearman_log_debug(con->thread->gearman, "%15s:%5s Received %s",
00311 con->host == NULL ? "-" : con->host,
00312 con->port == NULL ? "-" : con->port,
00313 gearman_command_info_list[con->packet->packet.command].name);
00314
00315
00316 if (con->thread->server->flags.threaded)
00317 {
00318
00319 gearman_server_proc_packet_add(con, con->packet);
00320 con->packet= NULL;
00321 }
00322 else
00323 {
00324
00325 ret= gearman_server_run_command(con, &(con->packet->packet));
00326 gearman_packet_free(&(con->packet->packet));
00327 gearman_server_packet_free(con->packet, con->thread, true);
00328 con->packet= NULL;
00329 if (ret != GEARMAN_SUCCESS)
00330 return ret;
00331 }
00332 }
00333
00334 return GEARMAN_SUCCESS;
00335 }
00336
00337 static gearman_return_t _thread_packet_flush(gearman_server_con_st *con)
00338 {
00339 gearman_return_t ret;
00340
00341
00342 if (con->con.events & POLLOUT)
00343 return GEARMAN_IO_WAIT;
00344
00345 while (con->io_packet_list != NULL)
00346 {
00347 ret= gearman_connection_send(&(con->con), &(con->io_packet_list->packet),
00348 con->io_packet_list->next == NULL ? true : false);
00349 if (ret != GEARMAN_SUCCESS)
00350 return ret;
00351
00352 gearman_log_debug(con->thread->gearman, "%15s:%5s Sent %s",
00353 con->host == NULL ? "-" : con->host,
00354 con->port == NULL ? "-" : con->port,
00355 gearman_command_info_list[con->io_packet_list->packet.command].name);
00356
00357 gearman_server_io_packet_remove(con);
00358 }
00359
00360
00361 return gearman_connection_set_events(&(con->con), POLLIN);
00362 }
00363
00364 static gearman_return_t _proc_thread_start(gearman_server_st *server)
00365 {
00366 pthread_attr_t attr;
00367
00368 if (pthread_mutex_init(&(server->proc_lock), NULL) != 0)
00369 return GEARMAN_PTHREAD;
00370
00371 if (pthread_cond_init(&(server->proc_cond), NULL) != 0)
00372 return GEARMAN_PTHREAD;
00373
00374 if (pthread_attr_init(&attr) != 0)
00375 return GEARMAN_PTHREAD;
00376
00377 if (pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM) != 0)
00378 return GEARMAN_PTHREAD;
00379
00380 if (pthread_create(&(server->proc_id), &attr, _proc, server) != 0)
00381 return GEARMAN_PTHREAD;
00382
00383 (void) pthread_attr_destroy(&attr);
00384
00385 server->flags.threaded= true;
00386
00387 return GEARMAN_SUCCESS;
00388 }
00389
00390 static void _proc_thread_kill(gearman_server_st *server)
00391 {
00392 if (! (server->flags.threaded) || server->proc_shutdown)
00393 return;
00394
00395 server->proc_shutdown= true;
00396
00397
00398 (void) pthread_mutex_lock(&(server->proc_lock));
00399 (void) pthread_cond_signal(&(server->proc_cond));
00400 (void) pthread_mutex_unlock(&(server->proc_lock));
00401
00402
00403 (void) pthread_join(server->proc_id, NULL);
00404 (void) pthread_cond_destroy(&(server->proc_cond));
00405 (void) pthread_mutex_destroy(&(server->proc_lock));
00406 }
00407
00408 static void *_proc(void *data)
00409 {
00410 gearman_server_st *server= (gearman_server_st *)data;
00411 gearman_server_thread_st *thread;
00412 gearman_server_con_st *con;
00413 gearman_server_packet_st *packet;
00414
00415 while (1)
00416 {
00417 (void) pthread_mutex_lock(&(server->proc_lock));
00418 while (server->proc_wakeup == false)
00419 {
00420 if (server->proc_shutdown)
00421 {
00422 (void) pthread_mutex_unlock(&(server->proc_lock));
00423 return NULL;
00424 }
00425
00426 (void) pthread_cond_wait(&(server->proc_cond), &(server->proc_lock));
00427 }
00428 server->proc_wakeup= false;
00429 (void) pthread_mutex_unlock(&(server->proc_lock));
00430
00431 for (thread= server->thread_list; thread != NULL; thread= thread->next)
00432 {
00433 while ((con= gearman_server_con_proc_next(thread)) != NULL)
00434 {
00435 if (con->is_dead)
00436 {
00437 gearman_server_con_free_workers(con);
00438
00439 while (con->client_list != NULL)
00440 gearman_server_client_free(con->client_list);
00441
00442 con->proc_removed= true;
00443 gearman_server_con_io_add(con);
00444 continue;
00445 }
00446
00447 while (1)
00448 {
00449 packet= gearman_server_proc_packet_remove(con);
00450 if (packet == NULL)
00451 break;
00452
00453 con->ret= gearman_server_run_command(con, &(packet->packet));
00454 gearman_packet_free(&(packet->packet));
00455 gearman_server_packet_free(packet, con->thread, false);
00456 }
00457 }
00458 }
00459 }
00460 }
00461
00462 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00463 {
00464 gearman_server_thread_st *thread= (gearman_server_thread_st *)context;
00465 (*(thread->log_fn))(line, verbose, (void *)thread->log_context);
00466 }