00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
00027 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00028
00029 static gearman_return_t _listen_init(gearmand_st *gearmand);
00030 static void _listen_close(gearmand_st *gearmand);
00031 static gearman_return_t _listen_watch(gearmand_st *gearmand);
00032 static void _listen_clear(gearmand_st *gearmand);
00033 static void _listen_event(int fd, short events, void *arg);
00034
00035 static gearman_return_t _wakeup_init(gearmand_st *gearmand);
00036 static void _wakeup_close(gearmand_st *gearmand);
00037 static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
00038 static void _wakeup_clear(gearmand_st *gearmand);
00039 static void _wakeup_event(int fd, short events, void *arg);
00040
00041 static gearman_return_t _watch_events(gearmand_st *gearmand);
00042 static void _clear_events(gearmand_st *gearmand);
00043 static void _close_events(gearmand_st *gearmand);
00044
00047
00048
00049
00050
00051 gearmand_st *gearmand_create(const char *host, in_port_t port)
00052 {
00053 gearmand_st *gearmand;
00054
00055 gearmand= malloc(sizeof(gearmand_st));
00056 if (gearmand == NULL)
00057 return NULL;
00058
00059 if (gearman_server_create(&(gearmand->server)) == NULL)
00060 {
00061 free(gearmand);
00062 return NULL;
00063 }
00064
00065 gearmand->options= 0;
00066 gearmand->verbose= 0;
00067 gearmand->ret= 0;
00068 gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
00069 gearmand->threads= 0;
00070 gearmand->port_count= 0;
00071 gearmand->thread_count= 0;
00072 gearmand->free_dcon_count= 0;
00073 gearmand->max_thread_free_dcon_count= 0;
00074 gearmand->wakeup_fd[0]= -1;
00075 gearmand->wakeup_fd[1]= -1;
00076 gearmand->host= host;
00077 gearmand->log_fn= NULL;
00078 gearmand->log_context= NULL;
00079 gearmand->base= NULL;
00080 gearmand->port_list= NULL;
00081 gearmand->thread_list= NULL;
00082 gearmand->thread_add_next= NULL;
00083 gearmand->free_dcon_list= NULL;
00084
00085 if (port == 0)
00086 port= GEARMAN_DEFAULT_TCP_PORT;
00087
00088 if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
00089 {
00090 gearmand_free(gearmand);
00091 return NULL;
00092 }
00093
00094 return gearmand;
00095 }
00096
00097 void gearmand_free(gearmand_st *gearmand)
00098 {
00099 gearmand_con_st *dcon;
00100 uint32_t x;
00101
00102 _close_events(gearmand);
00103
00104 if (gearmand->threads > 0)
00105 GEARMAN_INFO(gearmand, "Shutting down all threads")
00106
00107 while (gearmand->thread_list != NULL)
00108 gearmand_thread_free(gearmand->thread_list);
00109
00110 while (gearmand->free_dcon_list != NULL)
00111 {
00112 dcon= gearmand->free_dcon_list;
00113 gearmand->free_dcon_list= dcon->next;
00114 free(dcon);
00115 }
00116
00117 if (gearmand->base != NULL)
00118 event_base_free(gearmand->base);
00119
00120 gearman_server_free(&(gearmand->server));
00121
00122 for (x= 0; x < gearmand->port_count; x++)
00123 {
00124 if (gearmand->port_list[x].listen_fd != NULL)
00125 free(gearmand->port_list[x].listen_fd);
00126
00127 if (gearmand->port_list[x].listen_event != NULL)
00128 free(gearmand->port_list[x].listen_event);
00129 }
00130
00131 if (gearmand->port_list != NULL)
00132 free(gearmand->port_list);
00133
00134 GEARMAN_INFO(gearmand, "Shutdown complete")
00135
00136 free(gearmand);
00137 }
00138
00139 void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
00140 {
00141 gearmand->backlog= backlog;
00142 }
00143
00144 void gearmand_set_job_retries(gearmand_st *gearmand, uint8_t job_retries)
00145 {
00146 gearman_server_set_job_retries(&(gearmand->server), job_retries);
00147 }
00148
00149 void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
00150 {
00151 gearmand->threads= threads;
00152 }
00153
00154 void gearmand_set_log_fn(gearmand_st *gearmand, gearman_log_fn *function,
00155 const void *context, gearman_verbose_t verbose)
00156 {
00157 gearman_server_set_log_fn(&(gearmand->server), _log, gearmand, verbose);
00158 gearmand->log_fn= function;
00159 gearmand->log_context= context;
00160 gearmand->verbose= verbose;
00161 }
00162
00163 gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
00164 gearman_con_add_fn *function)
00165 {
00166 gearmand_port_st *port_list;
00167
00168 port_list= realloc(gearmand->port_list,
00169 sizeof(gearmand_port_st) * (gearmand->port_count + 1));
00170 if (port_list == NULL)
00171 {
00172 GEARMAN_FATAL(gearmand, "gearmand_port_add:realloc:NULL");
00173 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00174 }
00175
00176 port_list[gearmand->port_count].port= port;
00177 port_list[gearmand->port_count].listen_count= 0;
00178 port_list[gearmand->port_count].gearmand= gearmand;
00179 port_list[gearmand->port_count].add_fn= function;
00180 port_list[gearmand->port_count].listen_fd= NULL;
00181 port_list[gearmand->port_count].listen_event= NULL;
00182
00183 gearmand->port_list= port_list;
00184 gearmand->port_count++;
00185
00186 return GEARMAN_SUCCESS;
00187 }
00188
00189 gearman_return_t gearmand_run(gearmand_st *gearmand)
00190 {
00191 uint32_t x;
00192
00193
00194 if (gearmand->base == NULL)
00195 {
00196 GEARMAN_INFO(gearmand, "Starting up")
00197
00198 if (gearmand->threads > 0)
00199 {
00200 #ifndef HAVE_EVENT_BASE_NEW
00201 GEARMAN_FATAL(gearmand, "Multi-threaded gearmand requires libevent 1.4 " "or later, libevent 1.3 does not provided a "
00202 "thread-safe interface.");
00203 return GEARMAN_EVENT;
00204 #else
00205
00206
00207
00208 gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
00209 gearmand->threads) / 2);
00210 #endif
00211 }
00212
00213 GEARMAN_DEBUG(gearmand, "Initializing libevent for main thread")
00214
00215 gearmand->base= event_base_new();
00216 if (gearmand->base == NULL)
00217 {
00218 GEARMAN_FATAL(gearmand, "gearmand_run:event_base_new:NULL")
00219 return GEARMAN_EVENT;
00220 }
00221
00222 GEARMAN_DEBUG(gearmand, "Method for libevent: %s",
00223 event_base_get_method(gearmand->base));
00224
00225 gearmand->ret= _listen_init(gearmand);
00226 if (gearmand->ret != GEARMAN_SUCCESS)
00227 return gearmand->ret;
00228
00229 gearmand->ret= _wakeup_init(gearmand);
00230 if (gearmand->ret != GEARMAN_SUCCESS)
00231 return gearmand->ret;
00232
00233 GEARMAN_DEBUG(gearmand, "Creating %u threads", gearmand->threads)
00234
00235
00236 x= 0;
00237 do
00238 {
00239 gearmand->ret= gearmand_thread_create(gearmand);
00240 if (gearmand->ret != GEARMAN_SUCCESS)
00241 return gearmand->ret;
00242 x++;
00243 }
00244 while (x < gearmand->threads);
00245
00246 gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
00247 if (gearmand->ret != GEARMAN_SUCCESS)
00248 return gearmand->ret;
00249 }
00250
00251 gearmand->ret= _watch_events(gearmand);
00252 if (gearmand->ret != GEARMAN_SUCCESS)
00253 return gearmand->ret;
00254
00255 GEARMAN_INFO(gearmand, "Entering main event loop")
00256
00257 if (event_base_loop(gearmand->base, 0) == -1)
00258 {
00259 GEARMAN_FATAL(gearmand, "gearmand_run:event_base_loop:-1")
00260 return GEARMAN_EVENT;
00261 }
00262
00263 GEARMAN_INFO(gearmand, "Exited main event loop")
00264
00265 return gearmand->ret;
00266 }
00267
00268 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
00269 {
00270 uint8_t buffer= wakeup;
00271
00272
00273
00274 if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
00275 GEARMAN_ERROR(gearmand, "gearmand_wakeup:write:%d", errno)
00276 }
00277
00278
00279
00280
00281
00282 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00283 {
00284 gearmand_st *gearmand= (gearmand_st *)context;
00285 (*gearmand->log_fn)(line, verbose, (void *)gearmand->log_context);
00286 }
00287
00288 static gearman_return_t _listen_init(gearmand_st *gearmand)
00289 {
00290 struct gearmand_port_st *port;
00291 struct addrinfo *addrinfo;
00292 struct addrinfo *addrinfo_next;
00293 struct addrinfo ai;
00294 int ret;
00295 int opt;
00296 char host[NI_MAXHOST];
00297 char port_str[NI_MAXSERV];
00298 int fd;
00299 int *fd_list;
00300 uint32_t x;
00301 uint32_t y;
00302
00303 for (x= 0; x < gearmand->port_count; x++)
00304 {
00305 port= &gearmand->port_list[x];
00306
00307 snprintf(port_str, NI_MAXSERV, "%u", port->port);
00308
00309 memset(&ai, 0, sizeof(struct addrinfo));
00310 ai.ai_flags = AI_PASSIVE;
00311 ai.ai_family = AF_UNSPEC;
00312 ai.ai_socktype = SOCK_STREAM;
00313 ai.ai_protocol= IPPROTO_TCP;
00314
00315 ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
00316 if (ret != 0)
00317 {
00318 GEARMAN_FATAL(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret))
00319 return GEARMAN_ERRNO;
00320 }
00321
00322 for (addrinfo_next= addrinfo; addrinfo_next != NULL;
00323 addrinfo_next= addrinfo_next->ai_next)
00324 {
00325 ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
00326 NI_MAXHOST, port_str, NI_MAXSERV,
00327 NI_NUMERICHOST | NI_NUMERICSERV);
00328 if (ret != 0)
00329 {
00330 GEARMAN_ERROR(gearmand, "_listen_init:getnameinfo:%s",
00331 gai_strerror(ret))
00332 strcpy(host, "-");
00333 strcpy(port_str, "-");
00334 }
00335
00336 GEARMAN_DEBUG(gearmand, "Trying to listen on %s:%s", host, port_str)
00337
00338
00339 fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
00340 addrinfo_next->ai_protocol);
00341 if (fd == -1)
00342 {
00343 GEARMAN_ERROR(gearmand, "Failed to listen on %s:%s", host, port_str)
00344 continue;
00345 }
00346
00347 opt= 1;
00348 ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
00349 if (ret == -1)
00350 {
00351 close(fd);
00352 GEARMAN_FATAL(gearmand, "_listen_init:setsockopt:%d", errno)
00353 return GEARMAN_ERRNO;
00354 }
00355
00356 ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
00357 if (ret == -1)
00358 {
00359 close(fd);
00360 if (errno == EADDRINUSE)
00361 {
00362 if (port->listen_fd == NULL)
00363 {
00364 GEARMAN_ERROR(gearmand, "Address already in use %s:%s", host,
00365 port_str)
00366 }
00367
00368 continue;
00369 }
00370
00371 GEARMAN_FATAL(gearmand, "_listen_init:bind:%d", errno)
00372 return GEARMAN_ERRNO;
00373 }
00374
00375 if (listen(fd, gearmand->backlog) == -1)
00376 {
00377 close(fd);
00378 GEARMAN_FATAL(gearmand, "_listen_init:listen:%d", errno)
00379 return GEARMAN_ERRNO;
00380 }
00381
00382 fd_list= realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
00383 if (fd_list == NULL)
00384 {
00385 close(fd);
00386 GEARMAN_FATAL(gearmand, "_listen_init:realloc:%d", errno)
00387 return GEARMAN_ERRNO;
00388 }
00389
00390 port->listen_fd= fd_list;
00391 port->listen_fd[port->listen_count]= fd;
00392 port->listen_count++;
00393
00394 GEARMAN_INFO(gearmand, "Listening on %s:%s (%d)", host, port_str, fd)
00395 }
00396
00397 freeaddrinfo(addrinfo);
00398
00399
00400 if (port->listen_fd == NULL)
00401 {
00402 GEARMAN_FATAL(gearmand,
00403 "_listen_init:Could not bind/listen to any addresses")
00404 return GEARMAN_ERRNO;
00405 }
00406
00407 port->listen_event= malloc(sizeof(struct event) * port->listen_count);
00408 if (port->listen_event == NULL)
00409 {
00410 GEARMAN_FATAL(gearmand, "_listen_init:malloc:%d", errno)
00411 return GEARMAN_ERRNO;
00412 }
00413
00414 for (y= 0; y < port->listen_count; y++)
00415 {
00416 event_set(&(port->listen_event[y]), port->listen_fd[y],
00417 EV_READ | EV_PERSIST, _listen_event, port);
00418 event_base_set(gearmand->base, &(port->listen_event[y]));
00419 }
00420 }
00421
00422 return GEARMAN_SUCCESS;
00423 }
00424
00425 static void _listen_close(gearmand_st *gearmand)
00426 {
00427 uint32_t x;
00428 uint32_t y;
00429
00430 _listen_clear(gearmand);
00431
00432 for (x= 0; x < gearmand->port_count; x++)
00433 {
00434 for (y= 0; y < gearmand->port_list[x].listen_count; y++)
00435 {
00436 if (gearmand->port_list[x].listen_fd[y] >= 0)
00437 {
00438 GEARMAN_INFO(gearmand, "Closing listening socket (%d)",
00439 gearmand->port_list[x].listen_fd[y])
00440 close(gearmand->port_list[x].listen_fd[y]);
00441 gearmand->port_list[x].listen_fd[y]= -1;
00442 }
00443 }
00444 }
00445 }
00446
00447 static gearman_return_t _listen_watch(gearmand_st *gearmand)
00448 {
00449 uint32_t x;
00450 uint32_t y;
00451
00452 if (gearmand->options & GEARMAND_LISTEN_EVENT)
00453 return GEARMAN_SUCCESS;
00454
00455 for (x= 0; x < gearmand->port_count; x++)
00456 {
00457 for (y= 0; y < gearmand->port_list[x].listen_count; y++)
00458 {
00459 GEARMAN_INFO(gearmand, "Adding event for listening socket (%d)",
00460 gearmand->port_list[x].listen_fd[y])
00461
00462 if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
00463 {
00464 GEARMAN_FATAL(gearmand, "_listen_watch:event_add:-1")
00465 return GEARMAN_EVENT;
00466 }
00467 }
00468 }
00469
00470 gearmand->options|= GEARMAND_LISTEN_EVENT;
00471 return GEARMAN_SUCCESS;
00472 }
00473
00474 static void _listen_clear(gearmand_st *gearmand)
00475 {
00476 uint32_t x;
00477 uint32_t y;
00478
00479 if (!(gearmand->options & GEARMAND_LISTEN_EVENT))
00480 return;
00481
00482 for (x= 0; x < gearmand->port_count; x++)
00483 {
00484 for (y= 0; y < gearmand->port_list[x].listen_count; y++)
00485 {
00486 GEARMAN_INFO(gearmand, "Clearing event for listening socket (%d)",
00487 gearmand->port_list[x].listen_fd[y])
00488 assert(event_del(&(gearmand->port_list[x].listen_event[y])) == 0);
00489 }
00490 }
00491
00492 gearmand->options&= (gearmand_options_t)~GEARMAND_LISTEN_EVENT;
00493 }
00494
00495 static void _listen_event(int fd, short events __attribute__ ((unused)),
00496 void *arg)
00497 {
00498 gearmand_port_st *port= (gearmand_port_st *)arg;
00499 struct sockaddr sa;
00500 socklen_t sa_len;
00501 char host[NI_MAXHOST];
00502 char port_str[NI_MAXSERV];
00503 int ret;
00504
00505 sa_len= sizeof(sa);
00506 fd= accept(fd, &sa, &sa_len);
00507 if (fd == -1)
00508 {
00509 if (errno == EINTR)
00510 return;
00511 else if (errno == EMFILE)
00512 {
00513 GEARMAN_ERROR(port->gearmand, "_listen_event:accept:too many open files")
00514 return;
00515 }
00516
00517 _clear_events(port->gearmand);
00518 GEARMAN_FATAL(port->gearmand, "_listen_event:accept:%d", errno)
00519 port->gearmand->ret= GEARMAN_ERRNO;
00520 return;
00521 }
00522
00523
00524
00525 ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
00526 NI_NUMERICHOST | NI_NUMERICSERV);
00527 if (ret != 0)
00528 {
00529 GEARMAN_ERROR(port->gearmand, "_listen_event:getnameinfo:%s",
00530 gai_strerror(ret))
00531 strcpy(host, "-");
00532 strcpy(port_str, "-");
00533 }
00534
00535 GEARMAN_INFO(port->gearmand, "Accepted connection from %s:%s", host, port_str)
00536
00537 port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
00538 port->add_fn);
00539 if (port->gearmand->ret != GEARMAN_SUCCESS)
00540 _clear_events(port->gearmand);
00541 }
00542
00543 static gearman_return_t _wakeup_init(gearmand_st *gearmand)
00544 {
00545 int ret;
00546
00547 GEARMAN_INFO(gearmand, "Creating wakeup pipe")
00548
00549 ret= pipe(gearmand->wakeup_fd);
00550 if (ret == -1)
00551 {
00552 GEARMAN_FATAL(gearmand, "_wakeup_init:pipe:%d", errno)
00553 return GEARMAN_ERRNO;
00554 }
00555
00556 ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
00557 if (ret == -1)
00558 {
00559 GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno)
00560 return GEARMAN_ERRNO;
00561 }
00562
00563 ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00564 if (ret == -1)
00565 {
00566 GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno)
00567 return GEARMAN_ERRNO;
00568 }
00569
00570 event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
00571 EV_READ | EV_PERSIST, _wakeup_event, gearmand);
00572 event_base_set(gearmand->base, &(gearmand->wakeup_event));
00573
00574 return GEARMAN_SUCCESS;
00575 }
00576
00577 static void _wakeup_close(gearmand_st *gearmand)
00578 {
00579 _wakeup_clear(gearmand);
00580
00581 if (gearmand->wakeup_fd[0] >= 0)
00582 {
00583 GEARMAN_INFO(gearmand, "Closing wakeup pipe")
00584 close(gearmand->wakeup_fd[0]);
00585 gearmand->wakeup_fd[0]= -1;
00586 close(gearmand->wakeup_fd[1]);
00587 gearmand->wakeup_fd[1]= -1;
00588 }
00589 }
00590
00591 static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
00592 {
00593 if (gearmand->options & GEARMAND_WAKEUP_EVENT)
00594 return GEARMAN_SUCCESS;
00595
00596 GEARMAN_INFO(gearmand, "Adding event for wakeup pipe")
00597
00598 if (event_add(&(gearmand->wakeup_event), NULL) == -1)
00599 {
00600 GEARMAN_FATAL(gearmand, "_wakeup_watch:event_add:-1")
00601 return GEARMAN_EVENT;
00602 }
00603
00604 gearmand->options|= GEARMAND_WAKEUP_EVENT;
00605 return GEARMAN_SUCCESS;
00606 }
00607
00608 static void _wakeup_clear(gearmand_st *gearmand)
00609 {
00610 if (gearmand->options & GEARMAND_WAKEUP_EVENT)
00611 {
00612 GEARMAN_INFO(gearmand, "Clearing event for wakeup pipe")
00613 assert(event_del(&(gearmand->wakeup_event)) == 0);
00614 gearmand->options&= (gearmand_options_t)~GEARMAND_WAKEUP_EVENT;
00615 }
00616 }
00617
00618 static void _wakeup_event(int fd, short events __attribute__ ((unused)),
00619 void *arg)
00620 {
00621 gearmand_st *gearmand= (gearmand_st *)arg;
00622 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00623 ssize_t ret;
00624 ssize_t x;
00625 gearmand_thread_st *thread;
00626
00627 while (1)
00628 {
00629 ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00630 if (ret == 0)
00631 {
00632 _clear_events(gearmand);
00633 GEARMAN_FATAL(gearmand, "_wakeup_event:read:EOF")
00634 gearmand->ret= GEARMAN_PIPE_EOF;
00635 return;
00636 }
00637 else if (ret == -1)
00638 {
00639 if (errno == EINTR)
00640 continue;
00641
00642 if (errno == EAGAIN)
00643 break;
00644
00645 _clear_events(gearmand);
00646 GEARMAN_FATAL(gearmand, "_wakeup_event:read:%d", errno)
00647 gearmand->ret= GEARMAN_ERRNO;
00648 return;
00649 }
00650
00651 for (x= 0; x < ret; x++)
00652 {
00653 switch ((gearmand_wakeup_t)buffer[x])
00654 {
00655 case GEARMAND_WAKEUP_PAUSE:
00656 GEARMAN_INFO(gearmand, "Received PAUSE wakeup event")
00657 _clear_events(gearmand);
00658 gearmand->ret= GEARMAN_PAUSE;
00659 break;
00660
00661 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00662 GEARMAN_INFO(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event")
00663 _listen_close(gearmand);
00664
00665 for (thread= gearmand->thread_list; thread != NULL;
00666 thread= thread->next)
00667 {
00668 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
00669 }
00670
00671 gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
00672 break;
00673
00674 case GEARMAND_WAKEUP_SHUTDOWN:
00675 GEARMAN_INFO(gearmand, "Received SHUTDOWN wakeup event")
00676 _clear_events(gearmand);
00677 gearmand->ret= GEARMAN_SHUTDOWN;
00678 break;
00679
00680 case GEARMAND_WAKEUP_CON:
00681 case GEARMAND_WAKEUP_RUN:
00682 default:
00683 GEARMAN_FATAL(gearmand, "Received unknown wakeup event (%u)",
00684 buffer[x])
00685 _clear_events(gearmand);
00686 gearmand->ret= GEARMAN_UNKNOWN_STATE;
00687 break;
00688 }
00689 }
00690 }
00691 }
00692
00693 static gearman_return_t _watch_events(gearmand_st *gearmand)
00694 {
00695 gearman_return_t ret;
00696
00697 ret= _listen_watch(gearmand);
00698 if (ret != GEARMAN_SUCCESS)
00699 return ret;
00700
00701 ret= _wakeup_watch(gearmand);
00702 if (ret != GEARMAN_SUCCESS)
00703 return ret;
00704
00705 return GEARMAN_SUCCESS;
00706 }
00707
00708 static void _clear_events(gearmand_st *gearmand)
00709 {
00710 _listen_clear(gearmand);
00711 _wakeup_clear(gearmand);
00712
00713
00714
00715 if (gearmand->threads == 0 && gearmand->thread_list != NULL)
00716 gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
00717 }
00718
00719 static void _close_events(gearmand_st *gearmand)
00720 {
00721 _listen_close(gearmand);
00722 _wakeup_close(gearmand);
00723 }