00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
00027 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00028
00029 static gearman_return_t _listen_init(gearmand_st *gearmand);
00030 static void _listen_close(gearmand_st *gearmand);
00031 static gearman_return_t _listen_watch(gearmand_st *gearmand);
00032 static void _listen_clear(gearmand_st *gearmand);
00033 static void _listen_event(int fd, short events, void *arg);
00034
00035 static gearman_return_t _wakeup_init(gearmand_st *gearmand);
00036 static void _wakeup_close(gearmand_st *gearmand);
00037 static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
00038 static void _wakeup_clear(gearmand_st *gearmand);
00039 static void _wakeup_event(int fd, short events, void *arg);
00040
00041 static gearman_return_t _watch_events(gearmand_st *gearmand);
00042 static void _clear_events(gearmand_st *gearmand);
00043 static void _close_events(gearmand_st *gearmand);
00044
00047
00048
00049
00050
00051 gearmand_st *gearmand_create(const char *host, in_port_t port)
00052 {
00053 gearmand_st *gearmand;
00054
00055 gearmand= (gearmand_st *)malloc(sizeof(gearmand_st));
00056 if (gearmand == NULL)
00057 return NULL;
00058
00059 if (gearman_server_create(&(gearmand->server)) == NULL)
00060 {
00061 free(gearmand);
00062 return NULL;
00063 }
00064
00065 gearmand->is_listen_event= false;
00066 gearmand->is_wakeup_event= false;
00067 gearmand->verbose= 0;
00068 gearmand->ret= 0;
00069 gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
00070 gearmand->threads= 0;
00071 gearmand->port_count= 0;
00072 gearmand->thread_count= 0;
00073 gearmand->free_dcon_count= 0;
00074 gearmand->max_thread_free_dcon_count= 0;
00075 gearmand->wakeup_fd[0]= -1;
00076 gearmand->wakeup_fd[1]= -1;
00077 gearmand->host= host;
00078 gearmand->log_fn= NULL;
00079 gearmand->log_context= NULL;
00080 gearmand->base= NULL;
00081 gearmand->port_list= NULL;
00082 gearmand->thread_list= NULL;
00083 gearmand->thread_add_next= NULL;
00084 gearmand->free_dcon_list= NULL;
00085
00086 if (port == 0)
00087 port= GEARMAN_DEFAULT_TCP_PORT;
00088
00089 if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
00090 {
00091 gearmand_free(gearmand);
00092 return NULL;
00093 }
00094
00095 return gearmand;
00096 }
00097
00098 void gearmand_free(gearmand_st *gearmand)
00099 {
00100 gearmand_con_st *dcon;
00101 uint32_t x;
00102
00103 _close_events(gearmand);
00104
00105 if (gearmand->threads > 0)
00106 gearmand_log_info(gearmand, "Shutting down all threads");
00107
00108 while (gearmand->thread_list != NULL)
00109 gearmand_thread_free(gearmand->thread_list);
00110
00111 while (gearmand->free_dcon_list != NULL)
00112 {
00113 dcon= gearmand->free_dcon_list;
00114 gearmand->free_dcon_list= dcon->next;
00115 free(dcon);
00116 }
00117
00118 if (gearmand->base != NULL)
00119 event_base_free(gearmand->base);
00120
00121 gearman_server_free(&(gearmand->server));
00122
00123 for (x= 0; x < gearmand->port_count; x++)
00124 {
00125 if (gearmand->port_list[x].listen_fd != NULL)
00126 free(gearmand->port_list[x].listen_fd);
00127
00128 if (gearmand->port_list[x].listen_event != NULL)
00129 free(gearmand->port_list[x].listen_event);
00130 }
00131
00132 if (gearmand->port_list != NULL)
00133 free(gearmand->port_list);
00134
00135 gearmand_log_info(gearmand, "Shutdown complete");
00136
00137 free(gearmand);
00138 }
00139
00140 void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
00141 {
00142 gearmand->backlog= backlog;
00143 }
00144
00145 void gearmand_set_job_retries(gearmand_st *gearmand, uint8_t job_retries)
00146 {
00147 gearman_server_set_job_retries(&(gearmand->server), job_retries);
00148 }
00149
00150 void gearmand_set_worker_wakeup(gearmand_st *gearmand, uint8_t worker_wakeup)
00151 {
00152 gearman_server_set_worker_wakeup(&(gearmand->server), worker_wakeup);
00153 }
00154
00155 void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
00156 {
00157 gearmand->threads= threads;
00158 }
00159
00160 void gearmand_set_log_fn(gearmand_st *gearmand, gearman_log_fn *function,
00161 void *context, gearman_verbose_t verbose)
00162 {
00163 gearman_server_set_log_fn(&(gearmand->server), _log, gearmand, verbose);
00164 gearmand->log_fn= function;
00165 gearmand->log_context= context;
00166 gearmand->verbose= verbose;
00167 }
00168
00169 gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
00170 gearman_connection_add_fn *function)
00171 {
00172 gearmand_port_st *port_list;
00173
00174 port_list= (gearmand_port_st *)realloc(gearmand->port_list,
00175 sizeof(gearmand_port_st) * (gearmand->port_count + 1));
00176 if (port_list == NULL)
00177 {
00178 gearmand_log_fatal(gearmand, "gearmand_port_add:realloc:NULL");
00179 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00180 }
00181
00182 port_list[gearmand->port_count].port= port;
00183 port_list[gearmand->port_count].listen_count= 0;
00184 port_list[gearmand->port_count].gearmand= gearmand;
00185 port_list[gearmand->port_count].add_fn= function;
00186 port_list[gearmand->port_count].listen_fd= NULL;
00187 port_list[gearmand->port_count].listen_event= NULL;
00188
00189 gearmand->port_list= port_list;
00190 gearmand->port_count++;
00191
00192 return GEARMAN_SUCCESS;
00193 }
00194
00195 gearman_return_t gearmand_run(gearmand_st *gearmand)
00196 {
00197 uint32_t x;
00198
00199
00200 if (gearmand->base == NULL)
00201 {
00202 gearmand_log_info(gearmand, "Starting up");
00203
00204 if (gearmand->threads > 0)
00205 {
00206 #ifndef HAVE_EVENT_BASE_NEW
00207 gearmand_log_fatal(gearmand, "Multi-threaded gearmand requires libevent 1.4 or later, libevent 1.3 does not provided a "
00208 "thread-safe interface.");
00209 return GEARMAN_EVENT;
00210 #else
00211
00212
00213
00214 gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
00215 gearmand->threads) / 2);
00216 #endif
00217 }
00218
00219 gearmand_log_debug(gearmand, "Initializing libevent for main thread");
00220
00221 gearmand->base= event_base_new();
00222 if (gearmand->base == NULL)
00223 {
00224 gearmand_log_fatal(gearmand, "gearmand_run:event_base_new:NULL");
00225 return GEARMAN_EVENT;
00226 }
00227
00228 gearmand_log_debug(gearmand, "Method for libevent: %s",
00229 event_base_get_method(gearmand->base));
00230
00231 gearmand->ret= _listen_init(gearmand);
00232 if (gearmand->ret != GEARMAN_SUCCESS)
00233 return gearmand->ret;
00234
00235 gearmand->ret= _wakeup_init(gearmand);
00236 if (gearmand->ret != GEARMAN_SUCCESS)
00237 return gearmand->ret;
00238
00239 gearmand_log_debug(gearmand, "Creating %u threads", gearmand->threads);
00240
00241
00242 x= 0;
00243 do
00244 {
00245 gearmand->ret= gearmand_thread_create(gearmand);
00246 if (gearmand->ret != GEARMAN_SUCCESS)
00247 return gearmand->ret;
00248 x++;
00249 }
00250 while (x < gearmand->threads);
00251
00252 gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
00253 if (gearmand->ret != GEARMAN_SUCCESS)
00254 return gearmand->ret;
00255 }
00256
00257 gearmand->ret= _watch_events(gearmand);
00258 if (gearmand->ret != GEARMAN_SUCCESS)
00259 return gearmand->ret;
00260
00261 gearmand_log_info(gearmand, "Entering main event loop");
00262
00263 if (event_base_loop(gearmand->base, 0) == -1)
00264 {
00265 gearmand_log_fatal(gearmand, "gearmand_run:event_base_loop:-1");
00266 return GEARMAN_EVENT;
00267 }
00268
00269 gearmand_log_info(gearmand, "Exited main event loop");
00270
00271 return gearmand->ret;
00272 }
00273
00274 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
00275 {
00276 uint8_t buffer= wakeup;
00277
00278
00279
00280 if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
00281 gearmand_log_error(gearmand, "gearmand_wakeup:write:%d", errno);
00282 }
00283
00284 void gearmand_set_round_robin(gearmand_st *gearmand, bool round_robin)
00285 {
00286 gearmand->server.flags.round_robin= round_robin;
00287 }
00288
00289
00290
00291
00292
00293
00294 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00295 {
00296 gearmand_st *gearmand= (gearmand_st *)context;
00297 (*gearmand->log_fn)(line, verbose, (void *)gearmand->log_context);
00298 }
00299
00300 static gearman_return_t _listen_init(gearmand_st *gearmand)
00301 {
00302 for (uint32_t x= 0; x < gearmand->port_count; x++)
00303 {
00304 int ret;
00305 struct gearmand_port_st *port;
00306 char port_str[NI_MAXSERV];
00307 struct addrinfo ai;
00308 struct addrinfo *addrinfo;
00309
00310 port= &gearmand->port_list[x];
00311
00312 snprintf(port_str, NI_MAXSERV, "%u", (uint32_t)(port->port));
00313
00314 memset(&ai, 0, sizeof(struct addrinfo));
00315 ai.ai_flags = AI_PASSIVE;
00316 ai.ai_family = AF_UNSPEC;
00317 ai.ai_socktype = SOCK_STREAM;
00318 ai.ai_protocol= IPPROTO_TCP;
00319
00320 ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
00321 if (ret != 0)
00322 {
00323 gearmand_log_fatal(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret));
00324 return GEARMAN_ERRNO;
00325 }
00326
00327 for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
00328 addrinfo_next= addrinfo_next->ai_next)
00329 {
00330 int opt;
00331 int fd;
00332 char host[NI_MAXHOST];
00333
00334 ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
00335 NI_MAXHOST, port_str, NI_MAXSERV,
00336 NI_NUMERICHOST | NI_NUMERICSERV);
00337 if (ret != 0)
00338 {
00339 gearmand_log_error(gearmand, "_listen_init:getnameinfo:%s", gai_strerror(ret));
00340 strcpy(host, "-");
00341 strcpy(port_str, "-");
00342 }
00343
00344 gearmand_log_debug(gearmand, "Trying to listen on %s:%s", host, port_str);
00345
00346
00347 fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
00348 addrinfo_next->ai_protocol);
00349 if (fd == -1)
00350 {
00351 gearmand_log_error(gearmand, "Failed to listen on %s:%s", host, port_str);
00352 continue;
00353 }
00354
00355 opt= 1;
00356 ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
00357 if (ret == -1)
00358 {
00359 close(fd);
00360 gearmand_log_fatal(gearmand, "_listen_init:setsockopt:%d", errno);
00361 return GEARMAN_ERRNO;
00362 }
00363
00364 ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
00365 if (ret == -1)
00366 {
00367 close(fd);
00368 if (errno == EADDRINUSE)
00369 {
00370 if (port->listen_fd == NULL)
00371 {
00372 gearmand_log_error(gearmand, "Address already in use %s:%s", host, port_str);
00373 }
00374
00375 continue;
00376 }
00377
00378 gearmand_log_fatal(gearmand, "_listen_init:bind:%d", errno);
00379 return GEARMAN_ERRNO;
00380 }
00381
00382 if (listen(fd, gearmand->backlog) == -1)
00383 {
00384 close(fd);
00385 gearmand_log_fatal(gearmand, "_listen_init:listen:%d", errno);
00386 return GEARMAN_ERRNO;
00387 }
00388
00389
00390 {
00391 int *fd_list;
00392
00393 fd_list= (int *)realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
00394 if (fd_list == NULL)
00395 {
00396 close(fd);
00397 gearmand_log_fatal(gearmand, "_listen_init:realloc:%d", errno);
00398 return GEARMAN_ERRNO;
00399 }
00400
00401 port->listen_fd= fd_list;
00402 }
00403
00404 port->listen_fd[port->listen_count]= fd;
00405 port->listen_count++;
00406
00407 gearmand_log_info(gearmand, "Listening on %s:%s (%d)", host, port_str, fd);
00408 }
00409
00410 freeaddrinfo(addrinfo);
00411
00412
00413 if (port->listen_fd == NULL)
00414 {
00415 gearmand_log_fatal(gearmand, "_listen_init:Could not bind/listen to any addresses");
00416 return GEARMAN_ERRNO;
00417 }
00418
00419 port->listen_event= (struct event *)malloc(sizeof(struct event) * port->listen_count);
00420 if (port->listen_event == NULL)
00421 {
00422 gearmand_log_fatal(gearmand, "_listen_init:malloc:%d", errno);
00423 return GEARMAN_ERRNO;
00424 }
00425
00426 for (uint32_t y= 0; y < port->listen_count; y++)
00427 {
00428 event_set(&(port->listen_event[y]), port->listen_fd[y],
00429 EV_READ | EV_PERSIST, _listen_event, port);
00430 event_base_set(gearmand->base, &(port->listen_event[y]));
00431 }
00432 }
00433
00434 return GEARMAN_SUCCESS;
00435 }
00436
00437 static void _listen_close(gearmand_st *gearmand)
00438 {
00439 _listen_clear(gearmand);
00440
00441 for (uint32_t x= 0; x < gearmand->port_count; x++)
00442 {
00443 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00444 {
00445 if (gearmand->port_list[x].listen_fd[y] >= 0)
00446 {
00447 gearmand_log_info(gearmand, "Closing listening socket (%d)", gearmand->port_list[x].listen_fd[y]);
00448 close(gearmand->port_list[x].listen_fd[y]);
00449 gearmand->port_list[x].listen_fd[y]= -1;
00450 }
00451 }
00452 }
00453 }
00454
00455 static gearman_return_t _listen_watch(gearmand_st *gearmand)
00456 {
00457 if (gearmand->is_listen_event)
00458 return GEARMAN_SUCCESS;
00459
00460 for (uint32_t x= 0; x < gearmand->port_count; x++)
00461 {
00462 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00463 {
00464 gearmand_log_info(gearmand, "Adding event for listening socket (%d)",
00465 gearmand->port_list[x].listen_fd[y]);
00466
00467 if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
00468 {
00469 gearmand_log_fatal(gearmand, "_listen_watch:event_add:-1");
00470 return GEARMAN_EVENT;
00471 }
00472 }
00473 }
00474
00475 gearmand->is_listen_event= true;
00476 return GEARMAN_SUCCESS;
00477 }
00478
00479 static void _listen_clear(gearmand_st *gearmand)
00480 {
00481 if (! (gearmand->is_listen_event))
00482 return;
00483
00484 int del_ret= 0;
00485 for (uint32_t x= 0; x < gearmand->port_count; x++)
00486 {
00487 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00488 {
00489 gearmand_log_info(gearmand, "Clearing event for listening socket (%d)",
00490 gearmand->port_list[x].listen_fd[y]);
00491 del_ret= event_del(&(gearmand->port_list[x].listen_event[y]));
00492 assert(del_ret == 0);
00493 }
00494 }
00495
00496 gearmand->is_listen_event= false;
00497 }
00498
00499 static void _listen_event(int fd, short events __attribute__ ((unused)),
00500 void *arg)
00501 {
00502 gearmand_port_st *port= (gearmand_port_st *)arg;
00503 struct sockaddr_storage sa;
00504 socklen_t sa_len;
00505 char host[NI_MAXHOST];
00506 char port_str[NI_MAXSERV];
00507 int ret;
00508
00509 sa_len= sizeof(sa);
00510 fd= accept(fd, (struct sockaddr *)&sa, &sa_len);
00511 if (fd == -1)
00512 {
00513 if (errno == EINTR)
00514 return;
00515 else if (errno == EMFILE)
00516 {
00517 gearmand_log_error(port->gearmand, "_listen_event:accept:too many open files");
00518 return;
00519 }
00520
00521 _clear_events(port->gearmand);
00522 gearmand_log_fatal(port->gearmand, "_listen_event:accept:%d", errno);
00523 port->gearmand->ret= GEARMAN_ERRNO;
00524 return;
00525 }
00526
00527
00528
00529 ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
00530 NI_NUMERICHOST | NI_NUMERICSERV);
00531 if (ret != 0)
00532 {
00533 gearmand_log_error(port->gearmand, "_listen_event:getnameinfo:%s", gai_strerror(ret));
00534 strcpy(host, "-");
00535 strcpy(port_str, "-");
00536 }
00537
00538 gearmand_log_info(port->gearmand, "Accepted connection from %s:%s", host, port_str);
00539
00540 port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
00541 port->add_fn);
00542 if (port->gearmand->ret != GEARMAN_SUCCESS)
00543 _clear_events(port->gearmand);
00544 }
00545
00546 static gearman_return_t _wakeup_init(gearmand_st *gearmand)
00547 {
00548 int ret;
00549
00550 gearmand_log_info(gearmand, "Creating wakeup pipe");
00551
00552 ret= pipe(gearmand->wakeup_fd);
00553 if (ret == -1)
00554 {
00555 gearmand_log_fatal(gearmand, "_wakeup_init:pipe:%d", errno);
00556 return GEARMAN_ERRNO;
00557 }
00558
00559 ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
00560 if (ret == -1)
00561 {
00562 gearmand_log_fatal(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
00563 return GEARMAN_ERRNO;
00564 }
00565
00566 ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00567 if (ret == -1)
00568 {
00569 gearmand_log_fatal(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
00570 return GEARMAN_ERRNO;
00571 }
00572
00573 event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
00574 EV_READ | EV_PERSIST, _wakeup_event, gearmand);
00575 event_base_set(gearmand->base, &(gearmand->wakeup_event));
00576
00577 return GEARMAN_SUCCESS;
00578 }
00579
00580 static void _wakeup_close(gearmand_st *gearmand)
00581 {
00582 _wakeup_clear(gearmand);
00583
00584 if (gearmand->wakeup_fd[0] >= 0)
00585 {
00586 gearmand_log_info(gearmand, "Closing wakeup pipe");
00587 close(gearmand->wakeup_fd[0]);
00588 gearmand->wakeup_fd[0]= -1;
00589 close(gearmand->wakeup_fd[1]);
00590 gearmand->wakeup_fd[1]= -1;
00591 }
00592 }
00593
00594 static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
00595 {
00596 if (gearmand->is_wakeup_event)
00597 return GEARMAN_SUCCESS;
00598
00599 gearmand_log_info(gearmand, "Adding event for wakeup pipe");
00600
00601 if (event_add(&(gearmand->wakeup_event), NULL) == -1)
00602 {
00603 gearmand_log_fatal(gearmand, "_wakeup_watch:event_add:-1");
00604 return GEARMAN_EVENT;
00605 }
00606
00607 gearmand->is_wakeup_event= true;
00608 return GEARMAN_SUCCESS;
00609 }
00610
00611 static void _wakeup_clear(gearmand_st *gearmand)
00612 {
00613 if (gearmand->is_wakeup_event)
00614 {
00615 gearmand_log_info(gearmand, "Clearing event for wakeup pipe");
00616 int del_ret= event_del(&(gearmand->wakeup_event));
00617 assert(del_ret == 0);
00618 gearmand->is_wakeup_event= false;
00619 }
00620 }
00621
00622 static void _wakeup_event(int fd, short events __attribute__ ((unused)),
00623 void *arg)
00624 {
00625 gearmand_st *gearmand= (gearmand_st *)arg;
00626 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00627 ssize_t ret;
00628 gearmand_thread_st *thread;
00629
00630 while (1)
00631 {
00632 ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00633 if (ret == 0)
00634 {
00635 _clear_events(gearmand);
00636 gearmand_log_fatal(gearmand, "_wakeup_event:read:EOF");
00637 gearmand->ret= GEARMAN_PIPE_EOF;
00638 return;
00639 }
00640 else if (ret == -1)
00641 {
00642 if (errno == EINTR)
00643 continue;
00644
00645 if (errno == EAGAIN)
00646 break;
00647
00648 _clear_events(gearmand);
00649 gearmand_log_fatal(gearmand, "_wakeup_event:read:%d", errno);
00650 gearmand->ret= GEARMAN_ERRNO;
00651 return;
00652 }
00653
00654 for (ssize_t x= 0; x < ret; x++)
00655 {
00656 switch ((gearmand_wakeup_t)buffer[x])
00657 {
00658 case GEARMAND_WAKEUP_PAUSE:
00659 gearmand_log_info(gearmand, "Received PAUSE wakeup event");
00660 _clear_events(gearmand);
00661 gearmand->ret= GEARMAN_PAUSE;
00662 break;
00663
00664 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00665 gearmand_log_info(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event");
00666 _listen_close(gearmand);
00667
00668 for (thread= gearmand->thread_list; thread != NULL;
00669 thread= thread->next)
00670 {
00671 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
00672 }
00673
00674 gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
00675 break;
00676
00677 case GEARMAND_WAKEUP_SHUTDOWN:
00678 gearmand_log_info(gearmand, "Received SHUTDOWN wakeup event");
00679 _clear_events(gearmand);
00680 gearmand->ret= GEARMAN_SHUTDOWN;
00681 break;
00682
00683 case GEARMAND_WAKEUP_CON:
00684 case GEARMAND_WAKEUP_RUN:
00685 default:
00686 gearmand_log_fatal(gearmand, "Received unknown wakeup event (%u)", buffer[x]);
00687 _clear_events(gearmand);
00688 gearmand->ret= GEARMAN_UNKNOWN_STATE;
00689 break;
00690 }
00691 }
00692 }
00693 }
00694
00695 static gearman_return_t _watch_events(gearmand_st *gearmand)
00696 {
00697 gearman_return_t ret;
00698
00699 ret= _listen_watch(gearmand);
00700 if (ret != GEARMAN_SUCCESS)
00701 return ret;
00702
00703 ret= _wakeup_watch(gearmand);
00704 if (ret != GEARMAN_SUCCESS)
00705 return ret;
00706
00707 return GEARMAN_SUCCESS;
00708 }
00709
00710 static void _clear_events(gearmand_st *gearmand)
00711 {
00712 _listen_clear(gearmand);
00713 _wakeup_clear(gearmand);
00714
00715
00716
00717 if (gearmand->threads == 0 && gearmand->thread_list != NULL)
00718 gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
00719 }
00720
00721 static void _close_events(gearmand_st *gearmand)
00722 {
00723 _listen_close(gearmand);
00724 _wakeup_close(gearmand);
00725 }