00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00025 static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone);
00026
00030 static gearman_return_t _client_add_server(const char *host, in_port_t port,
00031 void *context);
00032
00036 static gearman_task_st *_client_add_task(gearman_client_st *client,
00037 gearman_task_st *task,
00038 void *context,
00039 gearman_command_t command,
00040 const char *function_name,
00041 size_t function_name_length,
00042 const char *unique,
00043 size_t unique_name_length,
00044 const void *workload,
00045 size_t workload_size,
00046 gearman_return_t *ret_ptr);
00047
00051 static gearman_return_t _client_run_task(gearman_client_st *client,
00052 gearman_task_st *task);
00053
00057 static void *_client_do(gearman_client_st *client, gearman_command_t command,
00058 const char *function_name, size_t functiona_name_length,
00059 const char *unique, size_t unique_length,
00060 const void *workload, size_t workload_size,
00061 size_t *result_size, gearman_return_t *ret_ptr);
00062
00066 static gearman_return_t _client_do_background(gearman_client_st *client,
00067 gearman_command_t command,
00068 const char *function_name,
00069 size_t functiona_name_length,
00070 const char *unique,
00071 size_t unique_length,
00072 const void *workload,
00073 size_t workload_size,
00074 char *job_handle);
00075
00079 static gearman_return_t _client_do_data(gearman_task_st *task);
00080
00084 static gearman_return_t _client_do_status(gearman_task_st *task);
00085
00089 static gearman_return_t _client_do_fail(gearman_task_st *task);
00090
00093
00094
00095
00096
00097 gearman_client_st *gearman_client_create(gearman_client_st *client)
00098 {
00099 return _client_allocate(client, false);
00100 }
00101
00102 gearman_client_st *gearman_client_clone(gearman_client_st *client,
00103 const gearman_client_st *from)
00104 {
00105 gearman_universal_st *check;
00106
00107 if (! from)
00108 {
00109 return _client_allocate(client, false);
00110 }
00111
00112 client= _client_allocate(client, true);
00113
00114 if (client == NULL)
00115 {
00116 return client;
00117 }
00118
00119 client->options.non_blocking= from->options.non_blocking;
00120 client->options.task_in_use= from->options.task_in_use;
00121 client->options.unbuffered_result= from->options.unbuffered_result;
00122 client->options.no_new= from->options.no_new;
00123 client->options.free_tasks= from->options.free_tasks;
00124
00125 check= gearman_universal_clone((&client->universal), &(from->universal));
00126 if (! check)
00127 {
00128 gearman_client_free(client);
00129 return NULL;
00130 }
00131
00132 return client;
00133 }
00134
00135 void gearman_client_free(gearman_client_st *client)
00136 {
00137 if (client->options.task_in_use)
00138 gearman_task_free(&(client->do_task));
00139
00140 gearman_client_task_free_all(client);
00141
00142 gearman_universal_free(&client->universal);
00143
00144 if (client->options.allocated)
00145 free(client);
00146 }
00147
00148 const char *gearman_client_error(const gearman_client_st *client)
00149 {
00150 return gearman_universal_error(&client->universal);
00151 }
00152
00153 int gearman_client_errno(const gearman_client_st *client)
00154 {
00155 return gearman_universal_errno(&client->universal);
00156 }
00157
00158 gearman_client_options_t gearman_client_options(const gearman_client_st *client)
00159 {
00160 gearman_client_options_t options;
00161 memset(&options, 0, sizeof(gearman_client_options_t));
00162
00163 if (client->options.allocated)
00164 options|= GEARMAN_CLIENT_ALLOCATED;
00165 if (client->options.non_blocking)
00166 options|= GEARMAN_CLIENT_NON_BLOCKING;
00167 if (client->options.task_in_use)
00168 options|= GEARMAN_CLIENT_TASK_IN_USE;
00169 if (client->options.unbuffered_result)
00170 options|= GEARMAN_CLIENT_UNBUFFERED_RESULT;
00171 if (client->options.no_new)
00172 options|= GEARMAN_CLIENT_NO_NEW;
00173 if (client->options.free_tasks)
00174 options|= GEARMAN_CLIENT_FREE_TASKS;
00175
00176 return options;
00177 }
00178
00179 void gearman_client_set_options(gearman_client_st *client,
00180 gearman_client_options_t options)
00181 {
00182 gearman_client_options_t usable_options[]= {
00183 GEARMAN_CLIENT_NON_BLOCKING,
00184 GEARMAN_CLIENT_UNBUFFERED_RESULT,
00185 GEARMAN_CLIENT_FREE_TASKS,
00186 GEARMAN_CLIENT_MAX
00187 };
00188
00189 gearman_client_options_t *ptr;
00190
00191
00192 for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
00193 {
00194 if (options & *ptr)
00195 {
00196 gearman_client_add_options(client, *ptr);
00197 }
00198 else
00199 {
00200 gearman_client_remove_options(client, *ptr);
00201 }
00202 }
00203 }
00204
00205 void gearman_client_add_options(gearman_client_st *client,
00206 gearman_client_options_t options)
00207 {
00208 if (options & GEARMAN_CLIENT_NON_BLOCKING)
00209 {
00210 gearman_universal_add_options(&client->universal, GEARMAN_NON_BLOCKING);
00211 client->options.non_blocking= true;
00212 }
00213
00214 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
00215 {
00216 client->options.unbuffered_result= true;
00217 }
00218
00219 if (options & GEARMAN_CLIENT_FREE_TASKS)
00220 {
00221 client->options.free_tasks= true;
00222 }
00223 }
00224
00225 void gearman_client_remove_options(gearman_client_st *client,
00226 gearman_client_options_t options)
00227 {
00228 if (options & GEARMAN_CLIENT_NON_BLOCKING)
00229 {
00230 gearman_universal_remove_options(&client->universal, GEARMAN_NON_BLOCKING);
00231 client->options.non_blocking= false;
00232 }
00233
00234 if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
00235 {
00236 client->options.unbuffered_result= false;
00237 }
00238
00239 if (options & GEARMAN_CLIENT_FREE_TASKS)
00240 {
00241 client->options.free_tasks= false;
00242 }
00243 }
00244
00245 int gearman_client_timeout(gearman_client_st *client)
00246 {
00247 return gearman_universal_timeout(&client->universal);
00248 }
00249
00250 void gearman_client_set_timeout(gearman_client_st *client, int timeout)
00251 {
00252 gearman_universal_set_timeout(&client->universal, timeout);
00253 }
00254
00255 void *gearman_client_context(const gearman_client_st *client)
00256 {
00257 return (void *)(client->context);
00258 }
00259
00260 void gearman_client_set_context(gearman_client_st *client, void *context)
00261 {
00262 client->context= context;
00263 }
00264
00265 void gearman_client_set_log_fn(gearman_client_st *client,
00266 gearman_log_fn *function, void *context,
00267 gearman_verbose_t verbose)
00268 {
00269 gearman_set_log_fn(&client->universal, function, context, verbose);
00270 }
00271
00272 void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
00273 gearman_malloc_fn *function,
00274 void *context)
00275 {
00276 gearman_set_workload_malloc_fn(&client->universal, function, context);
00277 }
00278
00279 void gearman_client_set_workload_free_fn(gearman_client_st *client,
00280 gearman_free_fn *function,
00281 void *context)
00282 {
00283 gearman_set_workload_free_fn(&client->universal, function, context);
00284 }
00285
00286 gearman_return_t gearman_client_add_server(gearman_client_st *client,
00287 const char *host, in_port_t port)
00288 {
00289 if (gearman_connection_create_args(&client->universal, NULL, host, port) == NULL)
00290 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00291
00292 return GEARMAN_SUCCESS;
00293 }
00294
00295 gearman_return_t gearman_client_add_servers(gearman_client_st *client,
00296 const char *servers)
00297 {
00298 return gearman_parse_servers(servers, _client_add_server, client);
00299 }
00300
00301 void gearman_client_remove_servers(gearman_client_st *client)
00302 {
00303 gearman_free_all_cons(&client->universal);
00304 }
00305
00306 gearman_return_t gearman_client_wait(gearman_client_st *client)
00307 {
00308 return gearman_wait(&client->universal);
00309 }
00310
00311 void *gearman_client_do(gearman_client_st *client, const char *function_name,
00312 const char *unique, const void *workload,
00313 size_t workload_size, size_t *result_size,
00314 gearman_return_t *ret_ptr)
00315 {
00316 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
00317 function_name, strlen(function_name),
00318 unique, unique ? strlen(unique) : 0,
00319 workload, workload_size, result_size, ret_ptr);
00320 }
00321
00322 void *gearman_client_do_high(gearman_client_st *client,
00323 const char *function_name, const char *unique,
00324 const void *workload, size_t workload_size,
00325 size_t *result_size, gearman_return_t *ret_ptr)
00326 {
00327 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
00328 function_name, strlen(function_name),
00329 unique, unique ? strlen(unique) : 0,
00330 workload, workload_size, result_size, ret_ptr);
00331 }
00332
00333 void *gearman_client_do_low(gearman_client_st *client,
00334 const char *function_name, const char *unique,
00335 const void *workload, size_t workload_size,
00336 size_t *result_size, gearman_return_t *ret_ptr)
00337 {
00338 return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
00339 function_name, strlen(function_name),
00340 unique, unique ? strlen(unique) : 0,
00341 workload, workload_size, result_size, ret_ptr);
00342 }
00343
00344 const char *gearman_client_do_job_handle(const gearman_client_st *client)
00345 {
00346 return client->do_task.job_handle;
00347 }
00348
00349 void gearman_client_do_status(gearman_client_st *client, uint32_t *numerator,
00350 uint32_t *denominator)
00351 {
00352 if (numerator != NULL)
00353 *numerator= client->do_task.numerator;
00354
00355 if (denominator != NULL)
00356 *denominator= client->do_task.denominator;
00357 }
00358
00359 gearman_return_t gearman_client_do_background(gearman_client_st *client,
00360 const char *function_name,
00361 const char *unique,
00362 const void *workload,
00363 size_t workload_size,
00364 char *job_handle)
00365 {
00366 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
00367 function_name, strlen(function_name),
00368 unique, unique ? strlen(unique) : 0,
00369 workload, workload_size,
00370 job_handle);
00371 }
00372
00373 gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
00374 const char *function_name,
00375 const char *unique,
00376 const void *workload,
00377 size_t workload_size,
00378 char *job_handle)
00379 {
00380 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
00381 function_name, strlen(function_name),
00382 unique, unique ? strlen(unique) : 0,
00383 workload, workload_size,
00384 job_handle);
00385 }
00386
00387 gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
00388 const char *function_name,
00389 const char *unique,
00390 const void *workload,
00391 size_t workload_size,
00392 char *job_handle)
00393 {
00394 return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
00395 function_name, strlen(function_name),
00396 unique, unique ? strlen(unique) : 0,
00397 workload, workload_size,
00398 job_handle);
00399 }
00400
00401 gearman_return_t gearman_client_job_status(gearman_client_st *client,
00402 const char *job_handle,
00403 bool *is_known, bool *is_running,
00404 uint32_t *numerator,
00405 uint32_t *denominator)
00406 {
00407 gearman_return_t ret;
00408
00409 if (! (client->options.task_in_use))
00410 {
00411 (void)gearman_client_add_task_status(client, &(client->do_task), client,
00412 job_handle, &ret);
00413 if (ret != GEARMAN_SUCCESS)
00414 return ret;
00415
00416 client->options.task_in_use= true;
00417 }
00418
00419 gearman_client_clear_fn(client);
00420
00421 ret= gearman_client_run_tasks(client);
00422 if (ret != GEARMAN_IO_WAIT)
00423 {
00424 if (is_known != NULL)
00425 *is_known= client->do_task.options.is_known;
00426 if (is_running != NULL)
00427 *is_running= client->do_task.options.is_running;
00428 if (numerator != NULL)
00429 *numerator= client->do_task.numerator;
00430 if (denominator != NULL)
00431 *denominator= client->do_task.denominator;
00432
00433 gearman_task_free(&(client->do_task));
00434 client->options.task_in_use= false;
00435 }
00436
00437 return ret;
00438 }
00439
00440 gearman_return_t gearman_client_echo(gearman_client_st *client,
00441 const void *workload,
00442 size_t workload_size)
00443 {
00444 return gearman_echo(&client->universal, workload, workload_size);
00445 }
00446
00447 void gearman_client_task_free_all(gearman_client_st *client)
00448 {
00449 while (client->task_list != NULL)
00450 gearman_task_free(client->task_list);
00451 }
00452
00453 void gearman_client_set_task_context_free_fn(gearman_client_st *client,
00454 gearman_task_context_free_fn *function)
00455 {
00456 client->task_context_free_fn= function;
00457 }
00458
00459 gearman_task_st *gearman_client_add_task(gearman_client_st *client,
00460 gearman_task_st *task,
00461 void *context,
00462 const char *function_name,
00463 const char *unique,
00464 const void *workload,
00465 size_t workload_size,
00466 gearman_return_t *ret_ptr)
00467 {
00468 return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB,
00469 function_name, strlen(function_name),
00470 unique, unique ? strlen(unique) : 0,
00471 workload, workload_size,
00472 ret_ptr);
00473 }
00474
00475 gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
00476 gearman_task_st *task,
00477 void *context,
00478 const char *function_name,
00479 const char *unique,
00480 const void *workload,
00481 size_t workload_size,
00482 gearman_return_t *ret_ptr)
00483 {
00484 return _client_add_task(client, task, context,
00485 GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
00486 function_name, strlen(function_name),
00487 unique, unique ? strlen(unique) : 0,
00488 workload, workload_size, ret_ptr);
00489 }
00490
00491 gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
00492 gearman_task_st *task,
00493 void *context,
00494 const char *function_name,
00495 const char *unique,
00496 const void *workload,
00497 size_t workload_size,
00498 gearman_return_t *ret_ptr)
00499 {
00500 return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
00501 function_name, strlen(function_name),
00502 unique, unique ? strlen(unique) : 0,
00503 workload, workload_size,
00504 ret_ptr);
00505 }
00506
00507 gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
00508 gearman_task_st *task,
00509 void *context,
00510 const char *function_name,
00511 const char *unique,
00512 const void *workload,
00513 size_t workload_size,
00514 gearman_return_t *ret_ptr)
00515 {
00516 return _client_add_task(client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
00517 function_name, strlen(function_name),
00518 unique, unique ? strlen(unique) : 0,
00519 workload, workload_size,
00520 ret_ptr);
00521 }
00522
00523 gearman_task_st *
00524 gearman_client_add_task_high_background(gearman_client_st *client,
00525 gearman_task_st *task,
00526 void *context,
00527 const char *function_name,
00528 const char *unique,
00529 const void *workload,
00530 size_t workload_size,
00531 gearman_return_t *ret_ptr)
00532 {
00533 return _client_add_task(client, task, context,
00534 GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
00535 function_name, strlen(function_name),
00536 unique, unique ? strlen(unique) : 0,
00537 workload, workload_size, ret_ptr);
00538 }
00539
00540 gearman_task_st *
00541 gearman_client_add_task_low_background(gearman_client_st *client,
00542 gearman_task_st *task,
00543 void *context,
00544 const char *function_name,
00545 const char *unique,
00546 const void *workload,
00547 size_t workload_size,
00548 gearman_return_t *ret_ptr)
00549 {
00550 return _client_add_task(client, task, context,
00551 GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
00552 function_name, strlen(function_name),
00553 unique, unique ? strlen(unique) : 0,
00554 workload, workload_size, ret_ptr);
00555 }
00556
00557 gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
00558 gearman_task_st *task,
00559 void *context,
00560 const char *job_handle,
00561 gearman_return_t *ret_ptr)
00562 {
00563 const void *args[1];
00564 size_t args_size[1];
00565
00566 task= gearman_task_create(client, task);
00567 if (task == NULL)
00568 {
00569 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00570 return NULL;
00571 }
00572
00573 task->context= context;
00574 snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
00575
00576 args[0]= job_handle;
00577 args_size[0]= strlen(job_handle);
00578 *ret_ptr= gearman_packet_create_args(&client->universal, &(task->send),
00579 GEARMAN_MAGIC_REQUEST,
00580 GEARMAN_COMMAND_GET_STATUS,
00581 args, args_size, 1);
00582 if (*ret_ptr == GEARMAN_SUCCESS)
00583 {
00584 client->new_tasks++;
00585 client->running_tasks++;
00586 task->options.send_in_use= true;
00587 }
00588
00589 return task;
00590 }
00591
00592 void gearman_client_set_workload_fn(gearman_client_st *client,
00593 gearman_workload_fn *function)
00594 {
00595 client->workload_fn= function;
00596 }
00597
00598 void gearman_client_set_created_fn(gearman_client_st *client,
00599 gearman_created_fn *function)
00600 {
00601 client->created_fn= function;
00602 }
00603
00604 void gearman_client_set_data_fn(gearman_client_st *client,
00605 gearman_data_fn *function)
00606 {
00607 client->data_fn= function;
00608 }
00609
00610 void gearman_client_set_warning_fn(gearman_client_st *client,
00611 gearman_warning_fn *function)
00612 {
00613 client->warning_fn= function;
00614 }
00615
00616 void gearman_client_set_status_fn(gearman_client_st *client,
00617 gearman_universal_status_fn *function)
00618 {
00619 client->status_fn= function;
00620 }
00621
00622 void gearman_client_set_complete_fn(gearman_client_st *client,
00623 gearman_complete_fn *function)
00624 {
00625 client->complete_fn= function;
00626 }
00627
00628 void gearman_client_set_exception_fn(gearman_client_st *client,
00629 gearman_exception_fn *function)
00630 {
00631 client->exception_fn= function;
00632 }
00633
00634 void gearman_client_set_fail_fn(gearman_client_st *client,
00635 gearman_fail_fn *function)
00636 {
00637 client->fail_fn= function;
00638 }
00639
00640 void gearman_client_clear_fn(gearman_client_st *client)
00641 {
00642 client->workload_fn= NULL;
00643 client->created_fn= NULL;
00644 client->data_fn= NULL;
00645 client->warning_fn= NULL;
00646 client->status_fn= NULL;
00647 client->complete_fn= NULL;
00648 client->exception_fn= NULL;
00649 client->fail_fn= NULL;
00650 }
00651
00652 static inline void _push_non_blocking(gearman_client_st *client)
00653 {
00654 client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
00655 client->universal.options.non_blocking= true;
00656 }
00657
00658 static inline void _pop_non_blocking(gearman_client_st *client)
00659 {
00660 client->universal.options.non_blocking= client->options.non_blocking;
00661 assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
00662 }
00663
00664 static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
00665 {
00666 gearman_return_t ret;
00667
00668 switch(client->state)
00669 {
00670 case GEARMAN_CLIENT_STATE_IDLE:
00671 while (1)
00672 {
00673
00674 if (client->new_tasks > 0 && ! (client->options.no_new))
00675 {
00676 for (client->task= client->task_list; client->task != NULL;
00677 client->task= client->task->next)
00678 {
00679 if (client->task->state != GEARMAN_TASK_STATE_NEW)
00680 continue;
00681
00682 case GEARMAN_CLIENT_STATE_NEW:
00683 ret= _client_run_task(client, client->task);
00684 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00685 {
00686 client->state= GEARMAN_CLIENT_STATE_NEW;
00687
00688 return ret;
00689 }
00690 }
00691
00692 if (client->new_tasks == 0)
00693 {
00694 ret= gearman_flush_all(&client->universal);
00695 if (ret != GEARMAN_SUCCESS)
00696 {
00697 return ret;
00698 }
00699 }
00700 }
00701
00702
00703 while ((client->con= gearman_ready(&client->universal)) != NULL)
00704 {
00705 if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
00706 {
00707
00708 for (client->task= client->task_list; client->task != NULL;
00709 client->task= client->task->next)
00710 {
00711 if (client->task->con != client->con ||
00712 (client->task->state != GEARMAN_TASK_STATE_SUBMIT &&
00713 client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
00714 {
00715 continue;
00716 }
00717
00718 case GEARMAN_CLIENT_STATE_SUBMIT:
00719 ret= _client_run_task(client, client->task);
00720 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00721 {
00722 client->state= GEARMAN_CLIENT_STATE_SUBMIT;
00723 return ret;
00724 }
00725 }
00726 }
00727
00728 if (! (client->con->revents & POLLIN))
00729 continue;
00730
00731
00732 while (1)
00733 {
00734
00735 if (client->options.unbuffered_result)
00736 {
00737
00738 if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
00739 {
00740 for (client->task= client->task_list; client->task != NULL;
00741 client->task= client->task->next)
00742 {
00743 if (client->task->con == client->con &&
00744 (client->task->state == GEARMAN_TASK_STATE_DATA ||
00745 client->task->state == GEARMAN_TASK_STATE_COMPLETE))
00746 {
00747 break;
00748 }
00749 }
00750
00751 assert(client->task != NULL);
00752 }
00753 else
00754 {
00755
00756 client->task= NULL;
00757 (void)gearman_connection_recv(client->con, &(client->con->packet), &ret,
00758 false);
00759 }
00760 }
00761 else
00762 {
00763
00764 client->task= NULL;
00765 (void)gearman_connection_recv(client->con, &(client->con->packet), &ret, true);
00766 }
00767
00768 if (client->task == NULL)
00769 {
00770
00771 if (ret != GEARMAN_SUCCESS)
00772 {
00773 if (ret == GEARMAN_IO_WAIT)
00774 break;
00775
00776 client->state= GEARMAN_CLIENT_STATE_IDLE;
00777 return ret;
00778 }
00779
00780 client->con->options.packet_in_use= true;
00781
00782
00783 for (client->task= client->task_list; client->task != NULL;
00784 client->task= client->task->next)
00785 {
00786 if (client->task->con != client->con)
00787 continue;
00788
00789 if (client->con->packet.command == GEARMAN_COMMAND_JOB_CREATED)
00790 {
00791 if (client->task->created_id != client->con->created_id)
00792 continue;
00793
00794
00795 client->con->created_id++;
00796 }
00797 else if (client->con->packet.command == GEARMAN_COMMAND_ERROR)
00798 {
00799 gearman_universal_set_error(&client->universal, "gearman_client_run_tasks",
00800 "%s:%.*s",
00801 (char *)(client->con->packet.arg[0]),
00802 (int)(client->con->packet.arg_size[1]),
00803 (char *)(client->con->packet.arg[1]));
00804
00805 return GEARMAN_SERVER_ERROR;
00806 }
00807 else if (strncmp(client->task->job_handle,
00808 (char *)(client->con->packet.arg[0]),
00809 client->con->packet.arg_size[0]) ||
00810 (client->con->packet.command != GEARMAN_COMMAND_WORK_FAIL &&
00811 strlen(client->task->job_handle) != client->con->packet.arg_size[0] - 1) ||
00812 (client->con->packet.command == GEARMAN_COMMAND_WORK_FAIL &&
00813 strlen(client->task->job_handle) != client->con->packet.arg_size[0]))
00814 {
00815 continue;
00816 }
00817
00818
00819
00820 break;
00821 }
00822
00823 if (client->task == NULL)
00824 {
00825
00826 gearman_packet_free(&(client->con->packet));
00827 client->con->options.packet_in_use= false;
00828 continue;
00829 }
00830
00831 client->task->recv= &(client->con->packet);
00832 }
00833
00834 case GEARMAN_CLIENT_STATE_PACKET:
00835
00836 ret= _client_run_task(client, client->task);
00837 if (ret == GEARMAN_IO_WAIT)
00838 break;
00839 if (ret != GEARMAN_SUCCESS)
00840 {
00841 client->state= GEARMAN_CLIENT_STATE_PACKET;
00842 return ret;
00843 }
00844
00845
00846 gearman_packet_free(&(client->con->packet));
00847 client->con->options.packet_in_use= false;
00848
00849
00850 if (client->running_tasks == 0)
00851 break;
00852 }
00853 }
00854
00855
00856 if (client->running_tasks == 0)
00857 break;
00858
00859 if (client->new_tasks > 0 && ! (client->options.no_new))
00860 continue;
00861
00862 if (client->options.non_blocking)
00863 {
00864
00865 client->state= GEARMAN_CLIENT_STATE_IDLE;
00866
00867 return GEARMAN_IO_WAIT;
00868 }
00869
00870
00871 ret= gearman_wait(&client->universal);
00872 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
00873 {
00874 client->state= GEARMAN_CLIENT_STATE_IDLE;
00875
00876 return ret;
00877 }
00878 }
00879
00880 break;
00881
00882 default:
00883 gearman_universal_set_error(&client->universal, "gearman_client_run_tasks",
00884 "unknown state: %u", client->state);
00885
00886 return GEARMAN_UNKNOWN_STATE;
00887 }
00888
00889 client->state= GEARMAN_CLIENT_STATE_IDLE;
00890
00891 return GEARMAN_SUCCESS;
00892 }
00893
00894 gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
00895 {
00896 gearman_return_t rc;
00897
00898 _push_non_blocking(client);
00899
00900 rc= _client_run_tasks(client);
00901
00902 _pop_non_blocking(client);
00903
00904 return rc;
00905 }
00906
00907
00908
00909
00910
00911 static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
00912 {
00913 if (client == NULL)
00914 {
00915 client= malloc(sizeof(gearman_client_st));
00916 if (client == NULL)
00917 return NULL;
00918
00919 client->options.allocated= true;
00920 }
00921 else
00922 {
00923 client->options.allocated= false;
00924 }
00925
00926 client->options.non_blocking= false;
00927 client->options.task_in_use= false;
00928 client->options.unbuffered_result= false;
00929 client->options.no_new= false;
00930 client->options.free_tasks= false;
00931
00932 client->state= 0;
00933 client->do_ret= 0;
00934 client->new_tasks= 0;
00935 client->running_tasks= 0;
00936 client->task_count= 0;
00937 client->do_data_size= 0;
00938 client->context= NULL;
00939 client->con= NULL;
00940 client->task= NULL;
00941 client->task_list= NULL;
00942 client->task_context_free_fn= NULL;
00943 client->do_data= NULL;
00944 client->workload_fn= NULL;
00945 client->created_fn= NULL;
00946 client->data_fn= NULL;
00947 client->warning_fn= NULL;
00948 client->status_fn= NULL;
00949 client->complete_fn= NULL;
00950 client->exception_fn= NULL;
00951 client->fail_fn= NULL;
00952
00953 if (! is_clone)
00954 {
00955 gearman_universal_st *check;
00956
00957 check= gearman_universal_create(&client->universal, NULL);
00958 if (check == NULL)
00959 {
00960 gearman_client_free(client);
00961 return NULL;
00962 }
00963 }
00964
00965 return client;
00966 }
00967
00968 static gearman_return_t _client_add_server(const char *host, in_port_t port,
00969 void *context)
00970 {
00971 return gearman_client_add_server((gearman_client_st *)context, host, port);
00972 }
00973
00974 static gearman_task_st *_client_add_task(gearman_client_st *client,
00975 gearman_task_st *task,
00976 void *context,
00977 gearman_command_t command,
00978 const char *function_name,
00979 size_t function_name_length,
00980 const char *unique,
00981 size_t unique_length,
00982 const void *workload,
00983 size_t workload_size,
00984 gearman_return_t *ret_ptr)
00985 {
00986 uuid_t uuid;
00987 char uuid_string[37];
00988 const void *args[3];
00989 size_t args_size[3];
00990
00991 task= gearman_task_create(client, task);
00992 if (task == NULL)
00993 {
00994 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00995 return NULL;
00996 }
00997
00998 task->context= context;
00999
01000 if (unique == NULL)
01001 {
01002 uuid_generate(uuid);
01003 uuid_unparse(uuid, uuid_string);
01004 uuid_string[36]= 0;
01005 unique= uuid_string;
01006 unique_length= 36;
01007 }
01008
01013 args[0]= function_name;
01014 args_size[0]= function_name_length + 1;
01015 args[1]= unique;
01016 args_size[1]= unique_length + 1;
01017 args[2]= workload;
01018 args_size[2]= workload_size;
01019
01020 *ret_ptr= gearman_packet_create_args(&client->universal, &(task->send),
01021 GEARMAN_MAGIC_REQUEST, command,
01022 args, args_size, 3);
01023 if (*ret_ptr == GEARMAN_SUCCESS)
01024 {
01025 client->new_tasks++;
01026 client->running_tasks++;
01027 task->options.send_in_use= true;
01028 }
01029
01030 return task;
01031 }
01032
01033 static gearman_return_t _client_run_task(gearman_client_st *client,
01034 gearman_task_st *task)
01035 {
01036 gearman_return_t ret;
01037 char status_buffer[11];
01038 uint8_t x;
01039
01040 switch(task->state)
01041 {
01042 case GEARMAN_TASK_STATE_NEW:
01043 if (task->client->universal.con_list == NULL)
01044 {
01045 client->new_tasks--;
01046 client->running_tasks--;
01047 gearman_universal_set_error(&client->universal, "_client_run_task",
01048 "no servers added");
01049 return GEARMAN_NO_SERVERS;
01050 }
01051
01052 for (task->con= task->client->universal.con_list; task->con != NULL;
01053 task->con= task->con->next)
01054 {
01055 if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
01056 break;
01057 }
01058
01059 if (task->con == NULL)
01060 {
01061 client->options.no_new= true;
01062 return GEARMAN_IO_WAIT;
01063 }
01064
01065 client->new_tasks--;
01066
01067 if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
01068 {
01069 task->created_id= task->con->created_id_next;
01070 task->con->created_id_next++;
01071 }
01072
01073 case GEARMAN_TASK_STATE_SUBMIT:
01074 while (1)
01075 {
01076 ret= gearman_connection_send(task->con, &(task->send),
01077 client->new_tasks == 0 ? true : false);
01078 if (ret == GEARMAN_SUCCESS)
01079 break;
01080 else if (ret == GEARMAN_IO_WAIT)
01081 {
01082 task->state= GEARMAN_TASK_STATE_SUBMIT;
01083 return GEARMAN_IO_WAIT;
01084 }
01085 else if (ret != GEARMAN_SUCCESS)
01086 {
01087
01088 task->con->created_id++;
01089
01090 if (ret == GEARMAN_COULD_NOT_CONNECT)
01091 {
01092 for (task->con= task->con->next; task->con != NULL;
01093 task->con= task->con->next)
01094 {
01095 if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
01096 break;
01097 }
01098 }
01099 else
01100 task->con= NULL;
01101
01102 if (task->con == NULL)
01103 {
01104 client->running_tasks--;
01105 return ret;
01106 }
01107
01108 if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
01109 {
01110 task->created_id= task->con->created_id_next;
01111 task->con->created_id_next++;
01112 }
01113 }
01114 }
01115
01116 if (task->send.data_size > 0 && task->send.data == NULL)
01117 {
01118 if (client->workload_fn == NULL)
01119 {
01120 gearman_universal_set_error(&client->universal, "_client_run_task",
01121 "workload size > 0, but no data pointer or workload_fn was given");
01122 return GEARMAN_NEED_WORKLOAD_FN;
01123 }
01124
01125 case GEARMAN_TASK_STATE_WORKLOAD:
01126 ret= client->workload_fn(task);
01127 if (ret != GEARMAN_SUCCESS)
01128 {
01129 task->state= GEARMAN_TASK_STATE_WORKLOAD;
01130 return ret;
01131 }
01132 }
01133
01134 client->options.no_new= false;
01135 task->state= GEARMAN_TASK_STATE_WORK;
01136 return gearman_connection_set_events(task->con, POLLIN);
01137
01138 case GEARMAN_TASK_STATE_WORK:
01139 if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
01140 {
01141 snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
01142 (uint32_t)(task->recv->arg_size[0]),
01143 (char *)(task->recv->arg[0]));
01144
01145 case GEARMAN_TASK_STATE_CREATED:
01146 if (client->created_fn != NULL)
01147 {
01148 ret= client->created_fn(task);
01149 if (ret != GEARMAN_SUCCESS)
01150 {
01151 task->state= GEARMAN_TASK_STATE_CREATED;
01152 return ret;
01153 }
01154 }
01155
01156 if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
01157 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
01158 task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
01159 {
01160 break;
01161 }
01162 }
01163 else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
01164 {
01165 case GEARMAN_TASK_STATE_DATA:
01166 if (client->data_fn != NULL)
01167 {
01168 ret= client->data_fn(task);
01169 if (ret != GEARMAN_SUCCESS)
01170 {
01171 task->state= GEARMAN_TASK_STATE_DATA;
01172 return ret;
01173 }
01174 }
01175 }
01176 else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
01177 {
01178 case GEARMAN_TASK_STATE_WARNING:
01179 if (client->warning_fn != NULL)
01180 {
01181 ret= client->warning_fn(task);
01182 if (ret != GEARMAN_SUCCESS)
01183 {
01184 task->state= GEARMAN_TASK_STATE_WARNING;
01185 return ret;
01186 }
01187 }
01188 }
01189 else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
01190 task->recv->command == GEARMAN_COMMAND_STATUS_RES)
01191 {
01192 if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
01193 {
01194 if (atoi((char *)task->recv->arg[1]) == 0)
01195 task->options.is_known= false;
01196 else
01197 task->options.is_known= true;
01198
01199 if (atoi((char *)task->recv->arg[2]) == 0)
01200 task->options.is_running= false;
01201 else
01202 task->options.is_running= true;
01203
01204 x= 3;
01205 }
01206 else
01207 x= 1;
01208
01209 task->numerator= (uint32_t)atoi((char *)task->recv->arg[x]);
01210 snprintf(status_buffer, 11, "%.*s",
01211 (uint32_t)(task->recv->arg_size[x + 1]),
01212 (char *)(task->recv->arg[x + 1]));
01213 task->denominator= (uint32_t)atoi(status_buffer);
01214
01215 case GEARMAN_TASK_STATE_STATUS:
01216 if (client->status_fn != NULL)
01217 {
01218 ret= client->status_fn(task);
01219 if (ret != GEARMAN_SUCCESS)
01220 {
01221 task->state= GEARMAN_TASK_STATE_STATUS;
01222 return ret;
01223 }
01224 }
01225
01226 if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
01227 break;
01228 }
01229 else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
01230 {
01231 case GEARMAN_TASK_STATE_COMPLETE:
01232 if (client->complete_fn != NULL)
01233 {
01234 ret= client->complete_fn(task);
01235 if (ret != GEARMAN_SUCCESS)
01236 {
01237 task->state= GEARMAN_TASK_STATE_COMPLETE;
01238 return ret;
01239 }
01240 }
01241
01242 break;
01243 }
01244 else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
01245 {
01246 case GEARMAN_TASK_STATE_EXCEPTION:
01247 if (client->exception_fn != NULL)
01248 {
01249 ret= client->exception_fn(task);
01250 if (ret != GEARMAN_SUCCESS)
01251 {
01252 task->state= GEARMAN_TASK_STATE_EXCEPTION;
01253 return ret;
01254 }
01255 }
01256 }
01257 else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
01258 {
01259 case GEARMAN_TASK_STATE_FAIL:
01260 if (client->fail_fn != NULL)
01261 {
01262 ret= client->fail_fn(task);
01263 if (ret != GEARMAN_SUCCESS)
01264 {
01265 task->state= GEARMAN_TASK_STATE_FAIL;
01266 return ret;
01267 }
01268 }
01269
01270 break;
01271 }
01272
01273 task->state= GEARMAN_TASK_STATE_WORK;
01274 return GEARMAN_SUCCESS;
01275
01276 case GEARMAN_TASK_STATE_FINISHED:
01277 break;
01278
01279 default:
01280 gearman_universal_set_error(&client->universal, "_client_run_task", "unknown state: %u",
01281 task->state);
01282 return GEARMAN_UNKNOWN_STATE;
01283 }
01284
01285 client->running_tasks--;
01286 task->state= GEARMAN_TASK_STATE_FINISHED;
01287
01288 if (client->options.free_tasks)
01289 gearman_task_free(task);
01290
01291 return GEARMAN_SUCCESS;
01292 }
01293
01294 static void *_client_do(gearman_client_st *client, gearman_command_t command,
01295 const char *function_name, size_t function_name_length,
01296 const char *unique, size_t unique_length,
01297 const void *workload, size_t workload_size,
01298 size_t *result_size, gearman_return_t *ret_ptr)
01299 {
01300 if (! client->options.task_in_use)
01301 {
01302 (void)_client_add_task(client, &(client->do_task), client, command,
01303 function_name, function_name_length,
01304 unique, unique_length,
01305 workload, workload_size,
01306 ret_ptr);
01307 if (*ret_ptr != GEARMAN_SUCCESS)
01308 return NULL;
01309
01310 client->options.task_in_use= true;
01311 }
01312
01313 client->workload_fn= NULL;
01314 client->created_fn= NULL;
01315 client->data_fn= _client_do_data;
01316 client->warning_fn= _client_do_data;
01317 client->status_fn= _client_do_status;
01318 client->complete_fn= _client_do_data;
01319 client->exception_fn= _client_do_data;
01320 client->fail_fn= _client_do_fail;
01321
01322 *ret_ptr= gearman_client_run_tasks(client);
01323 if (*ret_ptr != GEARMAN_IO_WAIT && (*ret_ptr != GEARMAN_PAUSE ||
01324 client->do_ret == GEARMAN_WORK_FAIL))
01325 {
01326 gearman_task_free(&(client->do_task));
01327 client->options.task_in_use= false;
01328 client->new_tasks= 0;
01329 client->running_tasks= 0;
01330 }
01331
01332 workload= NULL;
01333
01334 if (*ret_ptr == GEARMAN_SUCCESS || *ret_ptr == GEARMAN_PAUSE)
01335 {
01336 *ret_ptr= client->do_ret;
01337 workload= client->do_data;
01338 *result_size= client->do_data_size;
01339 client->do_data= NULL;
01340 client->do_data_size= 0;
01341 }
01342
01343 return (void *)workload;
01344 }
01345
01346 static gearman_return_t _client_do_background(gearman_client_st *client,
01347 gearman_command_t command,
01348 const char *function_name,
01349 size_t function_name_length,
01350 const char *unique,
01351 size_t unique_length,
01352 const void *workload,
01353 size_t workload_size,
01354 char *job_handle)
01355 {
01356 gearman_return_t ret;
01357
01358 if (! client->options.task_in_use)
01359 {
01360 (void)_client_add_task(client, &(client->do_task), client, command,
01361 function_name, function_name_length,
01362 unique, unique_length,
01363 workload, workload_size,
01364 &ret);
01365 if (ret != GEARMAN_SUCCESS)
01366 return ret;
01367
01368 client->options.task_in_use= true;
01369 }
01370
01371 gearman_client_clear_fn(client);
01372
01373 ret= gearman_client_run_tasks(client);
01374 if (ret != GEARMAN_IO_WAIT)
01375 {
01376 if (job_handle)
01377 {
01378 strncpy(job_handle, client->do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
01379 }
01380
01381 gearman_task_free(&(client->do_task));
01382 client->options.task_in_use= false;
01383 client->new_tasks= 0;
01384 client->running_tasks= 0;
01385 }
01386
01387 return ret;
01388 }
01389
01390 static gearman_return_t _client_do_data(gearman_task_st *task)
01391 {
01392 gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01393
01394 if (client->do_ret != GEARMAN_SUCCESS)
01395 {
01396 client->do_ret= GEARMAN_SUCCESS;
01397 return GEARMAN_SUCCESS;
01398 }
01399
01400 client->do_data= gearman_task_take_data(task, &(client->do_data_size));
01401
01402 if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
01403 client->do_ret= GEARMAN_WORK_DATA;
01404 else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
01405 client->do_ret= GEARMAN_WORK_WARNING;
01406 else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
01407 client->do_ret= GEARMAN_WORK_EXCEPTION;
01408 else
01409 return GEARMAN_SUCCESS;
01410
01411 return GEARMAN_PAUSE;
01412 }
01413
01414 static gearman_return_t _client_do_status(gearman_task_st *task)
01415 {
01416 gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01417
01418 if (client->do_ret != GEARMAN_SUCCESS)
01419 {
01420 client->do_ret= GEARMAN_SUCCESS;
01421 return GEARMAN_SUCCESS;
01422 }
01423
01424 client->do_ret= GEARMAN_WORK_STATUS;
01425 return GEARMAN_PAUSE;
01426 }
01427
01428 static gearman_return_t _client_do_fail(gearman_task_st *task)
01429 {
01430 gearman_client_st *client= (gearman_client_st *)gearman_task_context(task);
01431
01432 client->do_ret= GEARMAN_WORK_FAIL;
01433 return GEARMAN_SUCCESS;
01434 }