00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 static gearman_return_t _con_setsockopt(gearman_con_st *con);
00030
00033
00034
00035
00036
00037 void gearman_con_set_host(gearman_con_st *con, const char *host)
00038 {
00039 gearman_con_reset_addrinfo(con);
00040
00041 strncpy(con->host, host == NULL ? GEARMAN_DEFAULT_TCP_HOST : host,
00042 NI_MAXHOST);
00043 con->host[NI_MAXHOST - 1]= 0;
00044 }
00045
00046 void gearman_con_set_port(gearman_con_st *con, in_port_t port)
00047 {
00048 gearman_con_reset_addrinfo(con);
00049
00050 con->port= (in_port_t)(port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port);
00051 }
00052
00053 gearman_con_options_t gearman_con_options(const gearman_con_st *con)
00054 {
00055 return con->options;
00056 }
00057
00058 void gearman_con_set_options(gearman_con_st *con,
00059 gearman_con_options_t options)
00060 {
00061 con->options= options;
00062 }
00063
00064 void gearman_con_add_options(gearman_con_st *con,
00065 gearman_con_options_t options)
00066 {
00067 con->options|= options;
00068 }
00069
00070 void gearman_con_remove_options(gearman_con_st *con,
00071 gearman_con_options_t options)
00072 {
00073 con->options&= ~options;
00074 }
00075
00076 gearman_return_t gearman_con_set_fd(gearman_con_st *con, int fd)
00077 {
00078 gearman_return_t ret;
00079
00080 con->options|= GEARMAN_CON_EXTERNAL_FD;
00081 con->fd= fd;
00082 con->state= GEARMAN_CON_STATE_CONNECTED;
00083
00084 ret= _con_setsockopt(con);
00085 if (ret != GEARMAN_SUCCESS)
00086 {
00087 con->gearman->last_errno= errno;
00088 return ret;
00089 }
00090
00091 return GEARMAN_SUCCESS;
00092 }
00093
00094 void *gearman_con_context(const gearman_con_st *con)
00095 {
00096 return (void *)con->context;
00097 }
00098
00099 void gearman_con_set_context(gearman_con_st *con, const void *context)
00100 {
00101 con->context= context;
00102 }
00103
00104 gearman_return_t gearman_con_connect(gearman_con_st *con)
00105 {
00106 return gearman_con_flush(con);
00107 }
00108
00109 void gearman_con_close(gearman_con_st *con)
00110 {
00111 if (con->fd == -1)
00112 return;
00113
00114 if (con->options & GEARMAN_CON_EXTERNAL_FD)
00115 con->options&= (gearman_con_options_t)~GEARMAN_CON_EXTERNAL_FD;
00116 else
00117 (void)close(con->fd);
00118
00119 con->state= GEARMAN_CON_STATE_ADDRINFO;
00120 con->fd= -1;
00121 con->events= 0;
00122 con->revents= 0;
00123
00124 con->send_state= GEARMAN_CON_SEND_STATE_NONE;
00125 con->send_buffer_ptr= con->send_buffer;
00126 con->send_buffer_size= 0;
00127 con->send_data_size= 0;
00128 con->send_data_offset= 0;
00129
00130 con->recv_state= GEARMAN_CON_RECV_STATE_NONE;
00131 if (con->recv_packet != NULL)
00132 gearman_packet_free(con->recv_packet);
00133 con->recv_buffer_ptr= con->recv_buffer;
00134 con->recv_buffer_size= 0;
00135 }
00136
00137 void gearman_con_reset_addrinfo(gearman_con_st *con)
00138 {
00139 if (con->addrinfo != NULL)
00140 {
00141 freeaddrinfo(con->addrinfo);
00142 con->addrinfo= NULL;
00143 }
00144
00145 con->addrinfo_next= NULL;
00146 }
00147
00148 gearman_return_t gearman_con_send(gearman_con_st *con,
00149 const gearman_packet_st *packet, bool flush)
00150 {
00151 gearman_return_t ret;
00152 size_t send_size;
00153
00154 switch (con->send_state)
00155 {
00156 case GEARMAN_CON_SEND_STATE_NONE:
00157 if (!(packet->options & GEARMAN_PACKET_COMPLETE))
00158 {
00159 gearman_error_set(con->gearman, "gearman_con_send", "packet not complete");
00160 return GEARMAN_INVALID_PACKET;
00161 }
00162
00163
00164 while (1)
00165 {
00166 send_size= (*con->packet_pack_fn)(packet, con,
00167 con->send_buffer +
00168 con->send_buffer_size,
00169 GEARMAN_SEND_BUFFER_SIZE -
00170 con->send_buffer_size,
00171 &ret);
00172 if (ret == GEARMAN_SUCCESS)
00173 {
00174 con->send_buffer_size+= send_size;
00175 break;
00176 }
00177 else if (ret == GEARMAN_IGNORE_PACKET)
00178 return GEARMAN_SUCCESS;
00179 else if (ret != GEARMAN_FLUSH_DATA)
00180 return ret;
00181
00182
00183 if (con->send_buffer_size == 0)
00184 {
00185 gearman_error_set(con->gearman, "gearman_con_send",
00186 "send buffer too small (%u)",
00187 GEARMAN_SEND_BUFFER_SIZE);
00188 return GEARMAN_SEND_BUFFER_TOO_SMALL;
00189 }
00190
00191
00192 con->send_state= GEARMAN_CON_SEND_STATE_PRE_FLUSH;
00193
00194 case GEARMAN_CON_SEND_STATE_PRE_FLUSH:
00195 ret= gearman_con_flush(con);
00196 if (ret != GEARMAN_SUCCESS)
00197 return ret;
00198 }
00199
00200
00201 if (packet->data_size == 0)
00202 break;
00203
00204
00205 if (packet->data != NULL &&
00206 (GEARMAN_SEND_BUFFER_SIZE - con->send_buffer_size) > 0)
00207 {
00208 con->send_data_offset= GEARMAN_SEND_BUFFER_SIZE - con->send_buffer_size;
00209 if (con->send_data_offset > packet->data_size)
00210 con->send_data_offset= packet->data_size;
00211
00212 memcpy(con->send_buffer + con->send_buffer_size, packet->data,
00213 con->send_data_offset);
00214 con->send_buffer_size+= con->send_data_offset;
00215
00216
00217 if (con->send_data_offset == packet->data_size)
00218 {
00219 con->send_data_offset= 0;
00220 break;
00221 }
00222 }
00223
00224
00225 con->send_state= GEARMAN_CON_SEND_STATE_FORCE_FLUSH;
00226
00227 case GEARMAN_CON_SEND_STATE_FORCE_FLUSH:
00228 ret= gearman_con_flush(con);
00229 if (ret != GEARMAN_SUCCESS)
00230 return ret;
00231
00232 con->send_data_size= packet->data_size;
00233
00234
00235 if (packet->data == NULL)
00236 {
00237 con->send_state= GEARMAN_CON_SEND_STATE_FLUSH_DATA;
00238 return GEARMAN_SUCCESS;
00239 }
00240
00241
00242 con->send_buffer_size= packet->data_size - con->send_data_offset;
00243 if (con->send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
00244 {
00245 memcpy(con->send_buffer,
00246 ((uint8_t *)(packet->data)) + con->send_data_offset,
00247 con->send_buffer_size);
00248 con->send_data_size= 0;
00249 con->send_data_offset= 0;
00250 break;
00251 }
00252
00253 con->send_buffer_ptr= ((uint8_t *)(packet->data)) + con->send_data_offset;
00254 con->send_state= GEARMAN_CON_SEND_STATE_FLUSH_DATA;
00255
00256 case GEARMAN_CON_SEND_STATE_FLUSH:
00257 case GEARMAN_CON_SEND_STATE_FLUSH_DATA:
00258 ret= gearman_con_flush(con);
00259 if (ret == GEARMAN_SUCCESS && con->options & GEARMAN_CON_CLOSE_AFTER_FLUSH)
00260 {
00261 gearman_con_close(con);
00262 ret= GEARMAN_LOST_CONNECTION;
00263 }
00264 return ret;
00265
00266 default:
00267 gearman_error_set(con->gearman, "gearman_con_send", "unknown state: %u",
00268 con->send_state);
00269 return GEARMAN_UNKNOWN_STATE;
00270 }
00271
00272 if (flush)
00273 {
00274 con->send_state= GEARMAN_CON_SEND_STATE_FLUSH;
00275 ret= gearman_con_flush(con);
00276 if (ret == GEARMAN_SUCCESS && con->options & GEARMAN_CON_CLOSE_AFTER_FLUSH)
00277 {
00278 gearman_con_close(con);
00279 ret= GEARMAN_LOST_CONNECTION;
00280 }
00281 return ret;
00282 }
00283
00284 con->send_state= GEARMAN_CON_SEND_STATE_NONE;
00285 return GEARMAN_SUCCESS;
00286 }
00287
00288 size_t gearman_con_send_data(gearman_con_st *con, const void *data,
00289 size_t data_size, gearman_return_t *ret_ptr)
00290 {
00291 if (con->send_state != GEARMAN_CON_SEND_STATE_FLUSH_DATA)
00292 {
00293 gearman_error_set(con->gearman, "gearman_con_send_data", "not flushing");
00294 return GEARMAN_NOT_FLUSHING;
00295 }
00296
00297 if (data_size > (con->send_data_size - con->send_data_offset))
00298 {
00299 gearman_error_set(con->gearman, "gearman_con_send_data", "data too large");
00300 return GEARMAN_DATA_TOO_LARGE;
00301 }
00302
00303 con->send_buffer_ptr= (uint8_t *)data;
00304 con->send_buffer_size= data_size;
00305
00306 *ret_ptr= gearman_con_flush(con);
00307
00308 return data_size - con->send_buffer_size;
00309 }
00310
00311 gearman_return_t gearman_con_flush(gearman_con_st *con)
00312 {
00313 char port_str[NI_MAXSERV];
00314 struct addrinfo ai;
00315 int ret;
00316 ssize_t write_size;
00317 gearman_return_t gret;
00318
00319 while (1)
00320 {
00321 switch (con->state)
00322 {
00323 case GEARMAN_CON_STATE_ADDRINFO:
00324 if (con->addrinfo != NULL)
00325 {
00326 freeaddrinfo(con->addrinfo);
00327 con->addrinfo= NULL;
00328 }
00329
00330 snprintf(port_str, NI_MAXSERV, "%u", con->port);
00331
00332 memset(&ai, 0, sizeof(struct addrinfo));
00333 ai.ai_socktype= SOCK_STREAM;
00334 ai.ai_protocol= IPPROTO_TCP;
00335
00336 ret= getaddrinfo(con->host, port_str, &ai, &(con->addrinfo));
00337 if (ret != 0)
00338 {
00339 gearman_error_set(con->gearman, "gearman_con_flush", "getaddrinfo:%s",
00340 gai_strerror(ret));
00341 return GEARMAN_GETADDRINFO;
00342 }
00343
00344 con->addrinfo_next= con->addrinfo;
00345
00346 case GEARMAN_CON_STATE_CONNECT:
00347 if (con->fd != -1)
00348 gearman_con_close(con);
00349
00350 if (con->addrinfo_next == NULL)
00351 {
00352 con->state= GEARMAN_CON_STATE_ADDRINFO;
00353 gearman_error_set(con->gearman, "gearman_con_flush",
00354 "could not connect");
00355 return GEARMAN_COULD_NOT_CONNECT;
00356 }
00357
00358 con->fd= socket(con->addrinfo_next->ai_family,
00359 con->addrinfo_next->ai_socktype,
00360 con->addrinfo_next->ai_protocol);
00361 if (con->fd == -1)
00362 {
00363 con->state= GEARMAN_CON_STATE_ADDRINFO;
00364 gearman_error_set(con->gearman, "gearman_con_flush", "socket:%d", errno);
00365 con->gearman->last_errno= errno;
00366 return GEARMAN_ERRNO;
00367 }
00368
00369 gret= _con_setsockopt(con);
00370 if (gret != GEARMAN_SUCCESS)
00371 {
00372 con->gearman->last_errno= errno;
00373 gearman_con_close(con);
00374 return gret;
00375 }
00376
00377 while (1)
00378 {
00379 ret= connect(con->fd, con->addrinfo_next->ai_addr,
00380 con->addrinfo_next->ai_addrlen);
00381 if (ret == 0)
00382 {
00383 con->state= GEARMAN_CON_STATE_CONNECTED;
00384 con->addrinfo_next= NULL;
00385 break;
00386 }
00387
00388 if (errno == EAGAIN || errno == EINTR)
00389 continue;
00390
00391 if (errno == EINPROGRESS)
00392 {
00393 con->state= GEARMAN_CON_STATE_CONNECTING;
00394 break;
00395 }
00396
00397 if (errno == ECONNREFUSED || errno == ENETUNREACH || errno == ETIMEDOUT)
00398 {
00399 con->state= GEARMAN_CON_STATE_CONNECT;
00400 con->addrinfo_next= con->addrinfo_next->ai_next;
00401 break;
00402 }
00403
00404 gearman_error_set(con->gearman, "gearman_con_flush", "connect:%d", errno);
00405 con->gearman->last_errno= errno;
00406 gearman_con_close(con);
00407 return GEARMAN_ERRNO;
00408 }
00409
00410 if (con->state != GEARMAN_CON_STATE_CONNECTING)
00411 break;
00412
00413 case GEARMAN_CON_STATE_CONNECTING:
00414 while (1)
00415 {
00416 if (con->revents & POLLOUT)
00417 {
00418 con->state= GEARMAN_CON_STATE_CONNECTED;
00419 break;
00420 }
00421 else if (con->revents & (POLLERR | POLLHUP | POLLNVAL))
00422 {
00423 con->state= GEARMAN_CON_STATE_CONNECT;
00424 con->addrinfo_next= con->addrinfo_next->ai_next;
00425 break;
00426 }
00427
00428 gret= gearman_con_set_events(con, POLLOUT);
00429 if (gret != GEARMAN_SUCCESS)
00430 return gret;
00431
00432 if (con->gearman->options & GEARMAN_NON_BLOCKING)
00433 {
00434 con->state= GEARMAN_CON_STATE_CONNECTING;
00435 return GEARMAN_IO_WAIT;
00436 }
00437
00438 gret= gearman_con_wait(con->gearman);
00439 if (gret != GEARMAN_SUCCESS)
00440 return gret;
00441 }
00442
00443 if (con->state != GEARMAN_CON_STATE_CONNECTED)
00444 break;
00445
00446 case GEARMAN_CON_STATE_CONNECTED:
00447 while (con->send_buffer_size != 0)
00448 {
00449 write_size= write(con->fd, con->send_buffer_ptr, con->send_buffer_size);
00450 if (write_size == 0)
00451 {
00452 if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
00453 {
00454 gearman_error_set(con->gearman, "gearman_con_flush",
00455 "lost connection to server (EOF)");
00456 }
00457 gearman_con_close(con);
00458 return GEARMAN_LOST_CONNECTION;
00459 }
00460 else if (write_size == -1)
00461 {
00462 if (errno == EAGAIN)
00463 {
00464 gret= gearman_con_set_events(con, POLLOUT);
00465 if (gret != GEARMAN_SUCCESS)
00466 return gret;
00467
00468 if (con->gearman->options & GEARMAN_NON_BLOCKING)
00469 return GEARMAN_IO_WAIT;
00470
00471 gret= gearman_con_wait(con->gearman);
00472 if (gret != GEARMAN_SUCCESS)
00473 return gret;
00474
00475 continue;
00476 }
00477 else if (errno == EINTR)
00478 continue;
00479 else if (errno == EPIPE || errno == ECONNRESET)
00480 {
00481 if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
00482 {
00483 gearman_error_set(con->gearman, "gearman_con_flush",
00484 "lost connection to server (%d)", errno);
00485 }
00486 gearman_con_close(con);
00487 return GEARMAN_LOST_CONNECTION;
00488 }
00489
00490 gearman_error_set(con->gearman, "gearman_con_flush", "write:%d", errno);
00491 con->gearman->last_errno= errno;
00492 gearman_con_close(con);
00493 return GEARMAN_ERRNO;
00494 }
00495
00496 con->send_buffer_size-= (size_t)write_size;
00497 if (con->send_state == GEARMAN_CON_SEND_STATE_FLUSH_DATA)
00498 {
00499 con->send_data_offset+= (size_t)write_size;
00500 if (con->send_data_offset == con->send_data_size)
00501 {
00502 con->send_data_size= 0;
00503 con->send_data_offset= 0;
00504 break;
00505 }
00506
00507 if (con->send_buffer_size == 0)
00508 return GEARMAN_SUCCESS;
00509 }
00510 else if (con->send_buffer_size == 0)
00511 break;
00512
00513 con->send_buffer_ptr+= write_size;
00514 }
00515
00516 con->send_state= GEARMAN_CON_SEND_STATE_NONE;
00517 con->send_buffer_ptr= con->send_buffer;
00518 return GEARMAN_SUCCESS;
00519
00520 default:
00521 gearman_error_set(con->gearman, "gearman_con_flush", "unknown state: %u", con->state);
00522
00523 return GEARMAN_UNKNOWN_STATE;
00524 }
00525 }
00526 }
00527
00528 gearman_packet_st *gearman_con_recv(gearman_con_st *con,
00529 gearman_packet_st *packet,
00530 gearman_return_t *ret_ptr, bool recv_data)
00531 {
00532 size_t recv_size;
00533
00534 switch (con->recv_state)
00535 {
00536 case GEARMAN_CON_RECV_STATE_NONE:
00537 if (con->state != GEARMAN_CON_STATE_CONNECTED)
00538 {
00539 gearman_error_set(con->gearman, "gearman_con_recv", "not connected");
00540 *ret_ptr= GEARMAN_NOT_CONNECTED;
00541 return NULL;
00542 }
00543
00544 con->recv_packet= gearman_packet_create(con->gearman, packet);
00545 if (con->recv_packet == NULL)
00546 {
00547 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00548 return NULL;
00549 }
00550
00551 con->recv_state= GEARMAN_CON_RECV_STATE_READ;
00552
00553 case GEARMAN_CON_RECV_STATE_READ:
00554 while (1)
00555 {
00556 if (con->recv_buffer_size > 0)
00557 {
00558 recv_size= (*con->packet_unpack_fn)(con->recv_packet, con,
00559 con->recv_buffer_ptr,
00560 con->recv_buffer_size, ret_ptr);
00561 con->recv_buffer_ptr+= recv_size;
00562 con->recv_buffer_size-= recv_size;
00563 if (*ret_ptr == GEARMAN_SUCCESS)
00564 break;
00565 else if (*ret_ptr != GEARMAN_IO_WAIT)
00566 {
00567 gearman_con_close(con);
00568 return NULL;
00569 }
00570 }
00571
00572
00573 if (con->recv_buffer_size > 0)
00574 memmove(con->recv_buffer, con->recv_buffer_ptr, con->recv_buffer_size);
00575 con->recv_buffer_ptr= con->recv_buffer;
00576
00577 recv_size= gearman_con_read(con, con->recv_buffer + con->recv_buffer_size,
00578 GEARMAN_RECV_BUFFER_SIZE - con->recv_buffer_size,
00579 ret_ptr);
00580 if (*ret_ptr != GEARMAN_SUCCESS)
00581 return NULL;
00582
00583 con->recv_buffer_size+= recv_size;
00584 }
00585
00586 if (packet->data_size == 0)
00587 {
00588 con->recv_state= GEARMAN_CON_RECV_STATE_NONE;
00589 break;
00590 }
00591
00592 con->recv_data_size= packet->data_size;
00593
00594 if (!recv_data)
00595 {
00596 con->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
00597 break;
00598 }
00599
00600 if (packet->gearman->workload_malloc_fn == NULL)
00601 packet->data= malloc(packet->data_size);
00602 else
00603 {
00604 packet->data= packet->gearman->workload_malloc_fn(packet->data_size,
00605 (void *)(packet->gearman->workload_malloc_context));
00606 }
00607 if (packet->data == NULL)
00608 {
00609 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00610 gearman_con_close(con);
00611 return NULL;
00612 }
00613
00614 packet->options|= GEARMAN_PACKET_FREE_DATA;
00615 con->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
00616
00617 case GEARMAN_CON_RECV_STATE_READ_DATA:
00618 while (con->recv_data_size != 0)
00619 {
00620 (void)gearman_con_recv_data(con,
00621 ((uint8_t *)(packet->data)) +
00622 con->recv_data_offset,
00623 packet->data_size -
00624 con->recv_data_offset, ret_ptr);
00625 if (*ret_ptr != GEARMAN_SUCCESS)
00626 return NULL;
00627 }
00628
00629 con->recv_state= GEARMAN_CON_RECV_STATE_NONE;
00630 break;
00631
00632 default:
00633 gearman_error_set(con->gearman, "gearman_con_recv", "unknown state: %u",
00634 con->recv_state);
00635 *ret_ptr= GEARMAN_UNKNOWN_STATE;
00636 return NULL;
00637 }
00638
00639 packet= con->recv_packet;
00640 con->recv_packet= NULL;
00641
00642 return packet;
00643 }
00644
00645 size_t gearman_con_recv_data(gearman_con_st *con, void *data, size_t data_size,
00646 gearman_return_t *ret_ptr)
00647 {
00648 size_t recv_size= 0;
00649
00650 if (con->recv_data_size == 0)
00651 {
00652 *ret_ptr= GEARMAN_SUCCESS;
00653 return 0;
00654 }
00655
00656 if ((con->recv_data_size - con->recv_data_offset) < data_size)
00657 data_size= con->recv_data_size - con->recv_data_offset;
00658
00659 if (con->recv_buffer_size > 0)
00660 {
00661 if (con->recv_buffer_size < data_size)
00662 recv_size= con->recv_buffer_size;
00663 else
00664 recv_size= data_size;
00665
00666 memcpy(data, con->recv_buffer_ptr, recv_size);
00667 con->recv_buffer_ptr+= recv_size;
00668 con->recv_buffer_size-= recv_size;
00669 }
00670
00671 if (data_size != recv_size)
00672 {
00673 recv_size+= gearman_con_read(con, ((uint8_t *)data) + recv_size,
00674 data_size - recv_size, ret_ptr);
00675 con->recv_data_offset+= recv_size;
00676 }
00677 else
00678 {
00679 con->recv_data_offset+= recv_size;
00680 *ret_ptr= GEARMAN_SUCCESS;
00681 }
00682
00683 if (con->recv_data_size == con->recv_data_offset)
00684 {
00685 con->recv_data_size= 0;
00686 con->recv_data_offset= 0;
00687 con->recv_state= GEARMAN_CON_RECV_STATE_NONE;
00688 }
00689
00690 return recv_size;
00691 }
00692
00693 size_t gearman_con_read(gearman_con_st *con, void *data, size_t data_size,
00694 gearman_return_t *ret_ptr)
00695 {
00696 ssize_t read_size;
00697
00698 while (1)
00699 {
00700 read_size= read(con->fd, data, data_size);
00701 if (read_size == 0)
00702 {
00703 if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
00704 {
00705 gearman_error_set(con->gearman, "gearman_con_read",
00706 "lost connection to server (EOF)");
00707 }
00708 gearman_con_close(con);
00709 *ret_ptr= GEARMAN_LOST_CONNECTION;
00710 return 0;
00711 }
00712 else if (read_size == -1)
00713 {
00714 if (errno == EAGAIN)
00715 {
00716 *ret_ptr= gearman_con_set_events(con, POLLIN);
00717 if (*ret_ptr != GEARMAN_SUCCESS)
00718 return 0;
00719
00720 if (con->gearman->options & GEARMAN_NON_BLOCKING)
00721 {
00722 *ret_ptr= GEARMAN_IO_WAIT;
00723 return 0;
00724 }
00725
00726 *ret_ptr= gearman_con_wait(con->gearman);
00727 if (*ret_ptr != GEARMAN_SUCCESS)
00728 return 0;
00729
00730 continue;
00731 }
00732 else if (errno == EINTR)
00733 continue;
00734 else if (errno == EPIPE || errno == ECONNRESET)
00735 {
00736 if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
00737 {
00738 gearman_error_set(con->gearman, "gearman_con_read",
00739 "lost connection to server (%d)", errno);
00740 }
00741 *ret_ptr= GEARMAN_LOST_CONNECTION;
00742 }
00743 else
00744 {
00745 gearman_error_set(con->gearman, "gearman_con_read", "read:%d", errno);
00746 con->gearman->last_errno= errno;
00747 *ret_ptr= GEARMAN_ERRNO;
00748 }
00749
00750 gearman_con_close(con);
00751 return 0;
00752 }
00753
00754 break;
00755 }
00756
00757 *ret_ptr= GEARMAN_SUCCESS;
00758 return (size_t)read_size;
00759 }
00760
00761 gearman_return_t gearman_con_set_events(gearman_con_st *con, short events)
00762 {
00763 gearman_return_t ret;
00764
00765 if ((con->events | events) == con->events)
00766 return GEARMAN_SUCCESS;
00767
00768 con->events|= events;
00769
00770 if (con->gearman->event_watch_fn != NULL)
00771 {
00772 ret= (con->gearman->event_watch_fn)(con, con->events,
00773 (void *)con->gearman->event_watch_context);
00774 if (ret != GEARMAN_SUCCESS)
00775 {
00776 gearman_con_close(con);
00777 return ret;
00778 }
00779 }
00780
00781 return GEARMAN_SUCCESS;
00782 }
00783
00784 gearman_return_t gearman_con_set_revents(gearman_con_st *con, short revents)
00785 {
00786 gearman_return_t ret;
00787
00788 if (revents != 0)
00789 con->options|= GEARMAN_CON_READY;
00790
00791 con->revents= revents;
00792
00793
00794
00795
00796
00797 if (revents & POLLOUT && !(con->events & POLLOUT) &&
00798 con->gearman->event_watch_fn != NULL)
00799 {
00800 ret= (con->gearman->event_watch_fn)(con, con->events,
00801 (void *)con->gearman->event_watch_context);
00802 if (ret != GEARMAN_SUCCESS)
00803 {
00804 gearman_con_close(con);
00805 return ret;
00806 }
00807 }
00808
00809 con->events&= (short)~revents;
00810
00811 return GEARMAN_SUCCESS;
00812 }
00813
00814 void *gearman_con_protocol_context(const gearman_con_st *con)
00815 {
00816 return (void *)con->protocol_context;
00817 }
00818
00819 void gearman_con_set_protocol_context(gearman_con_st *con, const void *context)
00820 {
00821 con->protocol_context= context;
00822 }
00823
00824 void gearman_con_set_protocol_context_free_fn(gearman_con_st *con,
00825 gearman_con_protocol_context_free_fn *function)
00826 {
00827 con->protocol_context_free_fn= function;
00828 }
00829
00830 void gearman_con_set_packet_pack_fn(gearman_con_st *con,
00831 gearman_packet_pack_fn *function)
00832 {
00833 con->packet_pack_fn= function;
00834 }
00835
00836 void gearman_con_set_packet_unpack_fn(gearman_con_st *con,
00837 gearman_packet_unpack_fn *function)
00838 {
00839 con->packet_unpack_fn= function;
00840 }
00841
00842
00843
00844
00845
00846 static gearman_return_t _con_setsockopt(gearman_con_st *con)
00847 {
00848 int ret;
00849 struct linger linger;
00850 struct timeval waittime;
00851
00852 ret= 1;
00853 ret= setsockopt(con->fd, IPPROTO_TCP, TCP_NODELAY, &ret,
00854 (socklen_t)sizeof(int));
00855 if (ret == -1)
00856 {
00857 gearman_error_set(con->gearman, "_con_setsockopt",
00858 "setsockopt:TCP_NODELAY:%d", errno);
00859 return GEARMAN_ERRNO;
00860 }
00861
00862 linger.l_onoff= 1;
00863 linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
00864 ret= setsockopt(con->fd, SOL_SOCKET, SO_LINGER, &linger,
00865 (socklen_t)sizeof(struct linger));
00866 if (ret == -1)
00867 {
00868 gearman_error_set(con->gearman, "_con_setsockopt",
00869 "setsockopt:SO_LINGER:%d", errno);
00870 return GEARMAN_ERRNO;
00871 }
00872
00873 waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
00874 waittime.tv_usec= 0;
00875 ret= setsockopt(con->fd, SOL_SOCKET, SO_SNDTIMEO, &waittime,
00876 (socklen_t)sizeof(struct timeval));
00877 if (ret == -1 && errno != ENOPROTOOPT)
00878 {
00879 gearman_error_set(con->gearman, "_con_setsockopt",
00880 "setsockopt:SO_SNDTIMEO:%d", errno);
00881 return GEARMAN_ERRNO;
00882 }
00883
00884 ret= setsockopt(con->fd, SOL_SOCKET, SO_RCVTIMEO, &waittime,
00885 (socklen_t)sizeof(struct timeval));
00886 if (ret == -1 && errno != ENOPROTOOPT)
00887 {
00888 gearman_error_set(con->gearman, "_con_setsockopt",
00889 "setsockopt:SO_RCVTIMEO:%d", errno);
00890 return GEARMAN_ERRNO;
00891 }
00892
00893 ret= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
00894 ret= setsockopt(con->fd, SOL_SOCKET, SO_SNDBUF, &ret, (socklen_t)sizeof(int));
00895 if (ret == -1)
00896 {
00897 gearman_error_set(con->gearman, "_con_setsockopt",
00898 "setsockopt:SO_SNDBUF:%d", errno);
00899 return GEARMAN_ERRNO;
00900 }
00901
00902 ret= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
00903 ret= setsockopt(con->fd, SOL_SOCKET, SO_RCVBUF, &ret, (socklen_t)sizeof(int));
00904 if (ret == -1)
00905 {
00906 gearman_error_set(con->gearman, "_con_setsockopt",
00907 "setsockopt:SO_RCVBUF:%d", errno);
00908 return GEARMAN_ERRNO;
00909 }
00910
00911 ret= fcntl(con->fd, F_GETFL, 0);
00912 if (ret == -1)
00913 {
00914 gearman_error_set(con->gearman, "_con_setsockopt", "fcntl:F_GETFL:%d", errno);
00915
00916 return GEARMAN_ERRNO;
00917 }
00918
00919 ret= fcntl(con->fd, F_SETFL, ret | O_NONBLOCK);
00920 if (ret == -1)
00921 {
00922 gearman_error_set(con->gearman, "_con_setsockopt", "fcntl:F_SETFL:%d", errno);
00923
00924 return GEARMAN_ERRNO;
00925 }
00926
00927 return GEARMAN_SUCCESS;
00928 }