00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker);
00030
00034 static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
00035
00039 static gearman_return_t _worker_add_server(const char *host, in_port_t port,
00040 void *context);
00041
00045 static gearman_return_t _worker_function_add(gearman_worker_st *worker,
00046 const char *function_name,
00047 uint32_t timeout,
00048 gearman_worker_fn *worker_fn,
00049 const void *context);
00050
00054 static void _worker_function_free(gearman_worker_st *worker,
00055 gearman_worker_function_st *function);
00056
00060 static gearman_job_st *_job_create(gearman_worker_st *worker,
00061 gearman_job_st *job);
00062
00065
00066
00067
00068
00069 gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
00070 {
00071 worker= _worker_allocate(worker);
00072 if (worker == NULL)
00073 return NULL;
00074
00075 worker->gearman= gearman_create(&(worker->gearman_static));
00076 if (worker->gearman == NULL)
00077 {
00078 gearman_worker_free(worker);
00079 return NULL;
00080 }
00081
00082 gearman_set_timeout(worker->gearman, GEARMAN_WORKER_WAIT_TIMEOUT);
00083
00084 if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
00085 {
00086 gearman_worker_free(worker);
00087 return NULL;
00088 }
00089
00090 return worker;
00091 }
00092
00093 gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
00094 const gearman_worker_st *from)
00095 {
00096 if (from == NULL)
00097 return NULL;
00098
00099 worker= _worker_allocate(worker);
00100 if (worker == NULL)
00101 return NULL;
00102
00103 worker->options|= (from->options &
00104 (gearman_worker_options_t)~GEARMAN_WORKER_ALLOCATED);
00105
00106 worker->gearman= gearman_clone(&(worker->gearman_static), from->gearman);
00107 if (worker->gearman == NULL)
00108 {
00109 gearman_worker_free(worker);
00110 return NULL;
00111 }
00112
00113 if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
00114 {
00115 gearman_worker_free(worker);
00116 return NULL;
00117 }
00118
00119 return worker;
00120 }
00121
00122 void gearman_worker_free(gearman_worker_st *worker)
00123 {
00124 if (worker->options & GEARMAN_WORKER_PACKET_INIT)
00125 {
00126 gearman_packet_free(&(worker->grab_job));
00127 gearman_packet_free(&(worker->pre_sleep));
00128 }
00129
00130 if (worker->job != NULL)
00131 gearman_job_free(worker->job);
00132
00133 if (worker->options & GEARMAN_WORKER_WORK_JOB_IN_USE)
00134 gearman_job_free(&(worker->work_job));
00135
00136 if (worker->work_result != NULL)
00137 {
00138 if (worker->gearman->workload_free_fn == NULL)
00139 free(worker->work_result);
00140 else
00141 {
00142 worker->gearman->workload_free_fn(worker->work_result,
00143 (void *)(worker->gearman->workload_free_context));
00144 }
00145 }
00146
00147 while (worker->function_list != NULL)
00148 _worker_function_free(worker, worker->function_list);
00149
00150 gearman_job_free_all(worker);
00151
00152 if (worker->gearman != NULL)
00153 gearman_free(worker->gearman);
00154
00155 if (worker->options & GEARMAN_WORKER_ALLOCATED)
00156 free(worker);
00157 }
00158
00159 const char *gearman_worker_error(gearman_worker_st *worker)
00160 {
00161 return gearman_error(worker->gearman);
00162 }
00163
00164 int gearman_worker_errno(gearman_worker_st *worker)
00165 {
00166 return gearman_errno(worker->gearman);
00167 }
00168
00169 gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
00170 {
00171 return worker->options;
00172 }
00173
00174 void gearman_worker_set_options(gearman_worker_st *worker,
00175 gearman_worker_options_t options)
00176 {
00177
00178 gearman_worker_add_options(worker, options);
00179
00180 worker->options= options;
00181 }
00182
00183 void gearman_worker_add_options(gearman_worker_st *worker,
00184 gearman_worker_options_t options)
00185 {
00186 if (options & GEARMAN_WORKER_NON_BLOCKING)
00187 gearman_add_options(worker->gearman, GEARMAN_NON_BLOCKING);
00188
00189 if (options & GEARMAN_WORKER_GRAB_UNIQ)
00190 {
00191 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
00192 (void)gearman_packet_pack_header(&(worker->grab_job));
00193 }
00194
00195 worker->options|= options;
00196 }
00197
00198 void gearman_worker_remove_options(gearman_worker_st *worker,
00199 gearman_worker_options_t options)
00200 {
00201 if (options & GEARMAN_WORKER_NON_BLOCKING)
00202 gearman_remove_options(worker->gearman, GEARMAN_NON_BLOCKING);
00203
00204 if (options & GEARMAN_WORKER_GRAB_UNIQ)
00205 {
00206 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
00207 (void)gearman_packet_pack_header(&(worker->grab_job));
00208 }
00209
00210 worker->options&= ~options;
00211 }
00212
00213 int gearman_worker_timeout(gearman_worker_st *worker)
00214 {
00215 return gearman_timeout(worker->gearman);
00216 }
00217
00218 void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
00219 {
00220 gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
00221 gearman_set_timeout(worker->gearman, timeout);
00222 }
00223
00224 void *gearman_worker_context(const gearman_worker_st *worker)
00225 {
00226 return (void *)(worker->context);
00227 }
00228
00229 void gearman_worker_set_context(gearman_worker_st *worker, const void *context)
00230 {
00231 worker->context= context;
00232 }
00233
00234 void gearman_worker_set_log_fn(gearman_worker_st *worker,
00235 gearman_log_fn *function, const void *context,
00236 gearman_verbose_t verbose)
00237 {
00238 gearman_set_log_fn(worker->gearman, function, context, verbose);
00239 }
00240
00241 void gearman_worker_set_event_watch_fn(gearman_worker_st *worker,
00242 gearman_event_watch_fn *function,
00243 const void *context)
00244 {
00245 gearman_set_event_watch_fn(worker->gearman, function, context);
00246 }
00247
00248 void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
00249 gearman_malloc_fn *function,
00250 const void *context)
00251 {
00252 gearman_set_workload_malloc_fn(worker->gearman, function, context);
00253 }
00254
00255 void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
00256 gearman_free_fn *function,
00257 const void *context)
00258 {
00259 gearman_set_workload_free_fn(worker->gearman, function, context);
00260 }
00261
00262 gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
00263 const char *host, in_port_t port)
00264 {
00265 if (gearman_con_add(worker->gearman, NULL, host, port) == NULL)
00266 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00267
00268 return GEARMAN_SUCCESS;
00269 }
00270
00271 gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
00272 const char *servers)
00273 {
00274 return gearman_parse_servers(servers, _worker_add_server, worker);
00275 }
00276
00277 gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
00278 {
00279 return gearman_con_wait(worker->gearman);
00280 }
00281
00282 gearman_return_t gearman_worker_register(gearman_worker_st *worker,
00283 const char *function_name,
00284 uint32_t timeout)
00285 {
00286 return _worker_function_add(worker, function_name, timeout, NULL, NULL);
00287 }
00288
00289 gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
00290 const char *function_name)
00291 {
00292 gearman_worker_function_st *function;
00293 gearman_return_t ret;
00294
00295 for (function= worker->function_list; function != NULL;
00296 function= function->next)
00297 {
00298 if (!strcmp(function_name, function->function_name))
00299 break;
00300 }
00301
00302 if (function == NULL)
00303 return GEARMAN_SUCCESS;
00304
00305 gearman_packet_free(&(function->packet));
00306
00307 ret= gearman_packet_add(worker->gearman, &(function->packet),
00308 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
00309 (uint8_t *)function_name, strlen(function_name),
00310 NULL);
00311 if (ret != GEARMAN_SUCCESS)
00312 {
00313 function->options&=
00314 (gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_PACKET_IN_USE;
00315 return ret;
00316 }
00317
00318 function->options|= (GEARMAN_WORKER_FUNCTION_CHANGE |
00319 GEARMAN_WORKER_FUNCTION_REMOVE);
00320
00321 worker->options|= GEARMAN_WORKER_CHANGE;
00322
00323 return GEARMAN_SUCCESS;
00324 }
00325
00326 gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
00327 {
00328 gearman_return_t ret;
00329
00330 if (worker->function_list == NULL)
00331 return GEARMAN_SUCCESS;
00332
00333 while (worker->function_list->next != NULL)
00334 _worker_function_free(worker, worker->function_list->next);
00335
00336 gearman_packet_free(&(worker->function_list->packet));
00337
00338 ret= gearman_packet_add(worker->gearman, &(worker->function_list->packet),
00339 GEARMAN_MAGIC_REQUEST,
00340 GEARMAN_COMMAND_RESET_ABILITIES, NULL);
00341 if (ret != GEARMAN_SUCCESS)
00342 {
00343 worker->function_list->options&=
00344 (gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_PACKET_IN_USE;
00345 return ret;
00346 }
00347
00348 worker->function_list->options|= (GEARMAN_WORKER_FUNCTION_CHANGE |
00349 GEARMAN_WORKER_FUNCTION_REMOVE);
00350
00351 worker->options|= GEARMAN_WORKER_CHANGE;
00352
00353 return GEARMAN_SUCCESS;
00354 }
00355
/**
 * Ask the job servers for a job.
 *
 * The function is a resumable state machine keyed on worker->state. Several
 * `case` labels sit INSIDE loops: when a send/receive reports
 * GEARMAN_IO_WAIT the current state is stored and the function returns; the
 * next call jumps straight back to that send/receive, continuing with the
 * loop cursors saved in worker->con and worker->function.
 *
 * Sequence: send pending function changes, send the function packets to
 * connections whose fd is -1, send the grab_job packet and read the reply on
 * each connection, and when no connection yields a job send pre_sleep and
 * wait before starting over.
 *
 * @param worker  Worker to grab a job for.
 * @param job     Job structure handed to _job_create(); NULL makes
 *                _job_create() allocate one.
 * @param ret_ptr Receives the status code.
 * @return The assigned job, or NULL with the reason in *ret_ptr.
 */
00356 gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
00357 gearman_job_st *job,
00358 gearman_return_t *ret_ptr)
00359 {
00360 gearman_worker_function_st *function;
00361 uint32_t active;
00362
00363 while (1)
00364 {
00365 switch (worker->state)
00366 {
00367 case GEARMAN_WORKER_STATE_START:
00368
/* Send every function packet flagged GEARMAN_WORKER_FUNCTION_CHANGE to each
   connection whose fd is not -1. */
00369 if (worker->options & GEARMAN_WORKER_CHANGE)
00370 {
00371 worker->function= worker->function_list;
00372 while (worker->function != NULL)
00373 {
/* Entries without a pending change are skipped. */
00374 if (!(worker->function->options & GEARMAN_WORKER_FUNCTION_CHANGE))
00375 {
00376 worker->function= worker->function->next;
00377 continue;
00378 }
00379
00380 for (worker->con= worker->gearman->con_list; worker->con != NULL;
00381 worker->con= worker->con->next)
00382 {
00383 if (worker->con->fd == -1)
00384 continue;
00385
/* Resume point after GEARMAN_IO_WAIT on a function packet send. */
00386 case GEARMAN_WORKER_STATE_FUNCTION_SEND:
00387 *ret_ptr= gearman_con_send(worker->con, &(worker->function->packet),
00388 true);
00389 if (*ret_ptr != GEARMAN_SUCCESS)
00390 {
/* IO_WAIT: remember where to resume. LOST_CONNECTION: move on to the next
   connection. Every other error (and IO_WAIT) returns to the caller. */
00391 if (*ret_ptr == GEARMAN_IO_WAIT)
00392 worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
00393 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00394 continue;
00395
00396 return NULL;
00397 }
00398 }
00399
/* An entry flagged for removal is freed now that its packet was offered to
   all connections; iteration continues from the previous entry, or from the
   list head when there is none. */
00400 if (worker->function->options & GEARMAN_WORKER_FUNCTION_REMOVE)
00401 {
00402 function= worker->function->prev;
00403 _worker_function_free(worker, worker->function);
00404 if (function == NULL)
00405 worker->function= worker->function_list;
00406 else
00407 worker->function= function;
00408 }
00409 else
00410 {
00411 worker->function->options&=
00412 (gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_CHANGE;
00413 worker->function= worker->function->next;
00414 }
00415 }
00416
00417 worker->options&= (gearman_worker_options_t)~GEARMAN_WORKER_CHANGE;
00418 }
00419
/* Nothing to grab jobs for when no function is registered. */
00420 if (worker->function_list == NULL)
00421 {
00422 gearman_error_set(worker->gearman, "gearman_worker_grab_job",
00423 "no functions have been registered");
00424 *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
00425 return NULL;
00426 }
00427
/* Try each connection in turn for a job. */
00428 for (worker->con= worker->gearman->con_list; worker->con != NULL;
00429 worker->con= worker->con->next)
00430 {
00431
/* A connection with fd == -1 first gets every function packet sent to it.
   NOTE(review): presumably gearman_con_send() opens the connection here -
   confirm against the connection code. */
00432 if (worker->con->fd == -1)
00433 {
00434 for (worker->function= worker->function_list;
00435 worker->function != NULL;
00436 worker->function= worker->function->next)
00437 {
/* Resume point after GEARMAN_IO_WAIT while sending to such a connection. */
00438 case GEARMAN_WORKER_STATE_CONNECT:
00439 *ret_ptr= gearman_con_send(worker->con, &(worker->function->packet),
00440 true);
00441 if (*ret_ptr != GEARMAN_SUCCESS)
00442 {
00443 if (*ret_ptr == GEARMAN_IO_WAIT)
00444 worker->state= GEARMAN_WORKER_STATE_CONNECT;
00445 else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT ||
00446 *ret_ptr == GEARMAN_LOST_CONNECTION)
00447 {
/* Stop sending functions to this connection. */
00448 break;
00449 }
00450
00451 return NULL;
00452 }
00453 }
00454
00455 if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
00456 continue;
00457 }
00458
/* Resume point after GEARMAN_IO_WAIT on the grab_job send; also the state
   the worker is left in after a job has been handed out. */
00459 case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
00460 if (worker->con->fd == -1)
00461 continue;
00462
00463 *ret_ptr= gearman_con_send(worker->con, &(worker->grab_job), true);
00464 if (*ret_ptr != GEARMAN_SUCCESS)
00465 {
00466 if (*ret_ptr == GEARMAN_IO_WAIT)
00467 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
00468 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00469 continue;
00470
00471 return NULL;
00472 }
00473
/* Create the job structure that will receive the server's reply, unless one
   is still held from an earlier attempt. */
00474 if (worker->job == NULL)
00475 {
00476 worker->job= _job_create(worker, job);
00477 if (worker->job == NULL)
00478 {
00479 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00480 return NULL;
00481 }
00482 }
00483
/* Read packets until the reply is a job assignment or NO_JOB; NOOP packets
   are discarded and the read is repeated. */
00484 while (1)
00485 {
/* Resume point after GEARMAN_IO_WAIT on the reply. */
00486 case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
00487 (void)gearman_con_recv(worker->con, &(worker->job->assigned), ret_ptr,
00488 true);
00489 if (*ret_ptr != GEARMAN_SUCCESS)
00490 {
00491 if (*ret_ptr == GEARMAN_IO_WAIT)
00492 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
00493 else
00494 {
/* Any real error discards the job; a lost connection moves on to the next
   connection instead of returning. */
00495 gearman_job_free(worker->job);
00496 worker->job= NULL;
00497
00498 if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00499 break;
00500 }
00501
00502 return NULL;
00503 }
00504
/* A job was assigned: hand ownership of the job to the caller. The state is
   set so the next call continues with a grab_job send on this connection. */
00505 if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN ||
00506 worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
00507 {
00508 worker->job->options|= GEARMAN_JOB_ASSIGNED_IN_USE;
00509 worker->job->con= worker->con;
00510 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
00511 job= worker->job;
00512 worker->job= NULL;
00513 return job;
00514 }
00515
/* This server has no job: try the next connection. */
00516 if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
00517 {
00518 gearman_packet_free(&(worker->job->assigned));
00519 break;
00520 }
00521
/* Anything other than NOOP at this point is a protocol error. */
00522 if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
00523 {
00524 gearman_error_set(worker->gearman, "gearman_worker_grab_job",
00525 "unexpected packet:%s",
00526 gearman_command_info_list[worker->job->assigned.command].name);
00527 gearman_packet_free(&(worker->job->assigned));
00528 gearman_job_free(worker->job);
00529 worker->job= NULL;
00530 *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
00531 return NULL;
00532 }
00533
00534 gearman_packet_free(&(worker->job->assigned));
00535 }
00536 }
00537
/* No connection produced a job: send the pre_sleep packet to every
   connection whose fd is not -1. After GEARMAN_IO_WAIT this loop restarts
   from the first connection, since the case label precedes the loop. */
00538 case GEARMAN_WORKER_STATE_PRE_SLEEP:
00539 for (worker->con= worker->gearman->con_list; worker->con != NULL;
00540 worker->con= worker->con->next)
00541 {
00542 if (worker->con->fd == -1)
00543 continue;
00544
00545 *ret_ptr= gearman_con_send(worker->con, &(worker->pre_sleep), true);
00546 if (*ret_ptr != GEARMAN_SUCCESS)
00547 {
00548 if (*ret_ptr == GEARMAN_IO_WAIT)
00549 worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
00550 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00551 continue;
00552
00553 return NULL;
00554 }
00555 }
00556
00557 worker->state= GEARMAN_WORKER_STATE_START;
00558
00559
/* Request POLLIN on every connection whose fd is not -1 and count them. */
00560 active= 0;
00561 for (worker->con= worker->gearman->con_list; worker->con != NULL;
00562 worker->con= worker->con->next)
00563 {
00564 if (worker->con->fd == -1)
00565 continue;
00566
00567 *ret_ptr= gearman_con_set_events(worker->con, POLLIN);
00568 if (*ret_ptr != GEARMAN_SUCCESS)
00569 return NULL;
00570
00571 active++;
00572 }
00573
/* In non-blocking mode the caller is responsible for waiting. */
00574 if (worker->gearman->options & GEARMAN_NON_BLOCKING)
00575 {
00576 *ret_ptr= GEARMAN_NO_JOBS;
00577 return NULL;
00578 }
00579
/* With no connection to wait on, sleep instead: for
   GEARMAN_WORKER_WAIT_TIMEOUT * 1000 microseconds when the timeout is
   negative, otherwise for timeout * 1000 microseconds (no sleep when 0). */
00580 if (active == 0)
00581 {
00582 if (worker->gearman->timeout < 0)
00583 usleep(GEARMAN_WORKER_WAIT_TIMEOUT * 1000);
00584 else
00585 {
00586 if (worker->gearman->timeout > 0)
00587 usleep((unsigned int)worker->gearman->timeout * 1000);
00588
00589 if (worker->options & GEARMAN_WORKER_TIMEOUT_RETURN)
00590 {
00591 gearman_error_set(worker->gearman, "gearman_worker_grab_job",
00592 "timeout reached");
00593 *ret_ptr= GEARMAN_TIMEOUT;
00594 return NULL;
00595 }
00596 }
00597 }
00598 else
00599 {
/* Wait on the connections. A GEARMAN_TIMEOUT result is only returned to the
   caller when GEARMAN_WORKER_TIMEOUT_RETURN is set; otherwise the loop
   starts over. */
00600 *ret_ptr= gearman_con_wait(worker->gearman);
00601 if (*ret_ptr != GEARMAN_SUCCESS && (*ret_ptr != GEARMAN_TIMEOUT ||
00602 worker->options & GEARMAN_WORKER_TIMEOUT_RETURN))
00603 {
00604 return NULL;
00605 }
00606 }
00607
/* Leave the switch; the enclosing while (1) re-enters it in state START. */
00608 break;
00609
00610 default:
00611 gearman_error_set(worker->gearman, "gearman_worker_grab_job",
00612 "unknown state: %u", worker->state);
00613 *ret_ptr= GEARMAN_UNKNOWN_STATE;
00614 return NULL;
00615 }
00616 }
00617 }
00618
00619 void gearman_job_free(gearman_job_st *job)
00620 {
00621 if (job->options & GEARMAN_JOB_ASSIGNED_IN_USE)
00622 gearman_packet_free(&(job->assigned));
00623
00624 if (job->options & GEARMAN_JOB_WORK_IN_USE)
00625 gearman_packet_free(&(job->work));
00626
00627 if (job->worker->job_list == job)
00628 job->worker->job_list= job->next;
00629 if (job->prev != NULL)
00630 job->prev->next= job->next;
00631 if (job->next != NULL)
00632 job->next->prev= job->prev;
00633 job->worker->job_count--;
00634
00635 if (job->options & GEARMAN_JOB_ALLOCATED)
00636 free(job);
00637 }
00638
00639 void gearman_job_free_all(gearman_worker_st *worker)
00640 {
00641 while (worker->job_list != NULL)
00642 gearman_job_free(worker->job_list);
00643 }
00644
00645 gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
00646 const char *function_name,
00647 uint32_t timeout,
00648 gearman_worker_fn *worker_fn,
00649 const void *context)
00650 {
00651 if (function_name == NULL)
00652 {
00653 gearman_error_set(worker->gearman, "gearman_worker_add_function",
00654 "function name not given");
00655 return GEARMAN_INVALID_FUNCTION_NAME;
00656 }
00657
00658 if (worker_fn == NULL)
00659 {
00660 gearman_error_set(worker->gearman, "gearman_worker_add_function",
00661 "function not given");
00662 return GEARMAN_INVALID_WORKER_FUNCTION;
00663 }
00664
00665 return _worker_function_add(worker, function_name, timeout, worker_fn,
00666 context);
00667 }
00668
00669 gearman_return_t gearman_worker_work(gearman_worker_st *worker)
00670 {
00671 gearman_return_t ret;
00672
00673 switch (worker->work_state)
00674 {
00675 case GEARMAN_WORKER_WORK_STATE_GRAB_JOB:
00676 (void)gearman_worker_grab_job(worker, &(worker->work_job), &ret);
00677 if (ret != GEARMAN_SUCCESS)
00678 return ret;
00679
00680 for (worker->work_function= worker->function_list;
00681 worker->work_function != NULL;
00682 worker->work_function= worker->work_function->next)
00683 {
00684 if (!strcmp(gearman_job_function_name(&(worker->work_job)),
00685 worker->work_function->function_name))
00686 {
00687 break;
00688 }
00689 }
00690
00691 if (worker->work_function == NULL)
00692 {
00693 gearman_job_free(&(worker->work_job));
00694 gearman_error_set(worker->gearman, "gearman_worker_work",
00695 "function not found");
00696 return GEARMAN_INVALID_FUNCTION_NAME;
00697 }
00698
00699 if (worker->work_function->worker_fn == NULL)
00700 {
00701 gearman_job_free(&(worker->work_job));
00702 gearman_error_set(worker->gearman, "gearman_worker_work",
00703 "no callback function supplied");
00704 return GEARMAN_INVALID_FUNCTION_NAME;
00705 }
00706
00707 worker->options|= GEARMAN_WORKER_WORK_JOB_IN_USE;
00708 worker->work_result_size= 0;
00709
00710 case GEARMAN_WORKER_WORK_STATE_FUNCTION:
00711 worker->work_result= (*(worker->work_function->worker_fn))(
00712 &(worker->work_job),
00713 (void *)(worker->work_function->context),
00714 &(worker->work_result_size), &ret);
00715 if (ret == GEARMAN_WORK_FAIL)
00716 {
00717 ret= gearman_job_send_fail(&(worker->work_job));
00718 if (ret != GEARMAN_SUCCESS)
00719 {
00720 if (ret == GEARMAN_LOST_CONNECTION)
00721 break;
00722
00723 worker->work_state= GEARMAN_WORKER_WORK_STATE_FAIL;
00724 return ret;
00725 }
00726
00727 break;
00728 }
00729
00730 if (ret != GEARMAN_SUCCESS)
00731 {
00732 if (ret == GEARMAN_LOST_CONNECTION)
00733 break;
00734
00735 worker->work_state= GEARMAN_WORKER_WORK_STATE_FUNCTION;
00736 return ret;
00737 }
00738
00739 case GEARMAN_WORKER_WORK_STATE_COMPLETE:
00740 ret= gearman_job_send_complete(&(worker->work_job), worker->work_result,
00741 worker->work_result_size);
00742 if (ret == GEARMAN_IO_WAIT)
00743 {
00744 worker->work_state= GEARMAN_WORKER_WORK_STATE_COMPLETE;
00745 return ret;
00746 }
00747
00748 if (worker->work_result != NULL)
00749 {
00750 if (worker->gearman->workload_free_fn == NULL)
00751 free(worker->work_result);
00752 else
00753 {
00754 worker->gearman->workload_free_fn(worker->work_result,
00755 (void *)(worker->gearman->workload_free_context));
00756 }
00757 worker->work_result= NULL;
00758 }
00759
00760 if (ret != GEARMAN_SUCCESS)
00761 {
00762 if (ret == GEARMAN_LOST_CONNECTION)
00763 break;
00764
00765 return ret;
00766 }
00767
00768 break;
00769
00770 case GEARMAN_WORKER_WORK_STATE_FAIL:
00771 ret= gearman_job_send_fail(&(worker->work_job));
00772 if (ret != GEARMAN_SUCCESS)
00773 {
00774 if (ret == GEARMAN_LOST_CONNECTION)
00775 break;
00776
00777 return ret;
00778 }
00779
00780 break;
00781
00782 default:
00783 gearman_error_set(worker->gearman, "gearman_worker_work",
00784 "unknown state: %u", worker->work_state);
00785 return GEARMAN_UNKNOWN_STATE;
00786 }
00787
00788 gearman_job_free(&(worker->work_job));
00789 worker->options&= (gearman_worker_options_t)~GEARMAN_WORKER_WORK_JOB_IN_USE;
00790 worker->work_state= GEARMAN_WORKER_WORK_STATE_GRAB_JOB;
00791 return GEARMAN_SUCCESS;
00792 }
00793
00794 gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
00795 const void *workload,
00796 size_t workload_size)
00797 {
00798 return gearman_con_echo(worker->gearman, workload, workload_size);
00799 }
00800
00801
00802
00803
00804
00805 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker)
00806 {
00807 if (worker == NULL)
00808 {
00809 worker= malloc(sizeof(gearman_worker_st));
00810 if (worker == NULL)
00811 return NULL;
00812
00813 worker->options= GEARMAN_WORKER_ALLOCATED;
00814 }
00815 else
00816 worker->options= 0;
00817
00818 worker->state= 0;
00819 worker->work_state= 0;
00820 worker->function_count= 0;
00821 worker->job_count= 0;
00822 worker->work_result_size= 0;
00823 worker->gearman= NULL;
00824 worker->con= NULL;
00825 worker->job= NULL;
00826 worker->job_list= NULL;
00827 worker->function= NULL;
00828 worker->function_list= NULL;
00829 worker->work_function= NULL;
00830 worker->work_result= NULL;
00831
00832 return worker;
00833 }
00834
00835 static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
00836 {
00837 gearman_return_t ret;
00838
00839 ret= gearman_packet_add(worker->gearman, &(worker->grab_job),
00840 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
00841 NULL);
00842 if (ret != GEARMAN_SUCCESS)
00843 return ret;
00844
00845 ret= gearman_packet_add(worker->gearman, &(worker->pre_sleep),
00846 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
00847 NULL);
00848 if (ret != GEARMAN_SUCCESS)
00849 {
00850 gearman_packet_free(&(worker->grab_job));
00851 return ret;
00852 }
00853
00854 worker->options|= GEARMAN_WORKER_PACKET_INIT;
00855
00856 return GEARMAN_SUCCESS;
00857 }
00858
00859 static gearman_return_t _worker_add_server(const char *host, in_port_t port,
00860 void *context)
00861 {
00862 return gearman_worker_add_server((gearman_worker_st *)context, host, port);
00863 }
00864
00865 static gearman_return_t _worker_function_add(gearman_worker_st *worker,
00866 const char *function_name,
00867 uint32_t timeout,
00868 gearman_worker_fn *worker_fn,
00869 const void *context)
00870 {
00871 gearman_worker_function_st *function;
00872 gearman_return_t ret;
00873 char timeout_buffer[11];
00874
00875 function= malloc(sizeof(gearman_worker_function_st));
00876 if (function == NULL)
00877 {
00878 gearman_error_set(worker->gearman, "_worker_function_add", "malloc");
00879 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00880 }
00881
00882 function->options= (GEARMAN_WORKER_FUNCTION_PACKET_IN_USE |
00883 GEARMAN_WORKER_FUNCTION_CHANGE);
00884
00885 function->function_name= strdup(function_name);
00886 if (function->function_name == NULL)
00887 {
00888 free(function);
00889 gearman_error_set(worker->gearman, "gearman_worker_add_function", "strdup");
00890 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00891 }
00892
00893 function->worker_fn= worker_fn;
00894 function->context= context;
00895
00896 if (timeout > 0)
00897 {
00898 snprintf(timeout_buffer, 11, "%u", timeout);
00899 ret= gearman_packet_add(worker->gearman, &(function->packet),
00900 GEARMAN_MAGIC_REQUEST,
00901 GEARMAN_COMMAND_CAN_DO_TIMEOUT,
00902 (uint8_t *)function_name,
00903 strlen(function_name) + 1,
00904 (uint8_t *)timeout_buffer,
00905 strlen(timeout_buffer), NULL);
00906 }
00907 else
00908 {
00909 ret= gearman_packet_add(worker->gearman, &(function->packet),
00910 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
00911 (uint8_t *)function_name, strlen(function_name),
00912 NULL);
00913 }
00914 if (ret != GEARMAN_SUCCESS)
00915 {
00916 free(function->function_name);
00917 free(function);
00918 return ret;
00919 }
00920
00921 if (worker->function_list != NULL)
00922 worker->function_list->prev= function;
00923 function->next= worker->function_list;
00924 function->prev= NULL;
00925 worker->function_list= function;
00926 worker->function_count++;
00927
00928 worker->options|= GEARMAN_WORKER_CHANGE;
00929
00930 return GEARMAN_SUCCESS;
00931 }
00932
00933 static void _worker_function_free(gearman_worker_st *worker,
00934 gearman_worker_function_st *function)
00935 {
00936 if (worker->function_list == function)
00937 worker->function_list= function->next;
00938 if (function->prev != NULL)
00939 function->prev->next= function->next;
00940 if (function->next != NULL)
00941 function->next->prev= function->prev;
00942 worker->function_count--;
00943
00944 if (function->options & GEARMAN_WORKER_FUNCTION_PACKET_IN_USE)
00945 gearman_packet_free(&(function->packet));
00946
00947 free(function->function_name);
00948 free(function);
00949 }
00950
00951 static gearman_job_st *_job_create(gearman_worker_st *worker,
00952 gearman_job_st *job)
00953 {
00954 if (job == NULL)
00955 {
00956 job= malloc(sizeof(gearman_job_st));
00957 if (job == NULL)
00958 {
00959 gearman_error_set(worker->gearman, "_job_create", "malloc");
00960 return NULL;
00961 }
00962
00963 job->options= GEARMAN_JOB_ALLOCATED;
00964 }
00965 else
00966 job->options= 0;
00967
00968 job->worker= worker;
00969
00970 if (worker->job_list != NULL)
00971 worker->job_list->prev= job;
00972 job->next= worker->job_list;
00973 job->prev= NULL;
00974 worker->job_list= job;
00975 worker->job_count++;
00976
00977 job->con= NULL;
00978
00979 return job;
00980 }