00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 static gearman_client_st *_client_allocate(gearman_client_st *client);
00030
00034 static gearman_return_t _client_add_server(const char *host, in_port_t port,
00035 void *context);
00036
00040 static gearman_task_st *_client_add_task(gearman_client_st *client,
00041 gearman_task_st *task,
00042 const void *context,
00043 gearman_command_t command,
00044 const char *function_name,
00045 const char *unique,
00046 const void *workload,
00047 size_t workload_size,
00048 gearman_return_t *ret_ptr);
00049
00053 static gearman_return_t _client_run_task(gearman_client_st *client,
00054 gearman_task_st *task);
00055
00059 static void *_client_do(gearman_client_st *client, gearman_command_t command,
00060 const char *function_name, const char *unique,
00061 const void *workload, size_t workload_size,
00062 size_t *result_size, gearman_return_t *ret_ptr);
00063
00067 static gearman_return_t _client_do_background(gearman_client_st *client,
00068 gearman_command_t command,
00069 const char *function_name,
00070 const char *unique,
00071 const void *workload,
00072 size_t workload_size,
00073 char *job_handle);
00074
00078 static gearman_return_t _client_do_data(gearman_task_st *task);
00079
00083 static gearman_return_t _client_do_status(gearman_task_st *task);
00084
00088 static gearman_return_t _client_do_fail(gearman_task_st *task);
00089
00099 static gearman_task_st *_task_create(gearman_client_st *client,
00100 gearman_task_st *task);
00101
00104
00105
00106
00107
00108 gearman_client_st *gearman_client_create(gearman_client_st *client)
00109 {
00110 client= _client_allocate(client);
00111 if (client == NULL)
00112 return NULL;
00113
00114 client->gearman= gearman_create(&(client->gearman_static));
00115 if (client->gearman == NULL)
00116 {
00117 gearman_client_free(client);
00118 return NULL;
00119 }
00120
00121 return client;
00122 }
00123
00124 gearman_client_st *gearman_client_clone(gearman_client_st *client,
00125 const gearman_client_st *from)
00126 {
00127 if (from == NULL)
00128 return NULL;
00129
00130 client= _client_allocate(client);
00131 if (client == NULL)
00132 return NULL;
00133
00134 client->options|= (from->options &
00135 (gearman_client_options_t)~GEARMAN_CLIENT_ALLOCATED);
00136
00137 client->gearman= gearman_clone(&(client->gearman_static), from->gearman);
00138 if (client->gearman == NULL)
00139 {
00140 gearman_client_free(client);
00141 return NULL;
00142 }
00143
00144 return client;
00145 }
00146
00147 void gearman_client_free(gearman_client_st *client)
00148 {
00149 if (client->options & GEARMAN_CLIENT_TASK_IN_USE)
00150 gearman_task_free(&(client->do_task));
00151
00152 gearman_client_task_free_all(client);
00153
00154 if (client->gearman != NULL)
00155 gearman_free(client->gearman);
00156
00157 if (client->options & GEARMAN_CLIENT_ALLOCATED)
00158 free(client);
00159 }
00160
00161 const char *gearman_client_error(const gearman_client_st *client)
00162 {
00163 return gearman_error(client->gearman);
00164 }
00165
00166 int gearman_client_errno(const gearman_client_st *client)
00167 {
00168 return gearman_errno(client->gearman);
00169 }
00170
00171 gearman_client_options_t gearman_client_options(const gearman_client_st *client)
00172 {
00173 return client->options;
00174 }
00175
00176 void gearman_client_set_options(gearman_client_st *client,
00177 gearman_client_options_t options)
00178 {
00179
00180 gearman_client_add_options(client, options);
00181
00182 client->options= options;
00183 }
00184
00185 void gearman_client_add_options(gearman_client_st *client,
00186 gearman_client_options_t options)
00187 {
00188 if (options & GEARMAN_CLIENT_NON_BLOCKING)
00189 gearman_add_options(client->gearman, GEARMAN_NON_BLOCKING);
00190
00191 client->options|= options;
00192 }
00193
00194 void gearman_client_remove_options(gearman_client_st *client,
00195 gearman_client_options_t options)
00196 {
00197 if (options & GEARMAN_CLIENT_NON_BLOCKING)
00198 gearman_remove_options(client->gearman, GEARMAN_NON_BLOCKING);
00199
00200 client->options&= ~options;
00201 }
00202
00203 int gearman_client_timeout(gearman_client_st *client)
00204 {
00205 return gearman_timeout(client->gearman);
00206 }
00207
00208 void gearman_client_set_timeout(gearman_client_st *client, int timeout)
00209 {
00210 gearman_set_timeout(client->gearman, timeout);
00211 }
00212
00213 void *gearman_client_context(const gearman_client_st *client)
00214 {
00215 return (void *)(client->context);
00216 }
00217
00218 void gearman_client_set_context(gearman_client_st *client, const void *context)
00219 {
00220 client->context= context;
00221 }
00222
00223 void gearman_client_set_log_fn(gearman_client_st *client,
00224 gearman_log_fn *function, const void *context,
00225 gearman_verbose_t verbose)
00226 {
00227 gearman_set_log_fn(client->gearman, function, context, verbose);
00228 }
00229
00230 void gearman_client_set_event_watch_fn(gearman_client_st *client,
00231 gearman_event_watch_fn *function,
00232 const void *context)
00233 {
00234 gearman_set_event_watch_fn(client->gearman, function, context);
00235 }
00236
00237 void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
00238 gearman_malloc_fn *function,
00239 const void *context)
00240 {
00241 gearman_set_workload_malloc_fn(client->gearman, function, context);
00242 }
00243
00244 void gearman_client_set_workload_free_fn(gearman_client_st *client,
00245 gearman_free_fn *function,
00246 const void *context)
00247 {
00248 gearman_set_workload_free_fn(client->gearman, function, context);
00249 }
00250
00251 gearman_return_t gearman_client_add_server(gearman_client_st *client,
00252 const char *host, in_port_t port)
00253 {
00254 if (gearman_con_add(client->gearman, NULL, host, port) == NULL)
00255 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00256
00257 return GEARMAN_SUCCESS;
00258 }
00259
00260 gearman_return_t gearman_client_add_servers(gearman_client_st *client,
00261 const char *servers)
00262 {
00263 return gearman_parse_servers(servers, _client_add_server, client);
00264 }
00265
00266 void gearman_client_remove_servers(gearman_client_st *client)
00267 {
00268 gearman_con_free_all(client->gearman);
00269 }
00270
00271 gearman_return_t gearman_client_wait(gearman_client_st *client)
00272 {
00273 return gearman_con_wait(client->gearman);
00274 }
00275
00276 void *gearman_client_do(gearman_client_st *client, const char *function_name,
00277 const char *unique, const void *workload,
00278 size_t workload_size, size_t *result_size,
00279 gearman_return_t *ret_ptr)
00280 {
00281 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB, function_name, unique,
00282 workload, workload_size, result_size, ret_ptr);
00283 }
00284
00285 void *gearman_client_do_high(gearman_client_st *client,
00286 const char *function_name, const char *unique,
00287 const void *workload, size_t workload_size,
00288 size_t *result_size, gearman_return_t *ret_ptr)
00289 {
00290 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH, function_name,
00291 unique, workload, workload_size, result_size, ret_ptr);
00292 }
00293
00294 void *gearman_client_do_low(gearman_client_st *client,
00295 const char *function_name, const char *unique,
00296 const void *workload, size_t workload_size,
00297 size_t *result_size, gearman_return_t *ret_ptr)
00298 {
00299 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW, function_name,
00300 unique, workload, workload_size, result_size, ret_ptr);
00301 }
00302
00303 const char *gearman_client_do_job_handle(const gearman_client_st *client)
00304 {
00305 return client->do_task.job_handle;
00306 }
00307
00308 void gearman_client_do_status(gearman_client_st *client, uint32_t *numerator,
00309 uint32_t *denominator)
00310 {
00311 if (numerator != NULL)
00312 *numerator= client->do_task.numerator;
00313
00314 if (denominator != NULL)
00315 *denominator= client->do_task.denominator;
00316 }
00317
00318 gearman_return_t gearman_client_do_background(gearman_client_st *client,
00319 const char *function_name,
00320 const char *unique,
00321 const void *workload,
00322 size_t workload_size,
00323 char *job_handle)
00324 {
00325 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
00326 function_name, unique, workload, workload_size,
00327 job_handle);
00328 }
00329
00330 gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
00331 const char *function_name,
00332 const char *unique,
00333 const void *workload,
00334 size_t workload_size,
00335 char *job_handle)
00336 {
00337 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
00338 function_name, unique, workload, workload_size,
00339 job_handle);
00340 }
00341
00342 gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
00343 const char *function_name,
00344 const char *unique,
00345 const void *workload,
00346 size_t workload_size,
00347 char *job_handle)
00348 {
00349 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
00350 function_name, unique, workload, workload_size,
00351 job_handle);
00352 }
00353
00354 gearman_return_t gearman_client_job_status(gearman_client_st *client,
00355 const char *job_handle,
00356 bool *is_known, bool *is_running,
00357 uint32_t *numerator,
00358 uint32_t *denominator)
00359 {
00360 gearman_return_t ret;
00361
00362 if (!(client->options & GEARMAN_CLIENT_TASK_IN_USE))
00363 {
00364 (void)gearman_client_add_task_status(client, &(client->do_task), client,
00365 job_handle, &ret);
00366 if (ret != GEARMAN_SUCCESS)
00367 return ret;
00368
00369 client->options|= GEARMAN_CLIENT_TASK_IN_USE;
00370 }
00371
00372 gearman_client_clear_fn(client);
00373
00374 ret= gearman_client_run_tasks(client);
00375 if (ret != GEARMAN_IO_WAIT)
00376 {
00377 if (is_known != NULL)
00378 *is_known= client->do_task.is_known;
00379 if (is_running != NULL)
00380 *is_running= client->do_task.is_running;
00381 if (numerator != NULL)
00382 *numerator= client->do_task.numerator;
00383 if (denominator != NULL)
00384 *denominator= client->do_task.denominator;
00385
00386 gearman_task_free(&(client->do_task));
00387 client->options&= (gearman_client_options_t)~GEARMAN_CLIENT_TASK_IN_USE;
00388 }
00389
00390 return ret;
00391 }
00392
00393 gearman_return_t gearman_client_echo(gearman_client_st *client,
00394 const void *workload,
00395 size_t workload_size)
00396 {
00397 return gearman_con_echo(client->gearman, workload, workload_size);
00398 }
00399
00400 void gearman_task_free(gearman_task_st *task)
00401 {
00402 if (task->options & GEARMAN_TASK_SEND_IN_USE)
00403 gearman_packet_free(&(task->send));
00404
00405 if (task->context != NULL && task->client->task_context_free_fn != NULL)
00406 (*(task->client->task_context_free_fn))(task, (void *)(task->context));
00407
00408 if (task->client->task_list == task)
00409 task->client->task_list= task->next;
00410 if (task->prev != NULL)
00411 task->prev->next= task->next;
00412 if (task->next != NULL)
00413 task->next->prev= task->prev;
00414 task->client->task_count--;
00415
00416 if (task->options & GEARMAN_TASK_ALLOCATED)
00417 free(task);
00418 }
00419
00420 void gearman_client_task_free_all(gearman_client_st *client)
00421 {
00422 while (client->task_list != NULL)
00423 gearman_task_free(client->task_list);
00424 }
00425
00426 void gearman_client_set_task_context_free_fn(gearman_client_st *client,
00427 gearman_task_context_free_fn *function)
00428 {
00429 client->task_context_free_fn= function;
00430 }
00431
00432 gearman_task_st *gearman_client_add_task(gearman_client_st *client,
00433 gearman_task_st *task,
00434 const void *context,
00435 const char *function_name,
00436 const char *unique,
00437 const void *workload,
00438 size_t workload_size,
00439 gearman_return_t *ret_ptr)
00440 {
00441 return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB,
00442 function_name, unique, workload, workload_size,
00443 ret_ptr);
00444 }
00445
00446 gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
00447 gearman_task_st *task,
00448 const void *context,
00449 const char *function_name,
00450 const char *unique,
00451 const void *workload,
00452 size_t workload_size,
00453 gearman_return_t *ret_ptr)
00454 {
00455 return _client_add_task(client, task, context,
00456 GEARMAN_COMMAND_SUBMIT_JOB_HIGH, function_name,
00457 unique, workload, workload_size, ret_ptr);
00458 }
00459
00460 gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
00461 gearman_task_st *task,
00462 const void *context,
00463 const char *function_name,
00464 const char *unique,
00465 const void *workload,
00466 size_t workload_size,
00467 gearman_return_t *ret_ptr)
00468 {
00469 return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
00470 function_name, unique, workload, workload_size,
00471 ret_ptr);
00472 }
00473
00474 gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
00475 gearman_task_st *task,
00476 const void *context,
00477 const char *function_name,
00478 const char *unique,
00479 const void *workload,
00480 size_t workload_size,
00481 gearman_return_t *ret_ptr)
00482 {
00483 return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
00484 function_name, unique, workload, workload_size,
00485 ret_ptr);
00486 }
00487
00488 gearman_task_st *
00489 gearman_client_add_task_high_background(gearman_client_st *client,
00490 gearman_task_st *task,
00491 const void *context,
00492 const char *function_name,
00493 const char *unique,
00494 const void *workload,
00495 size_t workload_size,
00496 gearman_return_t *ret_ptr)
00497 {
00498 return _client_add_task(client, task, context,
00499 GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG, function_name,
00500 unique, workload, workload_size, ret_ptr);
00501 }
00502
00503 gearman_task_st *
00504 gearman_client_add_task_low_background(gearman_client_st *client,
00505 gearman_task_st *task,
00506 const void *context,
00507 const char *function_name,
00508 const char *unique,
00509 const void *workload,
00510 size_t workload_size,
00511 gearman_return_t *ret_ptr)
00512 {
00513 return _client_add_task(client, task, context,
00514 GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG, function_name,
00515 unique, workload, workload_size, ret_ptr);
00516 }
00517
00518 gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
00519 gearman_task_st *task,
00520 const void *context,
00521 const char *job_handle,
00522 gearman_return_t *ret_ptr)
00523 {
00524 task= _task_create(client, task);
00525 if (task == NULL)
00526 {
00527 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00528 return NULL;
00529 }
00530
00531 task->context= context;
00532 snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
00533
00534 *ret_ptr= gearman_packet_add(client->gearman, &(task->send),
00535 GEARMAN_MAGIC_REQUEST,
00536 GEARMAN_COMMAND_GET_STATUS,
00537 (uint8_t *)job_handle, strlen(job_handle), NULL);
00538 if (*ret_ptr == GEARMAN_SUCCESS)
00539 {
00540 client->new_tasks++;
00541 client->running_tasks++;
00542 task->options|= GEARMAN_TASK_SEND_IN_USE;
00543 }
00544
00545 return task;
00546 }
00547
00548 void gearman_client_set_workload_fn(gearman_client_st *client,
00549 gearman_workload_fn *function)
00550 {
00551 client->workload_fn= function;
00552 }
00553
00554 void gearman_client_set_created_fn(gearman_client_st *client,
00555 gearman_created_fn *function)
00556 {
00557 client->created_fn= function;
00558 }
00559
00560 void gearman_client_set_data_fn(gearman_client_st *client,
00561 gearman_data_fn *function)
00562 {
00563 client->data_fn= function;
00564 }
00565
00566 void gearman_client_set_warning_fn(gearman_client_st *client,
00567 gearman_warning_fn *function)
00568 {
00569 client->warning_fn= function;
00570 }
00571
00572 void gearman_client_set_status_fn(gearman_client_st *client,
00573 gearman_status_fn *function)
00574 {
00575 client->status_fn= function;
00576 }
00577
00578 void gearman_client_set_complete_fn(gearman_client_st *client,
00579 gearman_complete_fn *function)
00580 {
00581 client->complete_fn= function;
00582 }
00583
00584 void gearman_client_set_exception_fn(gearman_client_st *client,
00585 gearman_exception_fn *function)
00586 {
00587 client->exception_fn= function;
00588 }
00589
00590 void gearman_client_set_fail_fn(gearman_client_st *client,
00591 gearman_fail_fn *function)
00592 {
00593 client->fail_fn= function;
00594 }
00595
00596 void gearman_client_clear_fn(gearman_client_st *client)
00597 {
00598 client->workload_fn= NULL;
00599 client->created_fn= NULL;
00600 client->data_fn= NULL;
00601 client->warning_fn= NULL;
00602 client->status_fn= NULL;
00603 client->complete_fn= NULL;
00604 client->exception_fn= NULL;
00605 client->fail_fn= NULL;
00606 }
00607
00608 gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
00609 {
00610 gearman_options_t options;
00611 gearman_return_t ret;
00612
00613 options= client->gearman->options;
00614 client->gearman->options|= GEARMAN_NON_BLOCKING;
00615
00616 switch(client->state)
00617 {
00618 case GEARMAN_CLIENT_STATE_IDLE:
00619 while (1)
00620 {
00621
00622 if (client->new_tasks > 0 && !(client->options & GEARMAN_CLIENT_NO_NEW))
00623 {
00624 for (client->task= client->task_list; client->task != NULL;
00625 client->task= client->task->next)
00626 {
00627 if (client->task->state != GEARMAN_TASK_STATE_NEW)
00628 continue;
00629
00630 case GEARMAN_CLIENT_STATE_NEW:
00631 ret= _client_run_task(client, client->task);
00632 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00633 {
00634 client->state= GEARMAN_CLIENT_STATE_NEW;
00635 client->gearman->options= options;
00636 return ret;
00637 }
00638 }
00639 }
00640
00641
00642 while ((client->con= gearman_con_ready(client->gearman)) != NULL)
00643 {
00644 if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
00645 {
00646
00647 for (client->task= client->task_list; client->task != NULL;
00648 client->task= client->task->next)
00649 {
00650 if (client->task->con != client->con ||
00651 (client->task->state != GEARMAN_TASK_STATE_SUBMIT &&
00652 client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
00653 {
00654 continue;
00655 }
00656
00657 case GEARMAN_CLIENT_STATE_SUBMIT:
00658 ret= _client_run_task(client, client->task);
00659 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00660 {
00661 client->state= GEARMAN_CLIENT_STATE_SUBMIT;
00662 client->gearman->options= options;
00663 return ret;
00664 }
00665 }
00666 }
00667
00668 if (!(client->con->revents & POLLIN))
00669 continue;
00670
00671
00672 while (1)
00673 {
00674
00675 if (client->options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
00676 {
00677
00678 if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
00679 {
00680 for (client->task= client->task_list; client->task != NULL;
00681 client->task= client->task->next)
00682 {
00683 if (client->task->con == client->con &&
00684 (client->task->state == GEARMAN_TASK_STATE_DATA ||
00685 client->task->state == GEARMAN_TASK_STATE_COMPLETE))
00686 {
00687 break;
00688 }
00689 }
00690
00691 assert(client->task != NULL);
00692 }
00693 else
00694 {
00695
00696 client->task= NULL;
00697 (void)gearman_con_recv(client->con, &(client->con->packet), &ret,
00698 false);
00699 }
00700 }
00701 else
00702 {
00703
00704 client->task= NULL;
00705 (void)gearman_con_recv(client->con, &(client->con->packet), &ret,
00706 true);
00707 }
00708
00709 if (client->task == NULL)
00710 {
00711
00712 if (ret != GEARMAN_SUCCESS)
00713 {
00714 if (ret == GEARMAN_IO_WAIT)
00715 break;
00716
00717 client->state= GEARMAN_CLIENT_STATE_IDLE;
00718 client->gearman->options= options;
00719 return ret;
00720 }
00721
00722 client->con->options|= GEARMAN_CON_PACKET_IN_USE;
00723
00724
00725 for (client->task= client->task_list; client->task != NULL;
00726 client->task= client->task->next)
00727 {
00728 if (client->task->con != client->con)
00729 continue;
00730
00731 if (client->con->packet.command == GEARMAN_COMMAND_JOB_CREATED)
00732 {
00733 if (client->task->created_id != client->con->created_id)
00734 continue;
00735
00736
00737 client->con->created_id++;
00738 }
00739 else if (client->con->packet.command == GEARMAN_COMMAND_ERROR)
00740 {
00741 gearman_error_set(client->gearman, "gearman_client_run_tasks",
00742 "%s:%.*s",
00743 (char *)(client->con->packet.arg[0]),
00744 (int)(client->con->packet.arg_size[1]),
00745 (char *)(client->con->packet.arg[1]));
00746 return GEARMAN_SERVER_ERROR;
00747 }
00748 else if (strcmp(client->task->job_handle,
00749 (char *)(client->con->packet.arg[0])))
00750 {
00751 continue;
00752 }
00753
00754
00755
00756 break;
00757 }
00758
00759 assert(client->task != NULL);
00760
00761 client->task->recv= &(client->con->packet);
00762 }
00763
00764 case GEARMAN_CLIENT_STATE_PACKET:
00765
00766 ret= _client_run_task(client, client->task);
00767 if (ret == GEARMAN_IO_WAIT)
00768 break;
00769 if (ret != GEARMAN_SUCCESS)
00770 {
00771 client->state= GEARMAN_CLIENT_STATE_PACKET;
00772 client->gearman->options= options;
00773 return ret;
00774 }
00775
00776
00777 gearman_packet_free(&(client->con->packet));
00778 client->con->options&=
00779 (gearman_client_options_t)~GEARMAN_CON_PACKET_IN_USE;
00780
00781
00782 if (client->running_tasks == 0)
00783 break;
00784 }
00785 }
00786
00787
00788 if (client->running_tasks == 0)
00789 break;
00790
00791 if (client->new_tasks > 0 && !(client->options & GEARMAN_CLIENT_NO_NEW))
00792 continue;
00793
00794 if (options & GEARMAN_NON_BLOCKING)
00795 {
00796
00797 client->state= GEARMAN_CLIENT_STATE_IDLE;
00798 client->gearman->options= options;
00799 return GEARMAN_IO_WAIT;
00800 }
00801
00802
00803 ret= gearman_con_wait(client->gearman);
00804 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00805 {
00806 client->state= GEARMAN_CLIENT_STATE_IDLE;
00807 client->gearman->options= options;
00808 return ret;
00809 }
00810 }
00811
00812 break;
00813
00814 default:
00815 gearman_error_set(client->gearman, "gearman_client_run_tasks",
00816 "unknown state: %u", client->state);
00817 client->gearman->options= options;
00818 return GEARMAN_UNKNOWN_STATE;
00819 }
00820
00821 client->state= GEARMAN_CLIENT_STATE_IDLE;
00822 client->gearman->options= options;
00823 return GEARMAN_SUCCESS;
00824 }
00825
00826
00827
00828
00829
00830 static gearman_client_st *_client_allocate(gearman_client_st *client)
00831 {
00832 if (client == NULL)
00833 {
00834 client= malloc(sizeof(gearman_client_st));
00835 if (client == NULL)
00836 return NULL;
00837
00838 client->options= GEARMAN_CLIENT_ALLOCATED;
00839 }
00840 else
00841 client->options= 0;
00842
00843 client->state= 0;
00844 client->do_ret= 0;
00845 client->new_tasks= 0;
00846 client->running_tasks= 0;
00847 client->task_count= 0;
00848 client->do_data_size= 0;
00849 client->gearman= NULL;
00850 client->context= NULL;
00851 client->con= NULL;
00852 client->task= NULL;
00853 client->task_list= NULL;
00854 client->task_context_free_fn= NULL;
00855 client->do_data= NULL;
00856 client->workload_fn= NULL;
00857 client->created_fn= NULL;
00858 client->data_fn= NULL;
00859 client->warning_fn= NULL;
00860 client->status_fn= NULL;
00861 client->complete_fn= NULL;
00862 client->exception_fn= NULL;
00863 client->fail_fn= NULL;
00864
00865 return client;
00866 }
00867
00868 static gearman_return_t _client_add_server(const char *host, in_port_t port,
00869 void *context)
00870 {
00871 return gearman_client_add_server((gearman_client_st *)context, host, port);
00872 }
00873
00874 static gearman_task_st *_client_add_task(gearman_client_st *client,
00875 gearman_task_st *task,
00876 const void *context,
00877 gearman_command_t command,
00878 const char *function_name,
00879 const char *unique,
00880 const void *workload,
00881 size_t workload_size,
00882 gearman_return_t *ret_ptr)
00883 {
00884 uuid_t uuid;
00885 char uuid_string[37];
00886
00887 task= _task_create(client, task);
00888 if (task == NULL)
00889 {
00890 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00891 return NULL;
00892 }
00893
00894 task->context= context;
00895
00896 if (unique == NULL)
00897 {
00898 uuid_generate(uuid);
00899 uuid_unparse(uuid, uuid_string);
00900 unique= uuid_string;
00901 }
00902
00903 *ret_ptr= gearman_packet_add(client->gearman, &(task->send),
00904 GEARMAN_MAGIC_REQUEST, command,
00905 (uint8_t *)function_name,
00906 (size_t)(strlen(function_name) + 1),
00907 (uint8_t *)unique, (size_t)(strlen(unique) + 1),
00908 workload, workload_size, NULL);
00909 if (*ret_ptr == GEARMAN_SUCCESS)
00910 {
00911 client->new_tasks++;
00912 client->running_tasks++;
00913 task->options|= GEARMAN_TASK_SEND_IN_USE;
00914 }
00915
00916 return task;
00917 }
00918
00919 static gearman_return_t _client_run_task(gearman_client_st *client,
00920 gearman_task_st *task)
00921 {
00922 gearman_return_t ret;
00923 char status_buffer[11];
00924 uint8_t x;
00925
00926 switch(task->state)
00927 {
00928 case GEARMAN_TASK_STATE_NEW:
00929 if (task->client->gearman->con_list == NULL)
00930 {
00931 client->new_tasks--;
00932 client->running_tasks--;
00933 gearman_error_set(client->gearman, "_client_run_task", "no servers added");
00934 return GEARMAN_NO_SERVERS;
00935 }
00936
00937 for (task->con= task->client->gearman->con_list; task->con != NULL;
00938 task->con= task->con->next)
00939 {
00940 if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
00941 break;
00942 }
00943
00944 if (task->con == NULL)
00945 {
00946 client->options|= GEARMAN_CLIENT_NO_NEW;
00947 return GEARMAN_IO_WAIT;
00948 }
00949
00950 client->new_tasks--;
00951
00952 if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
00953 {
00954 task->created_id= task->con->created_id_next;
00955 task->con->created_id_next++;
00956 }
00957
00958 case GEARMAN_TASK_STATE_SUBMIT:
00959 while (1)
00960 {
00961 ret= gearman_con_send(task->con, &(task->send),
00962 client->new_tasks == 0 ? true : false);
00963 if (ret == GEARMAN_SUCCESS)
00964 break;
00965 else if (ret == GEARMAN_IO_WAIT)
00966 {
00967 task->state= GEARMAN_TASK_STATE_SUBMIT;
00968 return GEARMAN_IO_WAIT;
00969 }
00970 else if (ret != GEARMAN_SUCCESS)
00971 {
00972
00973 task->con->created_id++;
00974
00975 if (ret == GEARMAN_COULD_NOT_CONNECT)
00976 {
00977 for (task->con= task->con->next; task->con != NULL;
00978 task->con= task->con->next)
00979 {
00980 if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
00981 break;
00982 }
00983 }
00984
00985 if (task->con == NULL)
00986 {
00987 client->running_tasks--;
00988 return ret;
00989 }
00990
00991 if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
00992 {
00993 task->created_id= task->con->created_id_next;
00994 task->con->created_id_next++;
00995 }
00996 }
00997 }
00998
00999 if (task->send.data_size > 0 && task->send.data == NULL)
01000 {
01001 if (client->workload_fn == NULL)
01002 {
01003 gearman_error_set(client->gearman, "_client_run_task",
01004 "workload size > 0, but no data pointer or workload_fn was given");
01005 return GEARMAN_NEED_WORKLOAD_FN;
01006 }
01007
01008 case GEARMAN_TASK_STATE_WORKLOAD:
01009 ret= (*(client->workload_fn))(task);
01010 if (ret != GEARMAN_SUCCESS)
01011 {
01012 task->state= GEARMAN_TASK_STATE_WORKLOAD;
01013 return ret;
01014 }
01015 }
01016
01017 client->options&= (gearman_client_options_t)~GEARMAN_CLIENT_NO_NEW;
01018 task->state= GEARMAN_TASK_STATE_WORK;
01019 return gearman_con_set_events(task->con, POLLIN);
01020
01021 case GEARMAN_TASK_STATE_WORK:
01022 if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
01023 {
01024 snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
01025 (uint32_t)(task->recv->arg_size[0]),
01026 (char *)(task->recv->arg[0]));
01027
01028 case GEARMAN_TASK_STATE_CREATED:
01029 if (client->created_fn != NULL)
01030 {
01031 ret= (*(client->created_fn))(task);
01032 if (ret != GEARMAN_SUCCESS)
01033 {
01034 task->state= GEARMAN_TASK_STATE_CREATED;
01035 return ret;
01036 }
01037 }
01038
01039 if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
01040 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
01041 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
01042 {
01043 break;
01044 }
01045 }
01046 else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
01047 {
01048 case GEARMAN_TASK_STATE_DATA:
01049 if (client->data_fn != NULL)
01050 {
01051 ret= (*(client->data_fn))(task);
01052 if (ret != GEARMAN_SUCCESS)
01053 {
01054 task->state= GEARMAN_TASK_STATE_DATA;
01055 return ret;
01056 }
01057 }
01058 }
01059 else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
01060 {
01061 case GEARMAN_TASK_STATE_WARNING:
01062 if (client->warning_fn != NULL)
01063 {
01064 ret= (*(client->warning_fn))(task);
01065 if (ret != GEARMAN_SUCCESS)
01066 {
01067 task->state= GEARMAN_TASK_STATE_WARNING;
01068 return ret;
01069 }
01070 }
01071 }
01072 else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
01073 task->recv->command == GEARMAN_COMMAND_STATUS_RES)
01074 {
01075 if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
01076 {
01077 if (atoi((char *)task->recv->arg[1]) == 0)
01078 task->is_known= false;
01079 else
01080 task->is_known= true;
01081
01082 if (atoi((char *)task->recv->arg[2]) == 0)
01083 task->is_running= false;
01084 else
01085 task->is_running= true;
01086
01087 x= 3;
01088 }
01089 else
01090 x= 1;
01091
01092 task->numerator= (uint32_t)atoi((char *)task->recv->arg[x]);
01093 snprintf(status_buffer, 11, "%.*s",
01094 (uint32_t)(task->recv->arg_size[x + 1]),
01095 (char *)(task->recv->arg[x + 1]));
01096 task->denominator= (uint32_t)atoi(status_buffer);
01097
01098 case GEARMAN_TASK_STATE_STATUS:
01099 if (client->status_fn != NULL)
01100 {
01101 ret= (*(client->status_fn))(task);
01102 if (ret != GEARMAN_SUCCESS)
01103 {
01104 task->state= GEARMAN_TASK_STATE_STATUS;
01105 return ret;
01106 }
01107 }
01108
01109 if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
01110 break;
01111 }
01112 else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
01113 {
01114 case GEARMAN_TASK_STATE_COMPLETE:
01115 if (client->complete_fn != NULL)
01116 {
01117 ret= (*(client->complete_fn))(task);
01118 if (ret != GEARMAN_SUCCESS)
01119 {
01120 task->state= GEARMAN_TASK_STATE_COMPLETE;
01121 return ret;
01122 }
01123 }
01124
01125 break;
01126 }
01127 else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
01128 {
01129 case GEARMAN_TASK_STATE_EXCEPTION:
01130 if (client->exception_fn != NULL)
01131 {
01132 ret= (*(client->exception_fn))(task);
01133 if (ret != GEARMAN_SUCCESS)
01134 {
01135 task->state= GEARMAN_TASK_STATE_EXCEPTION;
01136 return ret;
01137 }
01138 }
01139 }
01140 else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
01141 {
01142 case GEARMAN_TASK_STATE_FAIL:
01143 if (client->fail_fn != NULL)
01144 {
01145 ret= (*(client->fail_fn))(task);
01146 if (ret != GEARMAN_SUCCESS)
01147 {
01148 task->state= GEARMAN_TASK_STATE_FAIL;
01149 return ret;
01150 }
01151 }
01152
01153 break;
01154 }
01155
01156 task->state= GEARMAN_TASK_STATE_WORK;
01157 return GEARMAN_SUCCESS;
01158
01159 case GEARMAN_TASK_STATE_FINISHED:
01160 break;
01161
01162 default:
01163 gearman_error_set(client->gearman, "_client_run_task", "unknown state: %u",
01164 task->state);
01165 return GEARMAN_UNKNOWN_STATE;
01166 }
01167
01168 client->running_tasks--;
01169 task->state= GEARMAN_TASK_STATE_FINISHED;
01170
01171 if (client->options & GEARMAN_CLIENT_FREE_TASKS)
01172 gearman_task_free(task);
01173
01174 return GEARMAN_SUCCESS;
01175 }
01176
01177 static void *_client_do(gearman_client_st *client, gearman_command_t command,
01178 const char *function_name, const char *unique,
01179 const void *workload, size_t workload_size,
01180 size_t *result_size, gearman_return_t *ret_ptr)
01181 {
01182 if (!(client->options & GEARMAN_CLIENT_TASK_IN_USE))
01183 {
01184 (void)_client_add_task(client, &(client->do_task), client, command,
01185 function_name, unique, workload, workload_size,
01186 ret_ptr);
01187 if (*ret_ptr != GEARMAN_SUCCESS)
01188 return NULL;
01189
01190 client->options|= GEARMAN_CLIENT_TASK_IN_USE;
01191 }
01192
01193 client->workload_fn= NULL;
01194 client->created_fn= NULL;
01195 client->data_fn= _client_do_data;
01196 client->warning_fn= _client_do_data;
01197 client->status_fn= _client_do_status;
01198 client->complete_fn= _client_do_data;
01199 client->exception_fn= _client_do_data;
01200 client->fail_fn= _client_do_fail;
01201
01202 *ret_ptr= gearman_client_run_tasks(client);
01203 if (*ret_ptr != GEARMAN_IO_WAIT && (*ret_ptr != GEARMAN_PAUSE ||
01204 client->do_ret == GEARMAN_WORK_FAIL))
01205 {
01206 gearman_task_free(&(client->do_task));
01207 client->options&= (gearman_client_options_t)~GEARMAN_CLIENT_TASK_IN_USE;
01208 }
01209
01210 workload= NULL;
01211
01212 if (*ret_ptr == GEARMAN_SUCCESS || *ret_ptr == GEARMAN_PAUSE)
01213 {
01214 *ret_ptr= client->do_ret;
01215 workload= client->do_data;
01216 *result_size= client->do_data_size;
01217 client->do_data= NULL;
01218 client->do_data_size= 0;
01219 }
01220
01221 return (void *)workload;
01222 }
01223
01224 static gearman_return_t _client_do_background(gearman_client_st *client,
01225 gearman_command_t command,
01226 const char *function_name,
01227 const char *unique,
01228 const void *workload,
01229 size_t workload_size,
01230 char *job_handle)
01231 {
01232 gearman_return_t ret;
01233
01234 if (!(client->options & GEARMAN_CLIENT_TASK_IN_USE))
01235 {
01236 (void)_client_add_task(client, &(client->do_task), client, command,
01237 function_name, unique, workload, workload_size,
01238 &ret);
01239 if (ret != GEARMAN_SUCCESS)
01240 return ret;
01241
01242 client->options|= GEARMAN_CLIENT_TASK_IN_USE;
01243 }
01244
01245 gearman_client_clear_fn(client);
01246
01247 ret= gearman_client_run_tasks(client);
01248 if (ret != GEARMAN_IO_WAIT)
01249 {
01250 if (job_handle)
01251 strcpy(job_handle, client->do_task.job_handle);
01252
01253 gearman_task_free(&(client->do_task));
01254 client->options&= (gearman_client_options_t)~GEARMAN_CLIENT_TASK_IN_USE;
01255 }
01256
01257 return ret;
01258 }
01259
01260 static gearman_return_t _client_do_data(gearman_task_st *task)
01261 {
01262 gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01263
01264 if (client->do_ret != GEARMAN_SUCCESS)
01265 {
01266 client->do_ret= GEARMAN_SUCCESS;
01267 return GEARMAN_SUCCESS;
01268 }
01269
01270 client->do_data= gearman_task_take_data(task, &(client->do_data_size));
01271
01272 if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
01273 client->do_ret= GEARMAN_WORK_DATA;
01274 else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
01275 client->do_ret= GEARMAN_WORK_WARNING;
01276 else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
01277 client->do_ret= GEARMAN_WORK_EXCEPTION;
01278 else
01279 return GEARMAN_SUCCESS;
01280
01281 return GEARMAN_PAUSE;
01282 }
01283
01284 static gearman_return_t _client_do_status(gearman_task_st *task)
01285 {
01286 gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01287
01288 if (client->do_ret != GEARMAN_SUCCESS)
01289 {
01290 client->do_ret= GEARMAN_SUCCESS;
01291 return GEARMAN_SUCCESS;
01292 }
01293
01294 client->do_ret= GEARMAN_WORK_STATUS;
01295 return GEARMAN_PAUSE;
01296 }
01297
01298 static gearman_return_t _client_do_fail(gearman_task_st *task)
01299 {
01300 gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01301
01302 client->do_ret= GEARMAN_WORK_FAIL;
01303 return GEARMAN_SUCCESS;
01304 }
01305
01306 static gearman_task_st *_task_create(gearman_client_st *client,
01307 gearman_task_st *task)
01308 {
01309 if (task == NULL)
01310 {
01311 task= malloc(sizeof(gearman_task_st));
01312 if (task == NULL)
01313 {
01314 gearman_error_set(client->gearman, "_task_create", "malloc");
01315 return NULL;
01316 }
01317
01318 task->options= GEARMAN_TASK_ALLOCATED;
01319 }
01320 else
01321 task->options= 0;
01322
01323 task->state= 0;
01324 task->is_known= false;
01325 task->is_running= false;
01326 task->created_id= 0;
01327 task->numerator= 0;
01328 task->denominator= 0;
01329 task->client= client;
01330
01331 if (client->task_list != NULL)
01332 client->task_list->prev= task;
01333 task->next= client->task_list;
01334 task->prev= NULL;
01335 client->task_list= task;
01336 client->task_count++;
01337
01338 task->context= NULL;
01339 task->con= NULL;
01340 task->recv= NULL;
01341 task->job_handle[0]= 0;
01342
01343 return task;
01344 }