Gearman Developer Documentation

libgearman/connection.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 static void gearman_connection_reset_addrinfo(gearman_connection_st *connection);
00017 
00018 
00026 gearman_connection_st *gearman_connection_create(gearman_universal_st *gearman,
00027                                                  gearman_connection_st *connection,
00028                                                  gearman_connection_options_t *options)
00029 {
00030   if (connection == NULL)
00031   {
00032     connection= malloc(sizeof(gearman_connection_st));
00033     if (connection == NULL)
00034     {
00035       gearman_universal_set_error(gearman, "gearman_connection_create", "malloc");
00036       return NULL;
00037     }
00038 
00039     connection->options.allocated= true;
00040   }
00041   else
00042   {
00043     connection->options.allocated= false;
00044   }
00045 
00046   connection->options.ready= false;
00047   connection->options.packet_in_use= false;
00048   connection->options.external_fd= false;
00049   connection->options.ignore_lost_connection= false;
00050   connection->options.close_after_flush= false;
00051 
00052   if (options)
00053   {
00054     while (*options != GEARMAN_CON_MAX)
00055     {
00056       gearman_connection_set_option(connection, *options, true);
00057       options++;
00058     }
00059   }
00060 
00061 
00062   connection->state= 0;
00063   connection->send_state= 0;
00064   connection->recv_state= 0;
00065   connection->port= 0;
00066   connection->events= 0;
00067   connection->revents= 0;
00068   connection->fd= -1;
00069   connection->created_id= 0;
00070   connection->created_id_next= 0;
00071   connection->send_buffer_size= 0;
00072   connection->send_data_size= 0;
00073   connection->send_data_offset= 0;
00074   connection->recv_buffer_size= 0;
00075   connection->recv_data_size= 0;
00076   connection->recv_data_offset= 0;
00077   connection->universal= gearman;
00078 
00079   if (gearman->con_list != NULL)
00080     gearman->con_list->prev= connection;
00081   connection->next= gearman->con_list;
00082   connection->prev= NULL;
00083   gearman->con_list= connection;
00084   gearman->con_count++;
00085 
00086   connection->context= NULL;
00087   connection->addrinfo= NULL;
00088   connection->addrinfo_next= NULL;
00089   connection->send_buffer_ptr= connection->send_buffer;
00090   connection->recv_packet= NULL;
00091   connection->recv_buffer_ptr= connection->recv_buffer;
00092   connection->protocol_context= NULL;
00093   connection->protocol_context_free_fn= NULL;
00094   connection->packet_pack_fn= gearman_packet_pack;
00095   connection->packet_unpack_fn= gearman_packet_unpack;
00096   connection->host[0]= 0;
00097 
00098   return connection;
00099 }
00100 
00101 gearman_connection_st *gearman_connection_create_args(gearman_universal_st *gearman, gearman_connection_st *connection,
00102                                                       const char *host, in_port_t port)
00103 {
00104   connection= gearman_connection_create(gearman, connection, NULL);
00105   if (connection == NULL)
00106     return NULL;
00107 
00108   gearman_connection_set_host(connection, host, port);
00109 
00110   return connection;
00111 }
00112 
00113 gearman_connection_st *gearman_connection_clone(gearman_universal_st *gearman, gearman_connection_st *connection,
00114                                                 const gearman_connection_st *from)
00115 {
00116   connection= gearman_connection_create(gearman, connection, NULL);
00117 
00118   if (from == NULL || connection == NULL)
00119     return connection;
00120 
00121   connection->options.ready= from->options.ready;
00122   connection->options.packet_in_use= from->options.packet_in_use;
00123   connection->options.external_fd= from->options.external_fd;
00124   connection->options.ignore_lost_connection= from->options.ignore_lost_connection;
00125   connection->options.close_after_flush= from->options.close_after_flush;
00126 
00127   strcpy(connection->host, from->host);
00128   connection->port= from->port;
00129 
00130   return connection;
00131 }
00132 
00133 void gearman_connection_free(gearman_connection_st *connection)
00134 {
00135   if (connection->fd != -1)
00136     gearman_connection_close(connection);
00137 
00138   gearman_connection_reset_addrinfo(connection);
00139 
00140   if (connection->protocol_context != NULL && connection->protocol_context_free_fn != NULL)
00141     connection->protocol_context_free_fn(connection, (void *)connection->protocol_context);
00142 
00143   if (connection->universal->con_list == connection)
00144     connection->universal->con_list= connection->next;
00145   if (connection->prev != NULL)
00146     connection->prev->next= connection->next;
00147   if (connection->next != NULL)
00148     connection->next->prev= connection->prev;
00149   connection->universal->con_count--;
00150 
00151   if (connection->options.packet_in_use)
00152     gearman_packet_free(&(connection->packet));
00153 
00154   if (connection->options.allocated)
00155     free(connection);
00156 }
00157 
00158 gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
00159                                                gearman_connection_options_t options,
00160                                                bool value)
00161 {
00162   switch (options)
00163   {
00164   case GEARMAN_CON_READY:
00165     connection->options.ready= value;
00166     break;
00167   case GEARMAN_CON_PACKET_IN_USE:
00168     connection->options.packet_in_use= value;
00169     break;
00170   case GEARMAN_CON_EXTERNAL_FD:
00171     connection->options.external_fd= value;
00172     break;
00173   case GEARMAN_CON_IGNORE_LOST_CONNECTION:
00174     connection->options.ignore_lost_connection= value;
00175     break;
00176   case GEARMAN_CON_CLOSE_AFTER_FLUSH:
00177     connection->options.close_after_flush= value;
00178     break;
00179   case GEARMAN_CON_MAX:
00180   default:
00181     return GEARMAN_INVALID_COMMAND;
00182   }
00183 
00184   return GEARMAN_SUCCESS;
00185 }
00186 
00190 static gearman_return_t _con_setsockopt(gearman_connection_st *connection);
00191 
00194 /*
00195  * Public Definitions
00196  */
00197 
00198 void gearman_connection_set_host(gearman_connection_st *connection,
00199                                  const char *host,
00200                                  in_port_t port)
00201 {
00202   gearman_connection_reset_addrinfo(connection);
00203 
00204   strncpy(connection->host, host == NULL ? GEARMAN_DEFAULT_TCP_HOST : host,
00205           NI_MAXHOST);
00206   connection->host[NI_MAXHOST - 1]= 0;
00207 
00208   connection->port= (in_port_t)(port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port);
00209 }
00210 
00211 gearman_return_t gearman_connection_set_fd(gearman_connection_st *connection, int fd)
00212 {
00213   gearman_return_t ret;
00214 
00215   connection->options.external_fd= true;
00216   connection->fd= fd;
00217   connection->state= GEARMAN_CON_UNIVERSAL_CONNECTED;
00218 
00219   ret= _con_setsockopt(connection);
00220   if (ret != GEARMAN_SUCCESS)
00221   {
00222     connection->universal->last_errno= errno;
00223     return ret;
00224   }
00225 
00226   return GEARMAN_SUCCESS;
00227 }
00228 
00229 void *gearman_connection_context(const gearman_connection_st *connection)
00230 {
00231   return connection->context;
00232 }
00233 
00234 void gearman_connection_set_context(gearman_connection_st *connection, void *context)
00235 {
00236   connection->context= context;
00237 }
00238 
00239 gearman_return_t gearman_connection_connect(gearman_connection_st *connection)
00240 {
00241   return gearman_connection_flush(connection);
00242 }
00243 
00244 void gearman_connection_close(gearman_connection_st *connection)
00245 {
00246   if (connection->fd == -1)
00247     return;
00248 
00249   if (connection->options.external_fd)
00250     connection->options.external_fd= false;
00251   else
00252     (void)close(connection->fd);
00253 
00254   connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
00255   connection->fd= -1;
00256   connection->events= 0;
00257   connection->revents= 0;
00258 
00259   connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
00260   connection->send_buffer_ptr= connection->send_buffer;
00261   connection->send_buffer_size= 0;
00262   connection->send_data_size= 0;
00263   connection->send_data_offset= 0;
00264 
00265   connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00266   if (connection->recv_packet != NULL)
00267   {
00268     gearman_packet_free(connection->recv_packet);
00269     connection->recv_packet= NULL;
00270   }
00271 
00272   connection->recv_buffer_ptr= connection->recv_buffer;
00273   connection->recv_buffer_size= 0;
00274 }
00275 
00276 void gearman_connection_reset_addrinfo(gearman_connection_st *connection)
00277 {
00278   if (connection->addrinfo != NULL)
00279   {
00280     freeaddrinfo(connection->addrinfo);
00281     connection->addrinfo= NULL;
00282   }
00283 
00284   connection->addrinfo_next= NULL;
00285 }
00286 
00287 gearman_return_t gearman_connection_send(gearman_connection_st *connection,
00288                                          const gearman_packet_st *packet, bool flush)
00289 {
00290   gearman_return_t ret;
00291   size_t send_size;
00292 
00293   switch (connection->send_state)
00294   {
00295   case GEARMAN_CON_SEND_STATE_NONE:
00296     if (! (packet->options.complete))
00297     {
00298       gearman_universal_set_error(connection->universal, "gearman_connection_send",
00299                         "packet not complete");
00300       return GEARMAN_INVALID_PACKET;
00301     }
00302 
00303     /* Pack first part of packet, which is everything but the payload. */
00304     while (1)
00305     {
00306       send_size= connection->packet_pack_fn(packet, connection,
00307                                      connection->send_buffer + connection->send_buffer_size,
00308                                      GEARMAN_SEND_BUFFER_SIZE -
00309                                      connection->send_buffer_size, &ret);
00310       if (ret == GEARMAN_SUCCESS)
00311       {
00312         connection->send_buffer_size+= send_size;
00313         break;
00314       }
00315       else if (ret == GEARMAN_IGNORE_PACKET)
00316         return GEARMAN_SUCCESS;
00317       else if (ret != GEARMAN_FLUSH_DATA)
00318         return ret;
00319 
00320       /* We were asked to flush when the buffer is already flushed! */
00321       if (connection->send_buffer_size == 0)
00322       {
00323         gearman_universal_set_error(connection->universal, "gearman_connection_send",
00324                           "send buffer too small (%u)",
00325                           GEARMAN_SEND_BUFFER_SIZE);
00326         return GEARMAN_SEND_BUFFER_TOO_SMALL;
00327       }
00328 
00329       /* Flush buffer now if first part of packet won't fit in. */
00330       connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH;
00331 
00332     case GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH:
00333       ret= gearman_connection_flush(connection);
00334       if (ret != GEARMAN_SUCCESS)
00335         return ret;
00336     }
00337 
00338     /* Return here if we have no data to send. */
00339     if (packet->data_size == 0)
00340       break;
00341 
00342     /* If there is any room in the buffer, copy in data. */
00343     if (packet->data != NULL &&
00344         (GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size) > 0)
00345     {
00346       connection->send_data_offset= GEARMAN_SEND_BUFFER_SIZE - connection->send_buffer_size;
00347       if (connection->send_data_offset > packet->data_size)
00348         connection->send_data_offset= packet->data_size;
00349 
00350       memcpy(connection->send_buffer + connection->send_buffer_size, packet->data,
00351              connection->send_data_offset);
00352       connection->send_buffer_size+= connection->send_data_offset;
00353 
00354       /* Return if all data fit in the send buffer. */
00355       if (connection->send_data_offset == packet->data_size)
00356       {
00357         connection->send_data_offset= 0;
00358         break;
00359       }
00360     }
00361 
00362     /* Flush buffer now so we can start writing directly from data buffer. */
00363     connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH;
00364 
00365   case GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH:
00366     ret= gearman_connection_flush(connection);
00367     if (ret != GEARMAN_SUCCESS)
00368       return ret;
00369 
00370     connection->send_data_size= packet->data_size;
00371 
00372     /* If this is NULL, then gearman_connection_send_data function will be used. */
00373     if (packet->data == NULL)
00374     {
00375       connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
00376       return GEARMAN_SUCCESS;
00377     }
00378 
00379     /* Copy into the buffer if it fits, otherwise flush from packet buffer. */
00380     connection->send_buffer_size= packet->data_size - connection->send_data_offset;
00381     if (connection->send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
00382     {
00383       memcpy(connection->send_buffer,
00384              (char *)packet->data + connection->send_data_offset,
00385              connection->send_buffer_size);
00386       connection->send_data_size= 0;
00387       connection->send_data_offset= 0;
00388       break;
00389     }
00390 
00391     connection->send_buffer_ptr= (char *)packet->data + connection->send_data_offset;
00392     connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
00393 
00394   case GEARMAN_CON_SEND_UNIVERSAL_FLUSH:
00395   case GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA:
00396     ret= gearman_connection_flush(connection);
00397     if (ret == GEARMAN_SUCCESS && connection->options.close_after_flush)
00398     {
00399       gearman_connection_close(connection);
00400       ret= GEARMAN_LOST_CONNECTION;
00401     }
00402     return ret;
00403 
00404   default:
00405     gearman_universal_set_error(connection->universal, "gearman_connection_send", "unknown state: %u",
00406                       connection->send_state);
00407     return GEARMAN_UNKNOWN_STATE;
00408   }
00409 
00410   if (flush)
00411   {
00412     connection->send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH;
00413     ret= gearman_connection_flush(connection);
00414     if (ret == GEARMAN_SUCCESS && connection->options.close_after_flush)
00415     {
00416       gearman_connection_close(connection);
00417       ret= GEARMAN_LOST_CONNECTION;
00418     }
00419     return ret;
00420   }
00421 
00422   connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
00423   return GEARMAN_SUCCESS;
00424 }
00425 
00426 size_t gearman_connection_send_data(gearman_connection_st *connection, const void *data,
00427                                     size_t data_size, gearman_return_t *ret_ptr)
00428 {
00429   if (connection->send_state != GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
00430   {
00431     gearman_universal_set_error(connection->universal, "gearman_connection_send_data", "not flushing");
00432     return GEARMAN_NOT_FLUSHING;
00433   }
00434 
00435   if (data_size > (connection->send_data_size - connection->send_data_offset))
00436   {
00437     gearman_universal_set_error(connection->universal, "gearman_connection_send_data", "data too large");
00438     return GEARMAN_DATA_TOO_LARGE;
00439   }
00440 
00441   connection->send_buffer_ptr= (char *)data;
00442   connection->send_buffer_size= data_size;
00443 
00444   *ret_ptr= gearman_connection_flush(connection);
00445 
00446   return data_size - connection->send_buffer_size;
00447 }
00448 
00449 gearman_return_t gearman_connection_flush(gearman_connection_st *connection)
00450 {
00451   char port_str[NI_MAXSERV];
00452   struct addrinfo ai;
00453   int ret;
00454   ssize_t write_size;
00455   gearman_return_t gret;
00456 
00457   while (1)
00458   {
00459     switch (connection->state)
00460     {
00461     case GEARMAN_CON_UNIVERSAL_ADDRINFO:
00462       if (connection->addrinfo != NULL)
00463       {
00464         freeaddrinfo(connection->addrinfo);
00465         connection->addrinfo= NULL;
00466       }
00467 
00468       snprintf(port_str, NI_MAXSERV, "%hu", (uint16_t)connection->port);
00469 
00470       memset(&ai, 0, sizeof(struct addrinfo));
00471       ai.ai_socktype= SOCK_STREAM;
00472       ai.ai_protocol= IPPROTO_TCP;
00473 
00474       ret= getaddrinfo(connection->host, port_str, &ai, &(connection->addrinfo));
00475       if (ret != 0)
00476       {
00477         gearman_universal_set_error(connection->universal, "gearman_connection_flush", "getaddrinfo:%s",
00478                           gai_strerror(ret));
00479         return GEARMAN_GETADDRINFO;
00480       }
00481 
00482       connection->addrinfo_next= connection->addrinfo;
00483 
00484     case GEARMAN_CON_UNIVERSAL_CONNECT:
00485       if (connection->fd != -1)
00486         gearman_connection_close(connection);
00487 
00488       if (connection->addrinfo_next == NULL)
00489       {
00490         connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
00491         gearman_universal_set_error(connection->universal, "gearman_connection_flush",
00492                           "could not connect");
00493         return GEARMAN_COULD_NOT_CONNECT;
00494       }
00495 
00496       connection->fd= socket(connection->addrinfo_next->ai_family,
00497                       connection->addrinfo_next->ai_socktype,
00498                       connection->addrinfo_next->ai_protocol);
00499       if (connection->fd == -1)
00500       {
00501         connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
00502         gearman_universal_set_error(connection->universal, "gearman_connection_flush", "socket:%d",
00503                           errno);
00504         connection->universal->last_errno= errno;
00505         return GEARMAN_ERRNO;
00506       }
00507 
00508       gret= _con_setsockopt(connection);
00509       if (gret != GEARMAN_SUCCESS)
00510       {
00511         connection->universal->last_errno= errno;
00512         gearman_connection_close(connection);
00513         return gret;
00514       }
00515 
00516       while (1)
00517       {
00518         ret= connect(connection->fd, connection->addrinfo_next->ai_addr,
00519                      connection->addrinfo_next->ai_addrlen);
00520         if (ret == 0)
00521         {
00522           connection->state= GEARMAN_CON_UNIVERSAL_CONNECTED;
00523           connection->addrinfo_next= NULL;
00524           break;
00525         }
00526 
00527         if (errno == EAGAIN || errno == EINTR)
00528           continue;
00529 
00530         if (errno == EINPROGRESS)
00531         {
00532           connection->state= GEARMAN_CON_UNIVERSAL_CONNECTING;
00533           break;
00534         }
00535 
00536         if (errno == ECONNREFUSED || errno == ENETUNREACH || errno == ETIMEDOUT)
00537         {
00538           connection->state= GEARMAN_CON_UNIVERSAL_CONNECT;
00539           connection->addrinfo_next= connection->addrinfo_next->ai_next;
00540           break;
00541         }
00542 
00543         gearman_universal_set_error(connection->universal, "gearman_connection_flush", "connect:%d",
00544                           errno);
00545         connection->universal->last_errno= errno;
00546         gearman_connection_close(connection);
00547         return GEARMAN_ERRNO;
00548       }
00549 
00550       if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTING)
00551         break;
00552 
00553     case GEARMAN_CON_UNIVERSAL_CONNECTING:
00554       while (1)
00555       {
00556         if (connection->revents & POLLOUT)
00557         {
00558           connection->state= GEARMAN_CON_UNIVERSAL_CONNECTED;
00559           break;
00560         }
00561         else if (connection->revents & (POLLERR | POLLHUP | POLLNVAL))
00562         {
00563           connection->state= GEARMAN_CON_UNIVERSAL_CONNECT;
00564           connection->addrinfo_next= connection->addrinfo_next->ai_next;
00565           break;
00566         }
00567 
00568         gret= gearman_connection_set_events(connection, POLLOUT);
00569         if (gret != GEARMAN_SUCCESS)
00570           return gret;
00571 
00572         if (gearman_universal_is_non_blocking(connection->universal))
00573         {
00574           connection->state= GEARMAN_CON_UNIVERSAL_CONNECTING;
00575           return GEARMAN_IO_WAIT;
00576         }
00577 
00578         gret= gearman_wait(connection->universal);
00579         if (gret != GEARMAN_SUCCESS)
00580           return gret;
00581       }
00582 
00583       if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTED)
00584         break;
00585 
00586     case GEARMAN_CON_UNIVERSAL_CONNECTED:
00587       while (connection->send_buffer_size != 0)
00588       {
00589         write_size= write(connection->fd, connection->send_buffer_ptr, connection->send_buffer_size);
00590         if (write_size == 0)
00591         {
00592           if (! (connection->options.ignore_lost_connection))
00593           {
00594             gearman_universal_set_error(connection->universal, "gearman_connection_flush",
00595                               "lost connection to server (EOF)");
00596           }
00597           gearman_connection_close(connection);
00598           return GEARMAN_LOST_CONNECTION;
00599         }
00600         else if (write_size == -1)
00601         {
00602           if (errno == EAGAIN)
00603           {
00604             gret= gearman_connection_set_events(connection, POLLOUT);
00605             if (gret != GEARMAN_SUCCESS)
00606               return gret;
00607 
00608             if (gearman_universal_is_non_blocking(connection->universal))
00609               return GEARMAN_IO_WAIT;
00610 
00611             gret= gearman_wait(connection->universal);
00612             if (gret != GEARMAN_SUCCESS)
00613               return gret;
00614 
00615             continue;
00616           }
00617           else if (errno == EINTR)
00618             continue;
00619           else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
00620           {
00621             if (! (connection->options.ignore_lost_connection))
00622             {
00623               gearman_universal_set_error(connection->universal, "gearman_connection_flush",
00624                                 "lost connection to server (%d)", errno);
00625             }
00626             gearman_connection_close(connection);
00627             return GEARMAN_LOST_CONNECTION;
00628           }
00629 
00630           gearman_universal_set_error(connection->universal, "gearman_connection_flush", "write:%d",
00631                             errno);
00632           connection->universal->last_errno= errno;
00633           gearman_connection_close(connection);
00634           return GEARMAN_ERRNO;
00635         }
00636 
00637         connection->send_buffer_size-= (size_t)write_size;
00638         if (connection->send_state == GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
00639         {
00640           connection->send_data_offset+= (size_t)write_size;
00641           if (connection->send_data_offset == connection->send_data_size)
00642           {
00643             connection->send_data_size= 0;
00644             connection->send_data_offset= 0;
00645             break;
00646           }
00647 
00648           if (connection->send_buffer_size == 0)
00649             return GEARMAN_SUCCESS;
00650         }
00651         else if (connection->send_buffer_size == 0)
00652           break;
00653 
00654         connection->send_buffer_ptr+= write_size;
00655       }
00656 
00657       connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
00658       connection->send_buffer_ptr= connection->send_buffer;
00659       return GEARMAN_SUCCESS;
00660 
00661     default:
00662       gearman_universal_set_error(connection->universal, "gearman_connection_flush", "unknown state: %u",
00663                         connection->state);
00664 
00665       return GEARMAN_UNKNOWN_STATE;
00666     }
00667   }
00668 }
00669 
00670 gearman_packet_st *gearman_connection_recv(gearman_connection_st *connection,
00671                                            gearman_packet_st *packet,
00672                                            gearman_return_t *ret_ptr, bool recv_data)
00673 {
00674   size_t recv_size;
00675 
00676   switch (connection->recv_state)
00677   {
00678   case GEARMAN_CON_RECV_UNIVERSAL_NONE:
00679     if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTED)
00680     {
00681       gearman_universal_set_error(connection->universal, "gearman_connection_recv", "not connected");
00682       *ret_ptr= GEARMAN_NOT_CONNECTED;
00683       return NULL;
00684     }
00685 
00686     connection->recv_packet= gearman_packet_create(connection->universal, packet);
00687     if (connection->recv_packet == NULL)
00688     {
00689       *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00690       return NULL;
00691     }
00692 
00693     connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_READ;
00694 
00695   case GEARMAN_CON_RECV_UNIVERSAL_READ:
00696     while (1)
00697     {
00698       if (connection->recv_buffer_size > 0)
00699       {
00700         recv_size= connection->packet_unpack_fn(connection->recv_packet, connection,
00701                                          connection->recv_buffer_ptr,
00702                                          connection->recv_buffer_size, ret_ptr);
00703         connection->recv_buffer_ptr+= recv_size;
00704         connection->recv_buffer_size-= recv_size;
00705         if (*ret_ptr == GEARMAN_SUCCESS)
00706           break;
00707         else if (*ret_ptr != GEARMAN_IO_WAIT)
00708         {
00709           gearman_connection_close(connection);
00710           return NULL;
00711         }
00712       }
00713 
00714       /* Shift buffer contents if needed. */
00715       if (connection->recv_buffer_size > 0)
00716         memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
00717       connection->recv_buffer_ptr= connection->recv_buffer;
00718 
00719       recv_size= gearman_connection_read(connection, connection->recv_buffer + connection->recv_buffer_size,
00720                                          GEARMAN_RECV_BUFFER_SIZE - connection->recv_buffer_size,
00721                                          ret_ptr);
00722       if (*ret_ptr != GEARMAN_SUCCESS)
00723         return NULL;
00724 
00725       connection->recv_buffer_size+= recv_size;
00726     }
00727 
00728     if (packet->data_size == 0)
00729     {
00730       connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00731       break;
00732     }
00733 
00734     connection->recv_data_size= packet->data_size;
00735 
00736     if (!recv_data)
00737     {
00738       connection->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
00739       break;
00740     }
00741 
00742     if (packet->universal->workload_malloc_fn == NULL)
00743     {
00744       packet->data= malloc(packet->data_size);
00745     }
00746     else
00747     {
00748       packet->data= packet->universal->workload_malloc_fn(packet->data_size,
00749                                                         (void *)packet->universal->workload_malloc_context);
00750     }
00751     if (packet->data == NULL)
00752     {
00753       *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00754       gearman_connection_close(connection);
00755       return NULL;
00756     }
00757 
00758     packet->options.free_data= true;
00759     connection->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
00760 
00761   case GEARMAN_CON_RECV_STATE_READ_DATA:
00762     while (connection->recv_data_size != 0)
00763     {
00764       (void)gearman_connection_recv_data(connection,
00765                                          ((uint8_t *)(packet->data)) +
00766                                          connection->recv_data_offset,
00767                                          packet->data_size -
00768                                          connection->recv_data_offset, ret_ptr);
00769       if (*ret_ptr != GEARMAN_SUCCESS)
00770         return NULL;
00771     }
00772 
00773     connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00774     break;
00775 
00776   default:
00777     gearman_universal_set_error(connection->universal, "gearman_connection_recv", "unknown state: %u",
00778                       connection->recv_state);
00779     *ret_ptr= GEARMAN_UNKNOWN_STATE;
00780     return NULL;
00781   }
00782 
00783   packet= connection->recv_packet;
00784   connection->recv_packet= NULL;
00785 
00786   return packet;
00787 }
00788 
00789 size_t gearman_connection_recv_data(gearman_connection_st *connection, void *data, size_t data_size,
00790                                     gearman_return_t *ret_ptr)
00791 {
00792   size_t recv_size= 0;
00793 
00794   if (connection->recv_data_size == 0)
00795   {
00796     *ret_ptr= GEARMAN_SUCCESS;
00797     return 0;
00798   }
00799 
00800   if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
00801     data_size= connection->recv_data_size - connection->recv_data_offset;
00802 
00803   if (connection->recv_buffer_size > 0)
00804   {
00805     if (connection->recv_buffer_size < data_size)
00806       recv_size= connection->recv_buffer_size;
00807     else
00808       recv_size= data_size;
00809 
00810     memcpy(data, connection->recv_buffer_ptr, recv_size);
00811     connection->recv_buffer_ptr+= recv_size;
00812     connection->recv_buffer_size-= recv_size;
00813   }
00814 
00815   if (data_size != recv_size)
00816   {
00817     recv_size+= gearman_connection_read(connection, ((uint8_t *)data) + recv_size,
00818                                         data_size - recv_size, ret_ptr);
00819     connection->recv_data_offset+= recv_size;
00820   }
00821   else
00822   {
00823     connection->recv_data_offset+= recv_size;
00824     *ret_ptr= GEARMAN_SUCCESS;
00825   }
00826 
00827   if (connection->recv_data_size == connection->recv_data_offset)
00828   {
00829     connection->recv_data_size= 0;
00830     connection->recv_data_offset= 0;
00831     connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
00832   }
00833 
00834   return recv_size;
00835 }
00836 
00837 size_t gearman_connection_read(gearman_connection_st *connection, void *data, size_t data_size,
00838                                gearman_return_t *ret_ptr)
00839 {
00840   ssize_t read_size;
00841 
00842   while (1)
00843   {
00844     read_size= read(connection->fd, data, data_size);
00845     if (read_size == 0)
00846     {
00847       if (! (connection->options.ignore_lost_connection))
00848       {
00849         gearman_universal_set_error(connection->universal, "gearman_connection_read",
00850                           "lost connection to server (EOF)");
00851       }
00852       gearman_connection_close(connection);
00853       *ret_ptr= GEARMAN_LOST_CONNECTION;
00854       return 0;
00855     }
00856     else if (read_size == -1)
00857     {
00858       if (errno == EAGAIN)
00859       {
00860         *ret_ptr= gearman_connection_set_events(connection, POLLIN);
00861         if (*ret_ptr != GEARMAN_SUCCESS)
00862           return 0;
00863 
00864         if (gearman_universal_is_non_blocking(connection->universal))
00865         {
00866           *ret_ptr= GEARMAN_IO_WAIT;
00867           return 0;
00868         }
00869 
00870         *ret_ptr= gearman_wait(connection->universal);
00871         if (*ret_ptr != GEARMAN_SUCCESS)
00872           return 0;
00873 
00874         continue;
00875       }
00876       else if (errno == EINTR)
00877         continue;
00878       else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
00879       {
00880         if (! (connection->options.ignore_lost_connection))
00881         {
00882           gearman_universal_set_error(connection->universal, "gearman_connection_read",
00883                             "lost connection to server (%d)", errno);
00884         }
00885         *ret_ptr= GEARMAN_LOST_CONNECTION;
00886       }
00887       else
00888       {
00889         gearman_universal_set_error(connection->universal, "gearman_connection_read", "read:%d", errno);
00890         connection->universal->last_errno= errno;
00891         *ret_ptr= GEARMAN_ERRNO;
00892       }
00893 
00894       gearman_connection_close(connection);
00895       return 0;
00896     }
00897 
00898     break;
00899   }
00900 
00901   *ret_ptr= GEARMAN_SUCCESS;
00902   return (size_t)read_size;
00903 }
00904 
00905 gearman_return_t gearman_connection_set_events(gearman_connection_st *connection, short events)
00906 {
00907   gearman_return_t ret;
00908 
00909   if ((connection->events | events) == connection->events)
00910     return GEARMAN_SUCCESS;
00911 
00912   connection->events|= events;
00913 
00914   if (connection->universal->event_watch_fn != NULL)
00915   {
00916     ret= connection->universal->event_watch_fn(connection, connection->events,
00917                                       (void *)connection->universal->event_watch_context);
00918     if (ret != GEARMAN_SUCCESS)
00919     {
00920       gearman_connection_close(connection);
00921       return ret;
00922     }
00923   }
00924 
00925   return GEARMAN_SUCCESS;
00926 }
00927 
00928 gearman_return_t gearman_connection_set_revents(gearman_connection_st *connection, short revents)
00929 {
00930   gearman_return_t ret;
00931 
00932   if (revents != 0)
00933     connection->options.ready= true;
00934 
00935   connection->revents= revents;
00936 
00937   /* Remove external POLLOUT watch if we didn't ask for it. Otherwise we spin
00938     forever until another POLLIN state change. This is much more efficient
00939     than removing POLLOUT on every state change since some external polling
00940     mechanisms need to use a system call to change flags (like Linux epoll). */
00941   if (revents & POLLOUT && !(connection->events & POLLOUT) &&
00942       connection->universal->event_watch_fn != NULL)
00943   {
00944     ret= connection->universal->event_watch_fn(connection, connection->events,
00945                                       (void *)connection->universal->event_watch_context);
00946     if (ret != GEARMAN_SUCCESS)
00947     {
00948       gearman_connection_close(connection);
00949       return ret;
00950     }
00951   }
00952 
00953   connection->events&= (short)~revents;
00954 
00955   return GEARMAN_SUCCESS;
00956 }
00957 
00958 void *gearman_connection_protocol_context(const gearman_connection_st *connection)
00959 {
00960   return connection->protocol_context;
00961 }
00962 
00963 void gearman_connection_set_protocol_context(gearman_connection_st *connection, void *context)
00964 {
00965   connection->protocol_context= context;
00966 }
00967 
00968 void gearman_connection_set_protocol_context_free_fn(gearman_connection_st *connection,
00969                                                      gearman_connection_protocol_context_free_fn *function)
00970 {
00971   connection->protocol_context_free_fn= function;
00972 }
00973 
00974 void gearman_connection_set_packet_pack_fn(gearman_connection_st *connection,
00975                                            gearman_packet_pack_fn *function)
00976 {
00977   connection->packet_pack_fn= function;
00978 }
00979 
00980 void gearman_connection_set_packet_unpack_fn(gearman_connection_st *connection,
00981                                              gearman_packet_unpack_fn *function)
00982 {
00983   connection->packet_unpack_fn= function;
00984 }
00985 
00986 /*
00987  * Static Definitions
00988  */
00989 
00990 static gearman_return_t _con_setsockopt(gearman_connection_st *connection)
00991 {
00992   int ret;
00993   struct linger linger;
00994   struct timeval waittime;
00995 
00996   ret= 1;
00997   ret= setsockopt(connection->fd, IPPROTO_TCP, TCP_NODELAY, &ret,
00998                   (socklen_t)sizeof(int));
00999   if (ret == -1 && errno != EOPNOTSUPP)
01000   {
01001     gearman_universal_set_error(connection->universal, "_con_setsockopt",
01002                                 "setsockopt:TCP_NODELAY:%d", errno);
01003     return GEARMAN_ERRNO;
01004   }
01005 
01006   linger.l_onoff= 1;
01007   linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
01008   ret= setsockopt(connection->fd, SOL_SOCKET, SO_LINGER, &linger,
01009                   (socklen_t)sizeof(struct linger));
01010   if (ret == -1)
01011   {
01012     gearman_universal_set_error(connection->universal, "_con_setsockopt",
01013                                 "setsockopt:SO_LINGER:%d", errno);
01014     return GEARMAN_ERRNO;
01015   }
01016 
01017   waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
01018   waittime.tv_usec= 0;
01019   ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDTIMEO, &waittime,
01020                   (socklen_t)sizeof(struct timeval));
01021   if (ret == -1 && errno != ENOPROTOOPT)
01022   {
01023     gearman_universal_set_error(connection->universal, "_con_setsockopt",
01024                                 "setsockopt:SO_SNDTIMEO:%d", errno);
01025     return GEARMAN_ERRNO;
01026   }
01027 
01028   ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVTIMEO, &waittime,
01029                   (socklen_t)sizeof(struct timeval));
01030   if (ret == -1 && errno != ENOPROTOOPT)
01031   {
01032     gearman_universal_set_error(connection->universal, "_con_setsockopt",
01033                                 "setsockopt:SO_RCVTIMEO:%d", errno);
01034     return GEARMAN_ERRNO;
01035   }
01036 
01037   ret= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
01038   ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDBUF, &ret, (socklen_t)sizeof(int));
01039   if (ret == -1)
01040   {
01041     gearman_universal_set_error(connection->universal, "_con_setsockopt",
01042                                 "setsockopt:SO_SNDBUF:%d", errno);
01043     return GEARMAN_ERRNO;
01044   }
01045 
01046   ret= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
01047   ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVBUF, &ret, (socklen_t)sizeof(int));
01048   if (ret == -1)
01049   {
01050     gearman_universal_set_error(connection->universal, "_con_setsockopt",
01051                                 "setsockopt:SO_RCVBUF:%d", errno);
01052     return GEARMAN_ERRNO;
01053   }
01054 
01055   ret= fcntl(connection->fd, F_GETFL, 0);
01056   if (ret == -1)
01057   {
01058     gearman_universal_set_error(connection->universal, "_con_setsockopt", "fcntl:F_GETFL:%d",
01059                                 errno);
01060     return GEARMAN_ERRNO;
01061   }
01062 
01063   ret= fcntl(connection->fd, F_SETFL, ret | O_NONBLOCK);
01064   if (ret == -1)
01065   {
01066     gearman_universal_set_error(connection->universal, "_con_setsockopt", "fcntl:F_SETFL:%d",
01067                                 errno);
01068     return GEARMAN_ERRNO;
01069   }
01070 
01071   return GEARMAN_SUCCESS;
01072 }