00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 gearman_return_t _queue_replay_add(gearman_server_st *server, void *context,
00030 const void *unique, size_t unique_size,
00031 const void *function_name,
00032 size_t function_name_size, const void *data,
00033 size_t data_size,
00034 gearman_job_priority_t priority);
00035
00039 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00040 const char *error_code,
00041 const char *error_string);
00042
00046 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00047 gearman_packet_st *packet);
00048
00052 static gearman_return_t
00053 _server_queue_work_data(gearman_server_job_st *server_job,
00054 gearman_packet_st *packet, gearman_command_t command);
00055
00059 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00060
00063
00064
00065
00066
00067 gearman_server_st *gearman_server_create(gearman_server_st *server)
00068 {
00069 struct utsname un;
00070
00071 if (server == NULL)
00072 {
00073 server= (gearman_server_st *)malloc(sizeof(gearman_server_st));
00074 if (server == NULL)
00075 return NULL;
00076
00077 server->options.allocated= true;
00078 }
00079 else
00080 server->options.allocated= false;
00081
00082 server->state.queue_startup= false;
00083 server->flags.round_robin= false;
00084 server->flags.threaded= false;
00085 server->shutdown= false;
00086 server->shutdown_graceful= false;
00087 server->proc_wakeup= false;
00088 server->proc_shutdown= false;
00089 server->job_retries= 0;
00090 server->worker_wakeup= 0;
00091 server->thread_count= 0;
00092 server->free_packet_count= 0;
00093 server->function_count= 0;
00094 server->job_count= 0;
00095 server->unique_count= 0;
00096 server->free_job_count= 0;
00097 server->free_client_count= 0;
00098 server->free_worker_count= 0;
00099 server->thread_list= NULL;
00100 server->free_packet_list= NULL;
00101 server->function_list= NULL;
00102 server->free_job_list= NULL;
00103 server->free_client_list= NULL;
00104 server->free_worker_list= NULL;
00105 server->log_fn= NULL;
00106 server->log_context= NULL;
00107 server->queue_context= NULL;
00108 server->queue_add_fn= NULL;
00109 server->queue_flush_fn= NULL;
00110 server->queue_done_fn= NULL;
00111 server->queue_replay_fn= NULL;
00112 memset(server->job_hash, 0,
00113 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00114 memset(server->unique_hash, 0,
00115 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00116
00117 server->gearman= gearman_universal_create(&(server->gearman_universal_static), NULL);
00118 if (server->gearman == NULL)
00119 {
00120 gearman_server_free(server);
00121 return NULL;
00122 }
00123
00124 if (uname(&un) == -1)
00125 {
00126 gearman_server_free(server);
00127 return NULL;
00128 }
00129
00130 snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
00131 un.nodename);
00132 server->job_handle_count= 1;
00133
00134 return server;
00135 }
00136
00137 void gearman_server_free(gearman_server_st *server)
00138 {
00139 uint32_t key;
00140 gearman_server_packet_st *packet;
00141 gearman_server_job_st *job;
00142 gearman_server_client_st *client;
00143 gearman_server_worker_st *worker;
00144
00145
00146 assert(server->thread_list == NULL);
00147
00148 for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
00149 {
00150 while (server->job_hash[key] != NULL)
00151 gearman_server_job_free(server->job_hash[key]);
00152 }
00153
00154 while (server->function_list != NULL)
00155 gearman_server_function_free(server->function_list);
00156
00157 while (server->free_packet_list != NULL)
00158 {
00159 packet= server->free_packet_list;
00160 server->free_packet_list= packet->next;
00161 free(packet);
00162 }
00163
00164 while (server->free_job_list != NULL)
00165 {
00166 job= server->free_job_list;
00167 server->free_job_list= job->next;
00168 free(job);
00169 }
00170
00171 while (server->free_client_list != NULL)
00172 {
00173 client= server->free_client_list;
00174 server->free_client_list= client->con_next;
00175 free(client);
00176 }
00177
00178 while (server->free_worker_list != NULL)
00179 {
00180 worker= server->free_worker_list;
00181 server->free_worker_list= worker->con_next;
00182 free(worker);
00183 }
00184
00185 if (server->gearman != NULL)
00186 gearman_universal_free(server->gearman);
00187
00188 if (server->options.allocated)
00189 free(server);
00190 }
00191
00192 void gearman_server_set_job_retries(gearman_server_st *server,
00193 uint8_t job_retries)
00194 {
00195 server->job_retries= job_retries;
00196 }
00197
00198 void gearman_server_set_worker_wakeup(gearman_server_st *server,
00199 uint8_t worker_wakeup)
00200 {
00201 server->worker_wakeup= worker_wakeup;
00202 }
00203
00204 void gearman_server_set_log_fn(gearman_server_st *server,
00205 gearman_log_fn *function,
00206 void *context, gearman_verbose_t verbose)
00207 {
00208 server->log_fn= function;
00209 server->log_context= context;
00210 gearman_set_log_fn(server->gearman, _log, server, verbose);
00211 }
00212
00213 gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
00214 gearman_packet_st *packet)
00215 {
00216 gearman_return_t ret;
00217 gearman_server_job_st *server_job;
00218 char job_handle[GEARMAN_JOB_HANDLE_SIZE];
00219 char option[GEARMAN_OPTION_SIZE];
00220 gearman_server_client_st *server_client;
00221 char numerator_buffer[11];
00222 char denominator_buffer[11];
00223 gearman_job_priority_t priority;
00224 gearman_server_st *server= server_con->thread->server;
00225
00226 if (packet->magic == GEARMAN_MAGIC_RESPONSE)
00227 {
00228 return _server_error_packet(server_con, "bad_magic",
00229 "Request magic expected");
00230 }
00231
00232 switch (packet->command)
00233 {
00234
00235 case GEARMAN_COMMAND_ECHO_REQ:
00236
00237 ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
00238 GEARMAN_COMMAND_ECHO_RES, packet->data,
00239 packet->data_size, NULL);
00240 if (ret != GEARMAN_SUCCESS)
00241 return ret;
00242
00243 packet->options.free_data= false;
00244
00245 break;
00246
00247
00248 case GEARMAN_COMMAND_SUBMIT_JOB:
00249 case GEARMAN_COMMAND_SUBMIT_JOB_BG:
00250 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
00251 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
00252 case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
00253 case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
00254
00255 if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
00256 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG)
00257 {
00258 priority= GEARMAN_JOB_PRIORITY_NORMAL;
00259 }
00260 else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
00261 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
00262 {
00263 priority= GEARMAN_JOB_PRIORITY_HIGH;
00264 }
00265 else
00266 priority= GEARMAN_JOB_PRIORITY_LOW;
00267
00268 if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
00269 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
00270 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
00271 {
00272 server_client= NULL;
00273 }
00274 else
00275 {
00276 server_client= gearman_server_client_add(server_con);
00277 if (server_client == NULL)
00278 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00279 }
00280
00281
00282 server_job= gearman_server_job_add(server_con->thread->server,
00283 (char *)(packet->arg[0]),
00284 packet->arg_size[0] - 1,
00285 (char *)(packet->arg[1]),
00286 packet->arg_size[1] - 1, packet->data,
00287 packet->data_size, priority,
00288 server_client, &ret);
00289 if (ret == GEARMAN_SUCCESS)
00290 {
00291 packet->options.free_data= false;
00292 }
00293 else if (ret == GEARMAN_JOB_QUEUE_FULL)
00294 {
00295 return _server_error_packet(server_con, "queue_full",
00296 "Job queue is full");
00297 }
00298 else if (ret != GEARMAN_JOB_EXISTS)
00299 return ret;
00300
00301
00302 ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00303 GEARMAN_COMMAND_JOB_CREATED,
00304 server_job->job_handle,
00305 (size_t)strlen(server_job->job_handle),
00306 NULL);
00307 if (ret != GEARMAN_SUCCESS)
00308 return ret;
00309
00310 break;
00311
00312 case GEARMAN_COMMAND_GET_STATUS:
00313
00314 snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00315 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00316
00317 server_job= gearman_server_job_get(server_con->thread->server, job_handle, NULL);
00318
00319
00320 if (server_job == NULL)
00321 {
00322 ret= gearman_server_io_packet_add(server_con, false,
00323 GEARMAN_MAGIC_RESPONSE,
00324 GEARMAN_COMMAND_STATUS_RES, job_handle,
00325 (size_t)(strlen(job_handle) + 1),
00326 "0", (size_t)2, "0", (size_t)2, "0",
00327 (size_t)2, "0", (size_t)1, NULL);
00328 }
00329 else
00330 {
00331 snprintf(numerator_buffer, 11, "%u", server_job->numerator);
00332 snprintf(denominator_buffer, 11, "%u", server_job->denominator);
00333
00334 ret= gearman_server_io_packet_add(server_con, false,
00335 GEARMAN_MAGIC_RESPONSE,
00336 GEARMAN_COMMAND_STATUS_RES, job_handle,
00337 (size_t)(strlen(job_handle) + 1),
00338 "1", (size_t)2,
00339 server_job->worker == NULL ? "0" : "1",
00340 (size_t)2, numerator_buffer,
00341 (size_t)(strlen(numerator_buffer) + 1),
00342 denominator_buffer,
00343 (size_t)strlen(denominator_buffer),
00344 NULL);
00345 }
00346
00347 if (ret != GEARMAN_SUCCESS)
00348 return ret;
00349
00350 break;
00351
00352 case GEARMAN_COMMAND_OPTION_REQ:
00353
00354 snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
00355 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00356
00357 if (!strcasecmp(option, "exceptions"))
00358 {
00359 server_con->is_exceptions= true;
00360 }
00361 else
00362 {
00363 return _server_error_packet(server_con, "unknown_option",
00364 "Server does not recognize given option");
00365 }
00366
00367 ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00368 GEARMAN_COMMAND_OPTION_RES,
00369 packet->arg[0], packet->arg_size[0],
00370 NULL);
00371 if (ret != GEARMAN_SUCCESS)
00372 return ret;
00373
00374 break;
00375
00376
00377 case GEARMAN_COMMAND_CAN_DO:
00378 if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00379 packet->arg_size[0], 0) == NULL)
00380 {
00381 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00382 }
00383
00384 break;
00385
00386 case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
00387 if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00388 packet->arg_size[0] - 1,
00389 (in_port_t)atoi((char *)(packet->arg[1])))
00390 == NULL)
00391 {
00392 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00393 }
00394
00395 break;
00396
00397 case GEARMAN_COMMAND_CANT_DO:
00398 gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
00399 packet->arg_size[0]);
00400 break;
00401
00402 case GEARMAN_COMMAND_RESET_ABILITIES:
00403 gearman_server_con_free_workers(server_con);
00404 break;
00405
00406 case GEARMAN_COMMAND_PRE_SLEEP:
00407 server_job= gearman_server_job_peek(server_con);
00408 if (server_job == NULL)
00409 {
00410 server_con->is_sleeping= true;
00411 }
00412 else
00413 {
00414
00415
00416 ret= gearman_server_io_packet_add(server_con, false,
00417 GEARMAN_MAGIC_RESPONSE,
00418 GEARMAN_COMMAND_NOOP, NULL);
00419 if (ret != GEARMAN_SUCCESS)
00420 return ret;
00421 }
00422
00423 break;
00424
00425 case GEARMAN_COMMAND_GRAB_JOB:
00426 case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
00427 server_con->is_sleeping= false;
00428 server_con->is_noop_sent= false;
00429
00430 server_job= gearman_server_job_take(server_con);
00431 if (server_job == NULL)
00432 {
00433
00434 ret= gearman_server_io_packet_add(server_con, false,
00435 GEARMAN_MAGIC_RESPONSE,
00436 GEARMAN_COMMAND_NO_JOB, NULL);
00437 }
00438 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
00439 {
00440
00441
00442 ret= gearman_server_io_packet_add(server_con, false,
00443 GEARMAN_MAGIC_RESPONSE,
00444 GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
00445 server_job->job_handle,
00446 (size_t)(strlen(server_job->job_handle) + 1),
00447 server_job->function->function_name,
00448 server_job->function->function_name_size + 1,
00449 server_job->unique,
00450 (size_t)(strlen(server_job->unique) + 1),
00451 server_job->data, server_job->data_size,
00452 NULL);
00453 }
00454 else
00455 {
00456
00457 ret= gearman_server_io_packet_add(server_con, false,
00458 GEARMAN_MAGIC_RESPONSE,
00459 GEARMAN_COMMAND_JOB_ASSIGN,
00460 server_job->job_handle,
00461 (size_t)(strlen(server_job->job_handle) + 1),
00462 server_job->function->function_name,
00463 server_job->function->function_name_size + 1,
00464 server_job->data, server_job->data_size,
00465 NULL);
00466 }
00467
00468 if (ret != GEARMAN_SUCCESS)
00469 {
00470 if (server_job != NULL)
00471 return gearman_server_job_queue(server_job);
00472 return ret;
00473 }
00474
00475 break;
00476
00477 case GEARMAN_COMMAND_WORK_DATA:
00478 case GEARMAN_COMMAND_WORK_WARNING:
00479 server_job= gearman_server_job_get(server_con->thread->server,
00480 (char *)(packet->arg[0]),
00481 server_con);
00482 if (server_job == NULL)
00483 {
00484 return _server_error_packet(server_con, "job_not_found",
00485 "Job given in work result not found");
00486 }
00487
00488
00489 ret= _server_queue_work_data(server_job, packet, packet->command);
00490 if (ret != GEARMAN_SUCCESS)
00491 return ret;
00492
00493 break;
00494
00495 case GEARMAN_COMMAND_WORK_STATUS:
00496 server_job= gearman_server_job_get(server_con->thread->server,
00497 (char *)(packet->arg[0]),
00498 server_con);
00499 if (server_job == NULL)
00500 {
00501 return _server_error_packet(server_con, "job_not_found",
00502 "Job given in work result not found");
00503 }
00504
00505
00506 server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
00507
00508
00509 snprintf(denominator_buffer, 11, "%.*s", (uint32_t)(packet->arg_size[2]),
00510 (char *)(packet->arg[2]));
00511 server_job->denominator= (uint32_t)atoi(denominator_buffer);
00512
00513
00514 for (server_client= server_job->client_list; server_client;
00515 server_client= server_client->job_next)
00516 {
00517 ret= gearman_server_io_packet_add(server_client->con, false,
00518 GEARMAN_MAGIC_RESPONSE,
00519 GEARMAN_COMMAND_WORK_STATUS,
00520 packet->arg[0], packet->arg_size[0],
00521 packet->arg[1], packet->arg_size[1],
00522 packet->arg[2], packet->arg_size[2],
00523 NULL);
00524 if (ret != GEARMAN_SUCCESS)
00525 return ret;
00526 }
00527
00528 break;
00529
00530 case GEARMAN_COMMAND_WORK_COMPLETE:
00531 server_job= gearman_server_job_get(server_con->thread->server,
00532 (char *)(packet->arg[0]),
00533 server_con);
00534 if (server_job == NULL)
00535 {
00536 return _server_error_packet(server_con, "job_not_found",
00537 "Job given in work result not found");
00538 }
00539
00540
00541 ret= _server_queue_work_data(server_job, packet,
00542 GEARMAN_COMMAND_WORK_COMPLETE);
00543 if (ret != GEARMAN_SUCCESS)
00544 return ret;
00545
00546
00547 if (server_job->job_queued && server->queue_done_fn != NULL)
00548 {
00549 ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00550 server_job->unique,
00551 (size_t)strlen(server_job->unique),
00552 server_job->function->function_name,
00553 server_job->function->function_name_size);
00554 if (ret != GEARMAN_SUCCESS)
00555 return ret;
00556 }
00557
00558
00559 gearman_server_job_free(server_job);
00560 break;
00561
00562 case GEARMAN_COMMAND_WORK_EXCEPTION:
00563 server_job= gearman_server_job_get(server_con->thread->server,
00564 (char *)(packet->arg[0]),
00565 server_con);
00566 if (server_job == NULL)
00567 {
00568 return _server_error_packet(server_con, "job_not_found",
00569 "Job given in work result not found");
00570 }
00571
00572
00573 ret= _server_queue_work_data(server_job, packet,
00574 GEARMAN_COMMAND_WORK_EXCEPTION);
00575 if (ret != GEARMAN_SUCCESS)
00576 return ret;
00577 break;
00578
00579 case GEARMAN_COMMAND_WORK_FAIL:
00580
00581 snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00582 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00583
00584 server_job= gearman_server_job_get(server_con->thread->server, job_handle,
00585 server_con);
00586 if (server_job == NULL)
00587 {
00588 return _server_error_packet(server_con, "job_not_found",
00589 "Job given in work result not found");
00590 }
00591
00592
00593 for (server_client= server_job->client_list; server_client;
00594 server_client= server_client->job_next)
00595 {
00596 ret= gearman_server_io_packet_add(server_client->con, false,
00597 GEARMAN_MAGIC_RESPONSE,
00598 GEARMAN_COMMAND_WORK_FAIL,
00599 packet->arg[0], packet->arg_size[0],
00600 NULL);
00601 if (ret != GEARMAN_SUCCESS)
00602 return ret;
00603 }
00604
00605
00606 if (server_job->job_queued && server->queue_done_fn != NULL)
00607 {
00608 ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00609 server_job->unique,
00610 (size_t)strlen(server_job->unique),
00611 server_job->function->function_name,
00612 server_job->function->function_name_size);
00613 if (ret != GEARMAN_SUCCESS)
00614 return ret;
00615 }
00616
00617
00618 gearman_server_job_free(server_job);
00619 break;
00620
00621 case GEARMAN_COMMAND_SET_CLIENT_ID:
00622 gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
00623 packet->arg_size[0]);
00624 break;
00625
00626 case GEARMAN_COMMAND_TEXT:
00627 return _server_run_text(server_con, packet);
00628
00629 case GEARMAN_COMMAND_UNUSED:
00630 case GEARMAN_COMMAND_NOOP:
00631 case GEARMAN_COMMAND_JOB_CREATED:
00632 case GEARMAN_COMMAND_NO_JOB:
00633 case GEARMAN_COMMAND_JOB_ASSIGN:
00634 case GEARMAN_COMMAND_ECHO_RES:
00635 case GEARMAN_COMMAND_ERROR:
00636 case GEARMAN_COMMAND_STATUS_RES:
00637 case GEARMAN_COMMAND_ALL_YOURS:
00638 case GEARMAN_COMMAND_OPTION_RES:
00639 case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
00640 case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
00641 case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
00642 case GEARMAN_COMMAND_MAX:
00643 default:
00644 return _server_error_packet(server_con, "bad_command",
00645 "Command not expected");
00646 }
00647
00648 return GEARMAN_SUCCESS;
00649 }
00650
00651 gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
00652 {
00653 server->shutdown_graceful= true;
00654
00655 if (server->job_count == 0)
00656 return GEARMAN_SHUTDOWN;
00657
00658 return GEARMAN_SHUTDOWN_GRACEFUL;
00659 }
00660
00661 gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
00662 {
00663 gearman_return_t ret;
00664
00665 if (server->queue_replay_fn == NULL)
00666 return GEARMAN_SUCCESS;
00667
00668 server->state.queue_startup= true;
00669
00670 ret= (*(server->queue_replay_fn))(server, (void *)server->queue_context,
00671 _queue_replay_add, server);
00672
00673 server->state.queue_startup= false;
00674
00675 return ret;
00676 }
00677
00678 void *gearman_server_queue_context(const gearman_server_st *server)
00679 {
00680 return (void *)server->queue_context;
00681 }
00682
00683 void gearman_server_set_queue_context(gearman_server_st *server,
00684 void *context)
00685 {
00686 server->queue_context= context;
00687 }
00688
00689 void gearman_server_set_queue_add_fn(gearman_server_st *server,
00690 gearman_queue_add_fn *function)
00691 {
00692 server->queue_add_fn= function;
00693 }
00694
00695 void gearman_server_set_queue_flush_fn(gearman_server_st *server,
00696 gearman_queue_flush_fn *function)
00697 {
00698 server->queue_flush_fn= function;
00699 }
00700
00701 void gearman_server_set_queue_done_fn(gearman_server_st *server,
00702 gearman_queue_done_fn *function)
00703 {
00704 server->queue_done_fn= function;
00705 }
00706
00707 void gearman_server_set_queue_replay_fn(gearman_server_st *server,
00708 gearman_queue_replay_fn *function)
00709 {
00710 server->queue_replay_fn= function;
00711 }
00712
00713
00714
00715
00716
00717 gearman_return_t _queue_replay_add(gearman_server_st *server,
00718 void *context __attribute__ ((unused)),
00719 const void *unique, size_t unique_size,
00720 const void *function_name,
00721 size_t function_name_size, const void *data,
00722 size_t data_size,
00723 gearman_job_priority_t priority)
00724 {
00725 gearman_return_t ret;
00726
00727 (void)gearman_server_job_add(server, (char *)function_name,
00728 function_name_size, (char *)unique, unique_size,
00729 data, data_size, priority, NULL, &ret);
00730 return ret;
00731 }
00732
00733 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00734 const char *error_code,
00735 const char *error_string)
00736 {
00737 return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00738 GEARMAN_COMMAND_ERROR, error_code,
00739 (size_t)(strlen(error_code) + 1),
00740 error_string,
00741 (size_t)strlen(error_string), NULL);
00742 }
00743
00744 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00745 gearman_packet_st *packet)
00746 {
00747 char *data;
00748 char *new_data;
00749 size_t size;
00750 size_t total;
00751 int max_queue_size;
00752 gearman_server_thread_st *thread;
00753 gearman_server_con_st *con;
00754 gearman_server_worker_st *worker;
00755 gearman_server_function_st *function;
00756 gearman_server_packet_st *server_packet;
00757
00758 data= (char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
00759 if (data == NULL)
00760 {
00761 gearman_log_error(packet->universal, "_server_run_text", "malloc");
00762 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00763 }
00764 total= GEARMAN_TEXT_RESPONSE_SIZE;
00765
00766 if (packet->argc == 0)
00767 {
00768 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00769 "ERR unknown_command Unknown+server+command\n");
00770 }
00771 else if (!strcasecmp("workers", (char *)(packet->arg[0])))
00772 {
00773 size= 0;
00774
00775 for (thread= server_con->thread->server->thread_list; thread != NULL;
00776 thread= thread->next)
00777 {
00778 (void) pthread_mutex_lock(&thread->lock);
00779
00780 for (con= thread->con_list; con != NULL; con= con->next)
00781 {
00782 if (con->host == NULL)
00783 continue;
00784
00785 if (size > total)
00786 size= total;
00787
00788
00789 if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00790 {
00791 new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00792 if (new_data == NULL)
00793 {
00794 (void) pthread_mutex_unlock(&thread->lock);
00795 free(data);
00796 gearman_log_error(packet->universal, "_server_run_text", "malloc");
00797 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00798 }
00799
00800 data= new_data;
00801 total+= GEARMAN_TEXT_RESPONSE_SIZE;
00802 }
00803
00804 size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
00805 con->con.fd, con->host, con->id);
00806 if (size > total)
00807 continue;
00808
00809 for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
00810 {
00811 size+= (size_t)snprintf(data + size, total - size, " %.*s",
00812 (int)(worker->function->function_name_size),
00813 worker->function->function_name);
00814 if (size > total)
00815 break;
00816 }
00817
00818 if (size > total)
00819 continue;
00820
00821 size+= (size_t)snprintf(data + size, total - size, "\n");
00822 }
00823
00824 (void) pthread_mutex_unlock(&thread->lock);
00825 }
00826
00827 if (size < total)
00828 snprintf(data + size, total - size, ".\n");
00829 }
00830 else if (!strcasecmp("status", (char *)(packet->arg[0])))
00831 {
00832 size= 0;
00833
00834 for (function= server_con->thread->server->function_list; function != NULL;
00835 function= function->next)
00836 {
00837 if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00838 {
00839 new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00840 if (new_data == NULL)
00841 {
00842 free(data);
00843 gearman_log_error(packet->universal, "_server_run_text", "malloc");
00844 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00845 }
00846
00847 data= new_data;
00848 total+= GEARMAN_TEXT_RESPONSE_SIZE;
00849 }
00850
00851 size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
00852 (int)(function->function_name_size),
00853 function->function_name, function->job_total,
00854 function->job_running, function->worker_count);
00855 if (size > total)
00856 size= total;
00857 }
00858
00859 if (size < total)
00860 snprintf(data + size, total - size, ".\n");
00861 }
00862 else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
00863 {
00864 if (packet->argc == 1)
00865 {
00866 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
00867 "An+incomplete+set+of+arguments+was+sent+to+this+command\n");
00868 }
00869 else
00870 {
00871 if (packet->argc == 2)
00872 max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
00873 else
00874 {
00875 max_queue_size= atoi((char *)(packet->arg[2]));
00876 if (max_queue_size < 0)
00877 max_queue_size= 0;
00878 }
00879
00880 for (function= server_con->thread->server->function_list;
00881 function != NULL; function= function->next)
00882 {
00883 if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
00884 !memcmp(packet->arg[1], function->function_name,
00885 function->function_name_size))
00886 {
00887 function->max_queue_size= (uint32_t)max_queue_size;
00888 }
00889 }
00890
00891 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00892 }
00893 }
00894 else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
00895 {
00896 if (packet->argc == 1)
00897 {
00898 server_con->thread->server->shutdown= true;
00899 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00900 }
00901 else if (packet->argc == 2 &&
00902 !strcasecmp("graceful", (char *)(packet->arg[1])))
00903 {
00904 server_con->thread->server->shutdown_graceful= true;
00905 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00906 }
00907 else
00908 {
00909 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00910 "ERR unknown_args Unknown+arguments+to+server+command\n");
00911 }
00912 }
00913 else if (!strcasecmp("version", (char *)(packet->arg[0])))
00914 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
00915 else
00916 {
00917 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00918 "ERR unknown_command Unknown+server+command\n");
00919 }
00920
00921 server_packet= gearman_server_packet_create(server_con->thread, false);
00922 if (server_packet == NULL)
00923 {
00924 free(data);
00925 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00926 }
00927
00928 if (gearman_packet_create(server_con->thread->gearman,
00929 &(server_packet->packet)) == NULL)
00930 {
00931 free(data);
00932 gearman_server_packet_free(server_packet, server_con->thread, false);
00933 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00934 }
00935
00936 server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
00937 server_packet->packet.command= GEARMAN_COMMAND_TEXT;
00938 server_packet->packet.options.complete= true;
00939 server_packet->packet.options.free_data= true;
00940
00941 server_packet->packet.data= data;
00942 server_packet->packet.data_size= strlen(data);
00943
00944 (void) pthread_mutex_lock(&server_con->thread->lock);
00945 GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
00946 (void) pthread_mutex_unlock(&server_con->thread->lock);
00947
00948 gearman_server_con_io_add(server_con);
00949
00950 return GEARMAN_SUCCESS;
00951 }
00952
00953 static gearman_return_t
00954 _server_queue_work_data(gearman_server_job_st *server_job,
00955 gearman_packet_st *packet, gearman_command_t command)
00956 {
00957 gearman_server_client_st *server_client;
00958 uint8_t *data;
00959 gearman_return_t ret;
00960
00961 for (server_client= server_job->client_list; server_client;
00962 server_client= server_client->job_next)
00963 {
00964 if (command == GEARMAN_COMMAND_WORK_EXCEPTION && !(server_client->con->is_exceptions))
00965 {
00966 continue;
00967 }
00968
00969 if (packet->data_size > 0)
00970 {
00971 if (packet->options.free_data &&
00972 server_client->job_next == NULL)
00973 {
00974 data= (uint8_t *)(packet->data);
00975 packet->options.free_data= false;
00976 }
00977 else
00978 {
00979 data= (uint8_t *)malloc(packet->data_size);
00980 if (data == NULL)
00981 {
00982 gearman_log_error(packet->universal, "_server_run_command", "malloc");
00983 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00984 }
00985
00986 memcpy(data, packet->data, packet->data_size);
00987 }
00988 }
00989 else
00990 data= NULL;
00991
00992 ret= gearman_server_io_packet_add(server_client->con, true,
00993 GEARMAN_MAGIC_RESPONSE, command,
00994 packet->arg[0], packet->arg_size[0],
00995 data, packet->data_size, NULL);
00996 if (ret != GEARMAN_SUCCESS)
00997 return ret;
00998 }
00999
01000 return GEARMAN_SUCCESS;
01001 }
01002
01003 static void _log(const char *line, gearman_verbose_t verbose, void *context)
01004 {
01005 gearman_server_st *server= (gearman_server_st *)context;
01006 (*(server->log_fn))(line, verbose, (void *)server->log_context);
01007 }