00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00022 gearman_universal_st *gearman_universal_create(gearman_universal_st *universal, const gearman_universal_options_t *options)
00023 {
00024 assert(universal);
00025
00026 {
00027 universal->options.allocated= false;
00028 universal->options.dont_track_packets= false;
00029 universal->options.non_blocking= false;
00030 universal->options.stored_non_blocking= false;
00031 }
00032
00033 if (options)
00034 {
00035 while (*options != GEARMAN_MAX)
00036 {
00040 gearman_universal_add_options(universal, *options);
00041 options++;
00042 }
00043 }
00044
00045 universal->verbose= 0;
00046 universal->con_count= 0;
00047 universal->packet_count= 0;
00048 universal->pfds_size= 0;
00049 universal->sending= 0;
00050 universal->last_errno= 0;
00051 universal->timeout= -1;
00052 universal->con_list= NULL;
00053 universal->packet_list= NULL;
00054 universal->pfds= NULL;
00055 universal->log_fn= NULL;
00056 universal->log_context= NULL;
00057 universal->event_watch_fn= NULL;
00058 universal->event_watch_context= NULL;
00059 universal->workload_malloc_fn= NULL;
00060 universal->workload_malloc_context= NULL;
00061 universal->workload_free_fn= NULL;
00062 universal->workload_free_context= NULL;
00063 universal->last_error[0]= 0;
00064
00065 return universal;
00066 }
00067
00068 gearman_universal_st *gearman_universal_clone(gearman_universal_st *destination, const gearman_universal_st *source)
00069 {
00070 gearman_universal_st *check;
00071 gearman_connection_st *con;
00072
00073 assert(destination);
00074 assert(source);
00075
00076 if (! destination || ! source)
00077 return NULL;
00078
00079 check= gearman_universal_create(destination, NULL);
00080
00081 if (! check)
00082 {
00083 return destination;
00084 }
00085
00086 (void)gearman_universal_set_option(destination, GEARMAN_NON_BLOCKING, source->options.non_blocking);
00087 (void)gearman_universal_set_option(destination, GEARMAN_DONT_TRACK_PACKETS, source->options.dont_track_packets);
00088
00089 destination->timeout= source->timeout;
00090
00091 for (con= source->con_list; con != NULL; con= con->next)
00092 {
00093 if (gearman_connection_clone(destination, NULL, con) == NULL)
00094 {
00095 gearman_universal_free(destination);
00096 return NULL;
00097 }
00098 }
00099
00100
00101
00102
00103 return destination;
00104 }
00105
00106 void gearman_universal_free(gearman_universal_st *universal)
00107 {
00108 gearman_free_all_cons(universal);
00109 gearman_free_all_packets(universal);
00110
00111 if (universal->pfds != NULL)
00112 free(universal->pfds);
00113
00114 if (universal->options.allocated)
00115 {
00116 assert(0);
00117 free(universal);
00118 }
00119 }
00120
00121 gearman_return_t gearman_universal_set_option(gearman_universal_st *universal, gearman_universal_options_t option, bool value)
00122 {
00123 switch (option)
00124 {
00125 case GEARMAN_NON_BLOCKING:
00126 universal->options.non_blocking= value;
00127 break;
00128 case GEARMAN_DONT_TRACK_PACKETS:
00129 universal->options.dont_track_packets= value;
00130 break;
00131 case GEARMAN_MAX:
00132 default:
00133 return GEARMAN_INVALID_COMMAND;
00134 }
00135
00136 return GEARMAN_SUCCESS;
00137 }
00138
00139 int gearman_universal_timeout(gearman_universal_st *universal)
00140 {
00141 return universal->timeout;
00142 }
00143
00144 void gearman_universal_set_timeout(gearman_universal_st *universal, int timeout)
00145 {
00146 universal->timeout= timeout;
00147 }
00148
00149 void gearman_set_log_fn(gearman_universal_st *universal, gearman_log_fn *function,
00150 void *context, gearman_verbose_t verbose)
00151 {
00152 universal->log_fn= function;
00153 universal->log_context= context;
00154 universal->verbose= verbose;
00155 }
00156
00157 void gearman_set_event_watch_fn(gearman_universal_st *universal,
00158 gearman_event_watch_fn *function,
00159 void *context)
00160 {
00161 universal->event_watch_fn= function;
00162 universal->event_watch_context= context;
00163 }
00164
00165 void gearman_set_workload_malloc_fn(gearman_universal_st *universal,
00166 gearman_malloc_fn *function,
00167 void *context)
00168 {
00169 universal->workload_malloc_fn= function;
00170 universal->workload_malloc_context= context;
00171 }
00172
00173 void gearman_set_workload_free_fn(gearman_universal_st *universal,
00174 gearman_free_fn *function,
00175 void *context)
00176 {
00177 universal->workload_free_fn= function;
00178 universal->workload_free_context= context;
00179 }
00180
00181 void gearman_free_all_cons(gearman_universal_st *universal)
00182 {
00183 while (universal->con_list != NULL)
00184 gearman_connection_free(universal->con_list);
00185 }
00186
00187 gearman_return_t gearman_flush_all(gearman_universal_st *universal)
00188 {
00189 gearman_connection_st *con;
00190 gearman_return_t ret;
00191
00192 for (con= universal->con_list; con != NULL; con= con->next)
00193 {
00194 if (con->events & POLLOUT)
00195 continue;
00196
00197 ret= gearman_connection_flush(con);
00198 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00199 return ret;
00200 }
00201
00202 return GEARMAN_SUCCESS;
00203 }
00204
00205 gearman_return_t gearman_wait(gearman_universal_st *universal)
00206 {
00207 gearman_connection_st *con;
00208 struct pollfd *pfds;
00209 nfds_t x;
00210 int ret;
00211 gearman_return_t gret;
00212
00213 if (universal->pfds_size < universal->con_count)
00214 {
00215 pfds= realloc(universal->pfds, universal->con_count * sizeof(struct pollfd));
00216 if (pfds == NULL)
00217 {
00218 gearman_universal_set_error(universal, "gearman_wait", "realloc");
00219 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00220 }
00221
00222 universal->pfds= pfds;
00223 universal->pfds_size= universal->con_count;
00224 }
00225 else
00226 pfds= universal->pfds;
00227
00228 x= 0;
00229 for (con= universal->con_list; con != NULL; con= con->next)
00230 {
00231 if (con->events == 0)
00232 continue;
00233
00234 pfds[x].fd= con->fd;
00235 pfds[x].events= con->events;
00236 pfds[x].revents= 0;
00237 x++;
00238 }
00239
00240 if (x == 0)
00241 {
00242 gearman_universal_set_error(universal, "gearman_wait", "no active file descriptors");
00243 return GEARMAN_NO_ACTIVE_FDS;
00244 }
00245
00246 while (1)
00247 {
00248 ret= poll(pfds, x, universal->timeout);
00249 if (ret == -1)
00250 {
00251 if (errno == EINTR)
00252 continue;
00253
00254 gearman_universal_set_error(universal, "gearman_wait", "poll:%d", errno);
00255 universal->last_errno= errno;
00256 return GEARMAN_ERRNO;
00257 }
00258
00259 break;
00260 }
00261
00262 if (ret == 0)
00263 {
00264 gearman_universal_set_error(universal, "gearman_wait", "timeout reached");
00265 return GEARMAN_TIMEOUT;
00266 }
00267
00268 x= 0;
00269 for (con= universal->con_list; con != NULL; con= con->next)
00270 {
00271 if (con->events == 0)
00272 continue;
00273
00274 gret= gearman_connection_set_revents(con, pfds[x].revents);
00275 if (gret != GEARMAN_SUCCESS)
00276 return gret;
00277
00278 x++;
00279 }
00280
00281 return GEARMAN_SUCCESS;
00282 }
00283
00284 gearman_connection_st *gearman_ready(gearman_universal_st *universal)
00285 {
00286 gearman_connection_st *con;
00287
00288
00289
00290
00291 for (con= universal->con_list; con != NULL; con= con->next)
00292 {
00293 if (con->options.ready)
00294 {
00295 con->options.ready= false;
00296 return con;
00297 }
00298 }
00299
00300 return NULL;
00301 }
00302
00307 static inline void _push_blocking(gearman_universal_st *universal, bool *orig_block_universal)
00308 {
00309 *orig_block_universal= universal->options.non_blocking;
00310 universal->options.non_blocking= false;
00311 }
00312
00313 static inline void _pop_non_blocking(gearman_universal_st *universal, bool orig_block_universal)
00314 {
00315 universal->options.non_blocking= orig_block_universal;
00316 }
00317
00318 gearman_return_t gearman_echo(gearman_universal_st *universal,
00319 const void *workload,
00320 size_t workload_size)
00321 {
00322 gearman_connection_st *con;
00323 gearman_packet_st packet;
00324 gearman_return_t ret;
00325 bool orig_block_universal;
00326
00327 ret= gearman_packet_create_args(universal, &packet, GEARMAN_MAGIC_REQUEST,
00328 GEARMAN_COMMAND_ECHO_REQ,
00329 &workload, &workload_size, 1);
00330 if (ret != GEARMAN_SUCCESS)
00331 {
00332 return ret;
00333 }
00334
00335 _push_blocking(universal, &orig_block_universal);
00336
00337 for (con= universal->con_list; con != NULL; con= con->next)
00338 {
00339 gearman_packet_st *packet_ptr;
00340
00341 ret= gearman_connection_send(con, &packet, true);
00342 if (ret != GEARMAN_SUCCESS)
00343 {
00344 goto exit;
00345 }
00346
00347 packet_ptr= gearman_connection_recv(con, &(con->packet), &ret, true);
00348 if (ret != GEARMAN_SUCCESS)
00349 {
00350 goto exit;
00351 }
00352
00353 assert(packet_ptr);
00354
00355 if (con->packet.data_size != workload_size ||
00356 memcmp(workload, con->packet.data, workload_size))
00357 {
00358 gearman_packet_free(&(con->packet));
00359 gearman_universal_set_error(universal, "gearman_echo", "corruption during echo");
00360
00361 ret= GEARMAN_ECHO_DATA_CORRUPTION;
00362 goto exit;
00363 }
00364
00365 gearman_packet_free(&(con->packet));
00366 }
00367
00368 ret= GEARMAN_SUCCESS;
00369
00370 exit:
00371 gearman_packet_free(&packet);
00372 _pop_non_blocking(universal, orig_block_universal);
00373
00374 return ret;
00375 }
00376
00377 void gearman_free_all_packets(gearman_universal_st *universal)
00378 {
00379 while (universal->packet_list != NULL)
00380 gearman_packet_free(universal->packet_list);
00381 }
00382
00383
00384
00385
00386
00387 void gearman_universal_set_error(gearman_universal_st *universal, const char *function,
00388 const char *format, ...)
00389 {
00390 size_t size;
00391 char *ptr;
00392 char log_buffer[GEARMAN_MAX_ERROR_SIZE];
00393 va_list args;
00394
00395 size= strlen(function);
00396 ptr= memcpy(log_buffer, function, size);
00397 ptr+= size;
00398 ptr[0]= ':';
00399 size++;
00400 ptr++;
00401
00402 va_start(args, format);
00403 size+= (size_t)vsnprintf(ptr, GEARMAN_MAX_ERROR_SIZE - size, format, args);
00404 va_end(args);
00405
00406 if (universal->log_fn == NULL)
00407 {
00408 if (size >= GEARMAN_MAX_ERROR_SIZE)
00409 size= GEARMAN_MAX_ERROR_SIZE - 1;
00410
00411 memcpy(universal->last_error, log_buffer, size + 1);
00412 }
00413 else
00414 {
00415 universal->log_fn(log_buffer, GEARMAN_VERBOSE_FATAL,
00416 (void *)universal->log_context);
00417 }
00418 }