00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 static const char *_verbose_name[GEARMAN_VERBOSE_MAX]=
00030 {
00031 "FATAL",
00032 "ERROR",
00033 "INFO",
00034 "DEBUG",
00035 "CRAZY"
00036 };
00037
00040
00041
00042
00043
00044 const char *gearman_version(void)
00045 {
00046 return PACKAGE_VERSION;
00047 }
00048
00049 const char *gearman_bugreport(void)
00050 {
00051 return PACKAGE_BUGREPORT;
00052 }
00053
00054 const char *gearman_verbose_name(gearman_verbose_t verbose)
00055 {
00056 if (verbose >= GEARMAN_VERBOSE_MAX)
00057 return "UNKNOWN";
00058
00059 return _verbose_name[verbose];
00060 }
00061
00062 gearman_st *gearman_create(gearman_st *gearman)
00063 {
00064 if (gearman == NULL)
00065 {
00066 gearman= malloc(sizeof(gearman_st));
00067 if (gearman == NULL)
00068 return NULL;
00069
00070 gearman->options= GEARMAN_ALLOCATED;
00071 }
00072 else
00073 gearman->options= 0;
00074
00075 gearman->verbose= 0;
00076 gearman->con_count= 0;
00077 gearman->packet_count= 0;
00078 gearman->pfds_size= 0;
00079 gearman->sending= 0;
00080 gearman->last_errno= 0;
00081 gearman->timeout= -1;
00082 gearman->con_list= NULL;
00083 gearman->packet_list= NULL;
00084 gearman->pfds= NULL;
00085 gearman->log_fn= NULL;
00086 gearman->log_context= NULL;
00087 gearman->event_watch_fn= NULL;
00088 gearman->event_watch_context= NULL;
00089 gearman->workload_malloc_fn= NULL;
00090 gearman->workload_malloc_context= NULL;
00091 gearman->workload_free_fn= NULL;
00092 gearman->workload_free_context= NULL;
00093 gearman->last_error[0]= 0;
00094
00095 return gearman;
00096 }
00097
00098 gearman_st *gearman_clone(gearman_st *gearman, const gearman_st *from)
00099 {
00100 gearman_con_st *con;
00101
00102 gearman= gearman_create(gearman);
00103 if (gearman == NULL)
00104 return NULL;
00105
00106 gearman->options|= (from->options & (gearman_options_t)~GEARMAN_ALLOCATED);
00107 gearman->timeout= from->timeout;
00108
00109 for (con= from->con_list; con != NULL; con= con->next)
00110 {
00111 if (gearman_con_clone(gearman, NULL, con) == NULL)
00112 {
00113 gearman_free(gearman);
00114 return NULL;
00115 }
00116 }
00117
00118
00119
00120
00121 return gearman;
00122 }
00123
00124 void gearman_free(gearman_st *gearman)
00125 {
00126 gearman_con_free_all(gearman);
00127 gearman_packet_free_all(gearman);
00128
00129 if (gearman->pfds != NULL)
00130 free(gearman->pfds);
00131
00132 if (gearman->options & GEARMAN_ALLOCATED)
00133 free(gearman);
00134 }
00135
00136 const char *gearman_error(const gearman_st *gearman)
00137 {
00138 return (const char *)(gearman->last_error);
00139 }
00140
00141 int gearman_errno(const gearman_st *gearman)
00142 {
00143 return gearman->last_errno;
00144 }
00145
00146 gearman_options_t gearman_options(const gearman_st *gearman)
00147 {
00148 return gearman->options;
00149 }
00150
00151 void gearman_set_options(gearman_st *gearman, gearman_options_t options)
00152 {
00153 gearman->options= options;
00154 }
00155
00156 void gearman_add_options(gearman_st *gearman, gearman_options_t options)
00157 {
00158 gearman->options|= options;
00159 }
00160
00161 void gearman_remove_options(gearman_st *gearman, gearman_options_t options)
00162 {
00163 gearman->options&= ~options;
00164 }
00165
00166 int gearman_timeout(gearman_st *gearman)
00167 {
00168 return gearman->timeout;
00169 }
00170
00171 void gearman_set_timeout(gearman_st *gearman, int timeout)
00172 {
00173 gearman->timeout= timeout;
00174 }
00175
00176 void gearman_set_log_fn(gearman_st *gearman, gearman_log_fn *function,
00177 const void *context, gearman_verbose_t verbose)
00178 {
00179 gearman->log_fn= function;
00180 gearman->log_context= context;
00181 gearman->verbose= verbose;
00182 }
00183
00184 void gearman_set_event_watch_fn(gearman_st *gearman,
00185 gearman_event_watch_fn *function,
00186 const void *context)
00187 {
00188 gearman->event_watch_fn= function;
00189 gearman->event_watch_context= context;
00190 }
00191
00192 void gearman_set_workload_malloc_fn(gearman_st *gearman,
00193 gearman_malloc_fn *function,
00194 const void *context)
00195 {
00196 gearman->workload_malloc_fn= function;
00197 gearman->workload_malloc_context= context;
00198 }
00199
00200 void gearman_set_workload_free_fn(gearman_st *gearman,
00201 gearman_free_fn *function,
00202 const void *context)
00203 {
00204 gearman->workload_free_fn= function;
00205 gearman->workload_free_context= context;
00206 }
00207
00208
00209
00210
00211
00212 gearman_con_st *gearman_con_create(gearman_st *gearman, gearman_con_st *con)
00213 {
00214 if (con == NULL)
00215 {
00216 con= malloc(sizeof(gearman_con_st));
00217 if (con == NULL)
00218 {
00219 gearman_error_set(gearman, "gearman_con_create", "malloc");
00220 return NULL;
00221 }
00222
00223 con->options= GEARMAN_CON_ALLOCATED;
00224 }
00225 else
00226 con->options= 0;
00227
00228 con->state= 0;
00229 con->send_state= 0;
00230 con->recv_state= 0;
00231 con->port= 0;
00232 con->events= 0;
00233 con->revents= 0;
00234 con->fd= -1;
00235 con->created_id= 0;
00236 con->created_id_next= 0;
00237 con->send_buffer_size= 0;
00238 con->send_data_size= 0;
00239 con->send_data_offset= 0;
00240 con->recv_buffer_size= 0;
00241 con->recv_data_size= 0;
00242 con->recv_data_offset= 0;
00243 con->gearman= gearman;
00244
00245 if (gearman->con_list != NULL)
00246 gearman->con_list->prev= con;
00247 con->next= gearman->con_list;
00248 con->prev= NULL;
00249 gearman->con_list= con;
00250 gearman->con_count++;
00251
00252 con->context= NULL;
00253 con->addrinfo= NULL;
00254 con->addrinfo_next= NULL;
00255 con->send_buffer_ptr= con->send_buffer;
00256 con->recv_packet= NULL;
00257 con->recv_buffer_ptr= con->recv_buffer;
00258 con->protocol_context= NULL;
00259 con->protocol_context_free_fn= NULL;
00260 con->packet_pack_fn= gearman_packet_pack;
00261 con->packet_unpack_fn= gearman_packet_unpack;
00262 con->host[0]= 0;
00263
00264 return con;
00265 }
00266
00267 gearman_con_st *gearman_con_add(gearman_st *gearman, gearman_con_st *con,
00268 const char *host, in_port_t port)
00269 {
00270 con= gearman_con_create(gearman, con);
00271 if (con == NULL)
00272 return NULL;
00273
00274 gearman_con_set_host(con, host);
00275 gearman_con_set_port(con, port);
00276
00277 return con;
00278 }
00279
00280 gearman_con_st *gearman_con_clone(gearman_st *gearman, gearman_con_st *con,
00281 const gearman_con_st *from)
00282 {
00283 con= gearman_con_create(gearman, con);
00284 if (con == NULL)
00285 return NULL;
00286
00287 con->options|= (from->options &
00288 (gearman_con_options_t)~GEARMAN_CON_ALLOCATED);
00289 strcpy(con->host, from->host);
00290 con->port= from->port;
00291
00292 return con;
00293 }
00294
00295 void gearman_con_free(gearman_con_st *con)
00296 {
00297 if (con->fd != -1)
00298 gearman_con_close(con);
00299
00300 gearman_con_reset_addrinfo(con);
00301
00302 if (con->protocol_context != NULL && con->protocol_context_free_fn != NULL)
00303 (*con->protocol_context_free_fn)(con, (void *)con->protocol_context);
00304
00305 if (con->gearman->con_list == con)
00306 con->gearman->con_list= con->next;
00307 if (con->prev != NULL)
00308 con->prev->next= con->next;
00309 if (con->next != NULL)
00310 con->next->prev= con->prev;
00311 con->gearman->con_count--;
00312
00313 if (con->options & GEARMAN_CON_PACKET_IN_USE)
00314 gearman_packet_free(&(con->packet));
00315
00316 if (con->options & GEARMAN_CON_ALLOCATED)
00317 free(con);
00318 }
00319
00320 void gearman_con_free_all(gearman_st *gearman)
00321 {
00322 while (gearman->con_list != NULL)
00323 gearman_con_free(gearman->con_list);
00324 }
00325
00326 gearman_return_t gearman_con_flush_all(gearman_st *gearman)
00327 {
00328 gearman_con_st *con;
00329 gearman_return_t ret;
00330
00331 for (con= gearman->con_list; con != NULL; con= con->next)
00332 {
00333 if (con->events & POLLOUT)
00334 continue;
00335
00336 ret= gearman_con_flush(con);
00337 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00338 return ret;
00339 }
00340
00341 return GEARMAN_SUCCESS;
00342 }
00343
00344 gearman_return_t gearman_con_send_all(gearman_st *gearman,
00345 const gearman_packet_st *packet)
00346 {
00347 gearman_return_t ret;
00348 gearman_con_st *con;
00349 gearman_options_t options= gearman->options;
00350
00351 gearman->options|= GEARMAN_NON_BLOCKING;
00352
00353 if (gearman->sending == 0)
00354 {
00355 for (con= gearman->con_list; con != NULL; con= con->next)
00356 {
00357 ret= gearman_con_send(con, packet, true);
00358 if (ret != GEARMAN_SUCCESS)
00359 {
00360 if (ret != GEARMAN_IO_WAIT)
00361 {
00362 gearman->options= options;
00363 return ret;
00364 }
00365
00366 gearman->sending++;
00367 break;
00368 }
00369 }
00370 }
00371
00372 while (gearman->sending != 0)
00373 {
00374 while ((con= gearman_con_ready(gearman)) != NULL)
00375 {
00376 ret= gearman_con_send(con, packet, true);
00377 if (ret != GEARMAN_SUCCESS)
00378 {
00379 if (ret != GEARMAN_IO_WAIT)
00380 {
00381 gearman->options= options;
00382 return ret;
00383 }
00384
00385 continue;
00386 }
00387
00388 gearman->sending--;
00389 }
00390
00391 if (gearman->sending == 0)
00392 break;
00393
00394 if (options & GEARMAN_NON_BLOCKING)
00395 {
00396 gearman->options= options;
00397 return GEARMAN_IO_WAIT;
00398 }
00399
00400 ret= gearman_con_wait(gearman);
00401 if (ret != GEARMAN_SUCCESS)
00402 {
00403 gearman->options= options;
00404 return ret;
00405 }
00406 }
00407
00408 gearman->options= options;
00409 return GEARMAN_SUCCESS;
00410 }
00411
00412 gearman_return_t gearman_con_wait(gearman_st *gearman)
00413 {
00414 gearman_con_st *con;
00415 struct pollfd *pfds;
00416 nfds_t x;
00417 int ret;
00418 gearman_return_t gret;
00419
00420 if (gearman->pfds_size < gearman->con_count)
00421 {
00422 pfds= realloc(gearman->pfds, gearman->con_count * sizeof(struct pollfd));
00423 if (pfds == NULL)
00424 {
00425 gearman_error_set(gearman, "gearman_con_wait", "realloc");
00426 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00427 }
00428
00429 gearman->pfds= pfds;
00430 gearman->pfds_size= gearman->con_count;
00431 }
00432 else
00433 pfds= gearman->pfds;
00434
00435 x= 0;
00436 for (con= gearman->con_list; con != NULL; con= con->next)
00437 {
00438 if (con->events == 0)
00439 continue;
00440
00441 pfds[x].fd= con->fd;
00442 pfds[x].events= con->events;
00443 pfds[x].revents= 0;
00444 x++;
00445 }
00446
00447 if (x == 0)
00448 {
00449 gearman_error_set(gearman, "gearman_con_wait",
00450 "no active file descriptors");
00451 return GEARMAN_NO_ACTIVE_FDS;
00452 }
00453
00454 while (1)
00455 {
00456 ret= poll(pfds, x, gearman->timeout);
00457 if (ret == -1)
00458 {
00459 if (errno == EINTR)
00460 continue;
00461
00462 gearman_error_set(gearman, "gearman_con_wait", "poll:%d", errno);
00463 gearman->last_errno= errno;
00464 return GEARMAN_ERRNO;
00465 }
00466
00467 break;
00468 }
00469
00470 if (ret == 0)
00471 {
00472 gearman_error_set(gearman, "gearman_con_wait", "timeout reached");
00473 return GEARMAN_TIMEOUT;
00474 }
00475
00476 x= 0;
00477 for (con= gearman->con_list; con != NULL; con= con->next)
00478 {
00479 if (con->events == 0)
00480 continue;
00481
00482 gret= gearman_con_set_revents(con, pfds[x].revents);
00483 if (gret != GEARMAN_SUCCESS)
00484 return gret;
00485
00486 x++;
00487 }
00488
00489 return GEARMAN_SUCCESS;
00490 }
00491
00492 gearman_con_st *gearman_con_ready(gearman_st *gearman)
00493 {
00494 gearman_con_st *con;
00495
00496
00497
00498
00499 for (con= gearman->con_list; con != NULL; con= con->next)
00500 {
00501 if (con->options & GEARMAN_CON_READY)
00502 {
00503 con->options&= (gearman_con_options_t)~GEARMAN_CON_READY;
00504 return con;
00505 }
00506 }
00507
00508 return NULL;
00509 }
00510
00511 gearman_return_t gearman_con_echo(gearman_st *gearman, const void *workload,
00512 size_t workload_size)
00513 {
00514 gearman_con_st *con;
00515 gearman_options_t options= gearman->options;
00516 gearman_packet_st packet;
00517 gearman_return_t ret;
00518
00519 ret= gearman_packet_add(gearman, &packet, GEARMAN_MAGIC_REQUEST,
00520 GEARMAN_COMMAND_ECHO_REQ, workload, workload_size,
00521 NULL);
00522 if (ret != GEARMAN_SUCCESS)
00523 return ret;
00524
00525 gearman->options&= (gearman_con_options_t)~GEARMAN_NON_BLOCKING;
00526
00527 for (con= gearman->con_list; con != NULL; con= con->next)
00528 {
00529 ret= gearman_con_send(con, &packet, true);
00530 if (ret != GEARMAN_SUCCESS)
00531 {
00532 gearman_packet_free(&packet);
00533 gearman->options= options;
00534 return ret;
00535 }
00536
00537 (void)gearman_con_recv(con, &(con->packet), &ret, true);
00538 if (ret != GEARMAN_SUCCESS)
00539 {
00540 gearman_packet_free(&packet);
00541 gearman->options= options;
00542 return ret;
00543 }
00544
00545 if (con->packet.data_size != workload_size ||
00546 memcmp(workload, con->packet.data, workload_size))
00547 {
00548 gearman_packet_free(&(con->packet));
00549 gearman_packet_free(&packet);
00550 gearman->options= options;
00551 gearman_error_set(gearman, "gearman_con_echo", "corruption during echo");
00552 return GEARMAN_ECHO_DATA_CORRUPTION;
00553 }
00554
00555 gearman_packet_free(&(con->packet));
00556 }
00557
00558 gearman_packet_free(&packet);
00559 gearman->options= options;
00560 return GEARMAN_SUCCESS;
00561 }
00562
00563
00564
00565
00566
00567 gearman_packet_st *gearman_packet_create(gearman_st *gearman,
00568 gearman_packet_st *packet)
00569 {
00570 if (packet == NULL)
00571 {
00572 packet= malloc(sizeof(gearman_packet_st));
00573 if (packet == NULL)
00574 {
00575 gearman_error_set(gearman, "gearman_packet_create", "malloc");
00576 return NULL;
00577 }
00578
00579 packet->options= GEARMAN_PACKET_ALLOCATED;
00580 }
00581 else
00582 packet->options= 0;
00583
00584 packet->magic= 0;
00585 packet->command= 0;
00586 packet->argc= 0;
00587 packet->args_size= 0;
00588 packet->data_size= 0;
00589 packet->gearman= gearman;
00590
00591 if (!(gearman->options & GEARMAN_DONT_TRACK_PACKETS))
00592 {
00593 if (gearman->packet_list != NULL)
00594 gearman->packet_list->prev= packet;
00595 packet->next= gearman->packet_list;
00596 packet->prev= NULL;
00597 gearman->packet_list= packet;
00598 gearman->packet_count++;
00599 }
00600
00601 packet->args= NULL;
00602 packet->data= NULL;
00603
00604 return packet;
00605 }
00606
00607 gearman_return_t gearman_packet_add(gearman_st *gearman,
00608 gearman_packet_st *packet,
00609 gearman_magic_t magic,
00610 gearman_command_t command,
00611 const void *arg, ...)
00612 {
00613 va_list ap;
00614 size_t arg_size;
00615 gearman_return_t ret;
00616
00617 packet= gearman_packet_create(gearman, packet);
00618 if (packet == NULL)
00619 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00620
00621 packet->magic= magic;
00622 packet->command= command;
00623
00624 va_start(ap, arg);
00625
00626 while (arg != NULL)
00627 {
00628 arg_size = va_arg(ap, size_t);
00629
00630 ret= gearman_packet_add_arg(packet, arg, arg_size);
00631 if (ret != GEARMAN_SUCCESS)
00632 {
00633 va_end(ap);
00634 gearman_packet_free(packet);
00635 return ret;
00636 }
00637
00638 arg = va_arg(ap, void *);
00639 }
00640
00641 va_end(ap);
00642
00643 return gearman_packet_pack_header(packet);
00644 }
00645
00646 void gearman_packet_free(gearman_packet_st *packet)
00647 {
00648 if (packet->args != packet->args_buffer && packet->args != NULL)
00649 free(packet->args);
00650
00651 if (packet->options & GEARMAN_PACKET_FREE_DATA && packet->data != NULL)
00652 {
00653 if (packet->gearman->workload_free_fn == NULL)
00654 free((void *)(packet->data));
00655 else
00656 {
00657 packet->gearman->workload_free_fn((void *)(packet->data),
00658 (void *)(packet->gearman->workload_free_context));
00659 }
00660 }
00661
00662 if (!(packet->gearman->options & GEARMAN_DONT_TRACK_PACKETS))
00663 {
00664 if (packet->gearman->packet_list == packet)
00665 packet->gearman->packet_list= packet->next;
00666 if (packet->prev != NULL)
00667 packet->prev->next= packet->next;
00668 if (packet->next != NULL)
00669 packet->next->prev= packet->prev;
00670 packet->gearman->packet_count--;
00671 }
00672
00673 if (packet->options & GEARMAN_PACKET_ALLOCATED)
00674 free(packet);
00675 }
00676
00677 void gearman_packet_free_all(gearman_st *gearman)
00678 {
00679 while (gearman->packet_list != NULL)
00680 gearman_packet_free(gearman->packet_list);
00681 }
00682
00683
00684
00685
00686 void gearman_error_set(gearman_st *gearman, const char *function,
00687 const char *format, ...)
00688 {
00689 size_t length;
00690 char *ptr;
00691 char log_buffer[GEARMAN_MAX_ERROR_SIZE];
00692 va_list arg;
00693
00694 va_start(arg, format);
00695
00696 length= strlen(function);
00697
00698
00699 ptr= memcpy(log_buffer, function, length);
00700 ptr+= length;
00701 ptr[0]= ':';
00702 length++;
00703 ptr++;
00704
00705 length+= (size_t)vsnprintf(ptr, GEARMAN_MAX_ERROR_SIZE - length, format, arg);
00706
00707 if (gearman->log_fn == NULL)
00708 {
00709 if (length >= GEARMAN_MAX_ERROR_SIZE)
00710 length= GEARMAN_MAX_ERROR_SIZE - 1;
00711
00712 memcpy(gearman->last_error, log_buffer, length + 1);
00713 }
00714 else
00715 {
00716 (*(gearman->log_fn))(log_buffer, GEARMAN_VERBOSE_FATAL,
00717 (void *)(gearman)->log_context);
00718 }
00719
00720 va_end(arg);
00721 }
00722
00723 gearman_return_t gearman_parse_servers(const char *servers,
00724 gearman_parse_server_fn *function,
00725 const void *context)
00726 {
00727 const char *ptr= servers;
00728 size_t x;
00729 char host[NI_MAXHOST];
00730 char port[NI_MAXSERV];
00731 gearman_return_t ret;
00732
00733 if (ptr == NULL)
00734 return (*function)(NULL, 0, (void *)context);
00735
00736 while (1)
00737 {
00738 x= 0;
00739
00740 while (*ptr != 0 && *ptr != ',' && *ptr != ':')
00741 {
00742 if (x < (NI_MAXHOST - 1))
00743 host[x++]= *ptr;
00744
00745 ptr++;
00746 }
00747
00748 host[x]= 0;
00749
00750 if (*ptr == ':')
00751 {
00752 ptr++;
00753 x= 0;
00754
00755 while (*ptr != 0 && *ptr != ',')
00756 {
00757 if (x < (NI_MAXSERV - 1))
00758 port[x++]= *ptr;
00759
00760 ptr++;
00761 }
00762
00763 port[x]= 0;
00764 }
00765 else
00766 port[0]= 0;
00767
00768 ret= (*function)(host, (in_port_t)atoi(port), (void *)context);
00769 if (ret != GEARMAN_SUCCESS)
00770 return ret;
00771
00772 if (*ptr == 0)
00773 break;
00774
00775 ptr++;
00776 }
00777
00778 return GEARMAN_SUCCESS;
00779 }