00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 gearman_return_t _queue_replay_add(gearman_server_st *server, void *context,
00030 const void *unique, size_t unique_size,
00031 const void *function_name,
00032 size_t function_name_size, const void *data,
00033 size_t data_size,
00034 gearman_job_priority_t priority);
00035
00039 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00040 const char *error_code,
00041 const char *error_string);
00042
00046 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00047 gearman_packet_st *packet);
00048
00052 static gearman_return_t
00053 _server_queue_work_data(gearman_server_job_st *server_job,
00054 gearman_packet_st *packet, gearman_command_t command);
00055
00059 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00060
00063
00064
00065
00066
00067 gearman_server_st *gearman_server_create(gearman_server_st *server)
00068 {
00069 struct utsname un;
00070
00071 if (server == NULL)
00072 {
00073 server= malloc(sizeof(gearman_server_st));
00074 if (server == NULL)
00075 return NULL;
00076
00077 server->options= GEARMAN_SERVER_ALLOCATED;
00078 }
00079 else
00080 server->options= 0;
00081
00082 server->shutdown= false;
00083 server->shutdown_graceful= false;
00084 server->proc_wakeup= false;
00085 server->proc_shutdown= false;
00086 server->job_retries= 0;
00087 server->thread_count= 0;
00088 server->free_packet_count= 0;
00089 server->function_count= 0;
00090 server->job_count= 0;
00091 server->unique_count= 0;
00092 server->free_job_count= 0;
00093 server->free_client_count= 0;
00094 server->free_worker_count= 0;
00095 server->thread_list= NULL;
00096 server->free_packet_list= NULL;
00097 server->function_list= NULL;
00098 server->free_job_list= NULL;
00099 server->free_client_list= NULL;
00100 server->free_worker_list= NULL;
00101 server->log_fn= NULL;
00102 server->log_context= NULL;
00103 server->queue_context= NULL;
00104 server->queue_add_fn= NULL;
00105 server->queue_flush_fn= NULL;
00106 server->queue_done_fn= NULL;
00107 server->queue_replay_fn= NULL;
00108 memset(server->job_hash, 0,
00109 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00110 memset(server->unique_hash, 0,
00111 sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
00112
00113 server->gearman= gearman_create(&(server->gearman_static));
00114 if (server->gearman == NULL)
00115 {
00116 gearman_server_free(server);
00117 return NULL;
00118 }
00119
00120 if (uname(&un) == -1)
00121 {
00122 gearman_server_free(server);
00123 return NULL;
00124 }
00125
00126 snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
00127 un.nodename);
00128 server->job_handle_count= 1;
00129
00130 return server;
00131 }
00132
00133 void gearman_server_free(gearman_server_st *server)
00134 {
00135 uint32_t key;
00136 gearman_server_packet_st *packet;
00137 gearman_server_job_st *job;
00138 gearman_server_client_st *client;
00139 gearman_server_worker_st *worker;
00140
00141
00142 assert(server->thread_list == NULL);
00143
00144 for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
00145 {
00146 while (server->job_hash[key] != NULL)
00147 gearman_server_job_free(server->job_hash[key]);
00148 }
00149
00150 while (server->function_list != NULL)
00151 gearman_server_function_free(server->function_list);
00152
00153 while (server->free_packet_list != NULL)
00154 {
00155 packet= server->free_packet_list;
00156 server->free_packet_list= packet->next;
00157 free(packet);
00158 }
00159
00160 while (server->free_job_list != NULL)
00161 {
00162 job= server->free_job_list;
00163 server->free_job_list= job->next;
00164 free(job);
00165 }
00166
00167 while (server->free_client_list != NULL)
00168 {
00169 client= server->free_client_list;
00170 server->free_client_list= client->con_next;
00171 free(client);
00172 }
00173
00174 while (server->free_worker_list != NULL)
00175 {
00176 worker= server->free_worker_list;
00177 server->free_worker_list= worker->con_next;
00178 free(worker);
00179 }
00180
00181 if (server->gearman != NULL)
00182 gearman_free(server->gearman);
00183
00184 if (server->options & GEARMAN_SERVER_ALLOCATED)
00185 free(server);
00186 }
00187
00188 void gearman_server_set_job_retries(gearman_server_st *server,
00189 uint8_t job_retries)
00190 {
00191 server->job_retries= job_retries;
00192 }
00193
00194 void gearman_server_set_log_fn(gearman_server_st *server,
00195 gearman_log_fn *function,
00196 const void *context, gearman_verbose_t verbose)
00197 {
00198 server->log_fn= function;
00199 server->log_context= context;
00200 gearman_set_log_fn(server->gearman, _log, server, verbose);
00201 }
00202
00203 gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
00204 gearman_packet_st *packet)
00205 {
00206 gearman_return_t ret;
00207 gearman_server_job_st *server_job;
00208 char job_handle[GEARMAN_JOB_HANDLE_SIZE];
00209 char option[GEARMAN_OPTION_SIZE];
00210 gearman_server_client_st *server_client;
00211 char numerator_buffer[11];
00212 char denominator_buffer[11];
00213 gearman_job_priority_t priority;
00214 gearman_server_st *server= server_con->thread->server;
00215
00216 if (packet->magic == GEARMAN_MAGIC_RESPONSE)
00217 {
00218 return _server_error_packet(server_con, "bad_magic",
00219 "Request magic expected");
00220 }
00221
00222 switch (packet->command)
00223 {
00224
00225 case GEARMAN_COMMAND_ECHO_REQ:
00226
00227 ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
00228 GEARMAN_COMMAND_ECHO_RES, packet->data,
00229 packet->data_size, NULL);
00230 if (ret != GEARMAN_SUCCESS)
00231 return ret;
00232
00233 packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
00234
00235 break;
00236
00237
00238 case GEARMAN_COMMAND_SUBMIT_JOB:
00239 case GEARMAN_COMMAND_SUBMIT_JOB_BG:
00240 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
00241 case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
00242 case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
00243 case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
00244
00245 if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
00246 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG)
00247 {
00248 priority= GEARMAN_JOB_PRIORITY_NORMAL;
00249 }
00250 else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
00251 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
00252 {
00253 priority= GEARMAN_JOB_PRIORITY_HIGH;
00254 }
00255 else
00256 priority= GEARMAN_JOB_PRIORITY_LOW;
00257
00258 if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
00259 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
00260 packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
00261 {
00262 server_client= NULL;
00263 }
00264 else
00265 {
00266 server_client= gearman_server_client_add(server_con);
00267 if (server_client == NULL)
00268 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00269 }
00270
00271
00272 server_job= gearman_server_job_add(server_con->thread->server,
00273 (char *)(packet->arg[0]),
00274 packet->arg_size[0] - 1,
00275 (char *)(packet->arg[1]),
00276 packet->arg_size[1] - 1, packet->data,
00277 packet->data_size, priority,
00278 server_client, &ret);
00279 if (ret == GEARMAN_SUCCESS)
00280 packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
00281 else if (ret == GEARMAN_JOB_QUEUE_FULL)
00282 {
00283 return _server_error_packet(server_con, "queue_full",
00284 "Job queue is full");
00285 }
00286 else if (ret != GEARMAN_JOB_EXISTS)
00287 return ret;
00288
00289
00290 ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00291 GEARMAN_COMMAND_JOB_CREATED,
00292 server_job->job_handle,
00293 (size_t)strlen(server_job->job_handle),
00294 NULL);
00295 if (ret != GEARMAN_SUCCESS)
00296 return ret;
00297
00298 break;
00299
00300 case GEARMAN_COMMAND_GET_STATUS:
00301
00302 snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00303 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00304
00305 server_job= gearman_server_job_get(server_con->thread->server, job_handle);
00306
00307
00308 if (server_job == NULL)
00309 {
00310 ret= gearman_server_io_packet_add(server_con, false,
00311 GEARMAN_MAGIC_RESPONSE,
00312 GEARMAN_COMMAND_STATUS_RES, job_handle,
00313 (size_t)(strlen(job_handle) + 1),
00314 "0", (size_t)2, "0", (size_t)2, "0",
00315 (size_t)2, "0", (size_t)1, NULL);
00316 }
00317 else
00318 {
00319 snprintf(numerator_buffer, 11, "%u", server_job->numerator);
00320 snprintf(denominator_buffer, 11, "%u", server_job->denominator);
00321
00322 ret= gearman_server_io_packet_add(server_con, false,
00323 GEARMAN_MAGIC_RESPONSE,
00324 GEARMAN_COMMAND_STATUS_RES, job_handle,
00325 (size_t)(strlen(job_handle) + 1),
00326 "1", (size_t)2,
00327 server_job->worker == NULL ? "0" : "1",
00328 (size_t)2, numerator_buffer,
00329 (size_t)(strlen(numerator_buffer) + 1),
00330 denominator_buffer,
00331 (size_t)strlen(denominator_buffer),
00332 NULL);
00333 }
00334
00335 if (ret != GEARMAN_SUCCESS)
00336 return ret;
00337
00338 break;
00339
00340 case GEARMAN_COMMAND_OPTION_REQ:
00341
00342 snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
00343 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00344 if (!strcasecmp(option, "exceptions"))
00345 server_con->options|= GEARMAN_SERVER_CON_EXCEPTIONS;
00346 else
00347 {
00348 return _server_error_packet(server_con, "unknown_option",
00349 "Server does not recognize given option");
00350 }
00351
00352 ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00353 GEARMAN_COMMAND_OPTION_RES,
00354 packet->arg[0], packet->arg_size[0],
00355 NULL);
00356 if (ret != GEARMAN_SUCCESS)
00357 return ret;
00358
00359 break;
00360
00361
00362 case GEARMAN_COMMAND_CAN_DO:
00363 if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00364 packet->arg_size[0], 0) == NULL)
00365 {
00366 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00367 }
00368
00369 break;
00370
00371 case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
00372 if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
00373 packet->arg_size[0] - 1,
00374 (in_port_t)atoi((char *)(packet->arg[1])))
00375 == NULL)
00376 {
00377 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00378 }
00379
00380 break;
00381
00382 case GEARMAN_COMMAND_CANT_DO:
00383 gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
00384 packet->arg_size[0]);
00385 break;
00386
00387 case GEARMAN_COMMAND_RESET_ABILITIES:
00388 gearman_server_con_free_workers(server_con);
00389 break;
00390
00391 case GEARMAN_COMMAND_PRE_SLEEP:
00392 server_job= gearman_server_job_peek(server_con);
00393 if (server_job == NULL)
00394 server_con->options|= GEARMAN_SERVER_CON_SLEEPING;
00395 else
00396 {
00397
00398
00399 ret= gearman_server_io_packet_add(server_con, false,
00400 GEARMAN_MAGIC_RESPONSE,
00401 GEARMAN_COMMAND_NOOP, NULL);
00402 if (ret != GEARMAN_SUCCESS)
00403 return ret;
00404 }
00405
00406 break;
00407
00408 case GEARMAN_COMMAND_GRAB_JOB:
00409 case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
00410 server_con->options&=
00411 (gearman_server_con_options_t)~(GEARMAN_SERVER_CON_SLEEPING |
00412 GEARMAN_SERVER_CON_NOOP_SENT);
00413
00414 server_job= gearman_server_job_take(server_con);
00415 if (server_job == NULL)
00416 {
00417
00418 ret= gearman_server_io_packet_add(server_con, false,
00419 GEARMAN_MAGIC_RESPONSE,
00420 GEARMAN_COMMAND_NO_JOB, NULL);
00421 }
00422 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
00423 {
00424
00425
00426 ret= gearman_server_io_packet_add(server_con, false,
00427 GEARMAN_MAGIC_RESPONSE,
00428 GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
00429 server_job->job_handle,
00430 (size_t)(strlen(server_job->job_handle) + 1),
00431 server_job->function->function_name,
00432 server_job->function->function_name_size + 1,
00433 server_job->unique,
00434 (size_t)(strlen(server_job->unique) + 1),
00435 server_job->data, server_job->data_size,
00436 NULL);
00437 }
00438 else
00439 {
00440
00441 ret= gearman_server_io_packet_add(server_con, false,
00442 GEARMAN_MAGIC_RESPONSE,
00443 GEARMAN_COMMAND_JOB_ASSIGN,
00444 server_job->job_handle,
00445 (size_t)(strlen(server_job->job_handle) + 1),
00446 server_job->function->function_name,
00447 server_job->function->function_name_size + 1,
00448 server_job->data, server_job->data_size,
00449 NULL);
00450 }
00451
00452 if (ret != GEARMAN_SUCCESS)
00453 {
00454 if (server_job != NULL)
00455 return gearman_server_job_queue(server_job);
00456 return ret;
00457 }
00458
00459 break;
00460
00461 case GEARMAN_COMMAND_WORK_DATA:
00462 case GEARMAN_COMMAND_WORK_WARNING:
00463 server_job= gearman_server_job_get(server_con->thread->server,
00464 (char *)(packet->arg[0]));
00465 if (server_job == NULL)
00466 {
00467 return _server_error_packet(server_con, "job_not_found",
00468 "Job given in work result not found");
00469 }
00470
00471
00472 ret= _server_queue_work_data(server_job, packet, packet->command);
00473 if (ret != GEARMAN_SUCCESS)
00474 return ret;
00475
00476 break;
00477
00478 case GEARMAN_COMMAND_WORK_STATUS:
00479 server_job= gearman_server_job_get(server_con->thread->server,
00480 (char *)(packet->arg[0]));
00481 if (server_job == NULL)
00482 {
00483 return _server_error_packet(server_con, "job_not_found",
00484 "Job given in work result not found");
00485 }
00486
00487
00488 server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
00489
00490
00491 snprintf(denominator_buffer, 11, "%.*s", (uint32_t)(packet->arg_size[2]),
00492 (char *)(packet->arg[2]));
00493 server_job->denominator= (uint32_t)atoi(denominator_buffer);
00494
00495
00496 for (server_client= server_job->client_list; server_client;
00497 server_client= server_client->job_next)
00498 {
00499 ret= gearman_server_io_packet_add(server_client->con, false,
00500 GEARMAN_MAGIC_RESPONSE,
00501 GEARMAN_COMMAND_WORK_STATUS,
00502 packet->arg[0], packet->arg_size[0],
00503 packet->arg[1], packet->arg_size[1],
00504 packet->arg[2], packet->arg_size[2],
00505 NULL);
00506 if (ret != GEARMAN_SUCCESS)
00507 return ret;
00508 }
00509
00510 break;
00511
00512 case GEARMAN_COMMAND_WORK_COMPLETE:
00513 server_job= gearman_server_job_get(server_con->thread->server,
00514 (char *)(packet->arg[0]));
00515 if (server_job == NULL)
00516 {
00517 return _server_error_packet(server_con, "job_not_found",
00518 "Job given in work result not found");
00519 }
00520
00521
00522 ret= _server_queue_work_data(server_job, packet,
00523 GEARMAN_COMMAND_WORK_COMPLETE);
00524 if (ret != GEARMAN_SUCCESS)
00525 return ret;
00526
00527
00528 if (server_job->options & GEARMAN_SERVER_JOB_QUEUED &&
00529 server->queue_done_fn != NULL)
00530 {
00531 ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00532 server_job->unique,
00533 (size_t)strlen(server_job->unique),
00534 server_job->function->function_name,
00535 server_job->function->function_name_size);
00536 if (ret != GEARMAN_SUCCESS)
00537 return ret;
00538 }
00539
00540
00541 gearman_server_job_free(server_job);
00542 break;
00543
00544 case GEARMAN_COMMAND_WORK_EXCEPTION:
00545 server_job= gearman_server_job_get(server_con->thread->server,
00546 (char *)(packet->arg[0]));
00547 if (server_job == NULL)
00548 {
00549 return _server_error_packet(server_con, "job_not_found",
00550 "Job given in work result not found");
00551 }
00552
00553
00554 ret= _server_queue_work_data(server_job, packet,
00555 GEARMAN_COMMAND_WORK_EXCEPTION);
00556 if (ret != GEARMAN_SUCCESS)
00557 return ret;
00558 break;
00559
00560 case GEARMAN_COMMAND_WORK_FAIL:
00561
00562 snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
00563 (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
00564
00565 server_job= gearman_server_job_get(server_con->thread->server, job_handle);
00566 if (server_job == NULL)
00567 {
00568 return _server_error_packet(server_con, "job_not_found",
00569 "Job given in work result not found");
00570 }
00571
00572
00573 for (server_client= server_job->client_list; server_client;
00574 server_client= server_client->job_next)
00575 {
00576 ret= gearman_server_io_packet_add(server_client->con, false,
00577 GEARMAN_MAGIC_RESPONSE,
00578 GEARMAN_COMMAND_WORK_FAIL,
00579 packet->arg[0], packet->arg_size[0],
00580 NULL);
00581 if (ret != GEARMAN_SUCCESS)
00582 return ret;
00583 }
00584
00585
00586 if (server_job->options & GEARMAN_SERVER_JOB_QUEUED &&
00587 server->queue_done_fn != NULL)
00588 {
00589 ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
00590 server_job->unique,
00591 (size_t)strlen(server_job->unique),
00592 server_job->function->function_name,
00593 server_job->function->function_name_size);
00594 if (ret != GEARMAN_SUCCESS)
00595 return ret;
00596 }
00597
00598
00599 gearman_server_job_free(server_job);
00600 break;
00601
00602 case GEARMAN_COMMAND_SET_CLIENT_ID:
00603 gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
00604 packet->arg_size[0]);
00605 break;
00606
00607 case GEARMAN_COMMAND_TEXT:
00608 return _server_run_text(server_con, packet);
00609
00610 case GEARMAN_COMMAND_UNUSED:
00611 case GEARMAN_COMMAND_NOOP:
00612 case GEARMAN_COMMAND_JOB_CREATED:
00613 case GEARMAN_COMMAND_NO_JOB:
00614 case GEARMAN_COMMAND_JOB_ASSIGN:
00615 case GEARMAN_COMMAND_ECHO_RES:
00616 case GEARMAN_COMMAND_ERROR:
00617 case GEARMAN_COMMAND_STATUS_RES:
00618 case GEARMAN_COMMAND_ALL_YOURS:
00619 case GEARMAN_COMMAND_OPTION_RES:
00620 case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
00621 case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
00622 case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
00623 case GEARMAN_COMMAND_MAX:
00624 default:
00625 return _server_error_packet(server_con, "bad_command",
00626 "Command not expected");
00627 }
00628
00629 return GEARMAN_SUCCESS;
00630 }
00631
00632 gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
00633 {
00634 server->shutdown_graceful= true;
00635
00636 if (server->job_count == 0)
00637 return GEARMAN_SHUTDOWN;
00638
00639 return GEARMAN_SHUTDOWN_GRACEFUL;
00640 }
00641
00642 gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
00643 {
00644 gearman_return_t ret;
00645
00646 if (server->queue_replay_fn == NULL)
00647 return GEARMAN_SUCCESS;
00648
00649 server->options|= GEARMAN_SERVER_QUEUE_REPLAY;
00650
00651 ret= (*(server->queue_replay_fn))(server, (void *)server->queue_context,
00652 _queue_replay_add, server);
00653
00654 server->options&= (gearman_server_options_t)~GEARMAN_SERVER_QUEUE_REPLAY;
00655
00656 return ret;
00657 }
00658
00659 void *gearman_server_queue_context(const gearman_server_st *server)
00660 {
00661 return (void *)server->queue_context;
00662 }
00663
00664 void gearman_server_set_queue_context(gearman_server_st *server,
00665 const void *context)
00666 {
00667 server->queue_context= context;
00668 }
00669
00670 void gearman_server_set_queue_add_fn(gearman_server_st *server,
00671 gearman_queue_add_fn *function)
00672 {
00673 server->queue_add_fn= function;
00674 }
00675
00676 void gearman_server_set_queue_flush_fn(gearman_server_st *server,
00677 gearman_queue_flush_fn *function)
00678 {
00679 server->queue_flush_fn= function;
00680 }
00681
00682 void gearman_server_set_queue_done_fn(gearman_server_st *server,
00683 gearman_queue_done_fn *function)
00684 {
00685 server->queue_done_fn= function;
00686 }
00687
00688 void gearman_server_set_queue_replay_fn(gearman_server_st *server,
00689 gearman_queue_replay_fn *function)
00690 {
00691 server->queue_replay_fn= function;
00692 }
00693
00694
00695
00696
00697
00698 gearman_return_t _queue_replay_add(gearman_server_st *server,
00699 void *context __attribute__ ((unused)),
00700 const void *unique, size_t unique_size,
00701 const void *function_name,
00702 size_t function_name_size, const void *data,
00703 size_t data_size,
00704 gearman_job_priority_t priority)
00705 {
00706 gearman_return_t ret;
00707
00708 (void)gearman_server_job_add(server, (char *)function_name,
00709 function_name_size, (char *)unique, unique_size,
00710 data, data_size, priority, NULL, &ret);
00711 return ret;
00712 }
00713
00714 static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
00715 const char *error_code,
00716 const char *error_string)
00717 {
00718 return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
00719 GEARMAN_COMMAND_ERROR, error_code,
00720 (size_t)(strlen(error_code) + 1),
00721 error_string,
00722 (size_t)strlen(error_string), NULL);
00723 }
00724
00725 static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
00726 gearman_packet_st *packet)
00727 {
00728 char *data;
00729 char *new_data;
00730 size_t size;
00731 size_t total;
00732 int max_queue_size;
00733 gearman_server_thread_st *thread;
00734 gearman_server_con_st *con;
00735 gearman_server_worker_st *worker;
00736 gearman_server_function_st *function;
00737 gearman_server_packet_st *server_packet;
00738
00739 data= malloc(GEARMAN_TEXT_RESPONSE_SIZE);
00740 if (data == NULL)
00741 {
00742 GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
00743 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00744 }
00745 total= GEARMAN_TEXT_RESPONSE_SIZE;
00746
00747 if (packet->argc == 0)
00748 {
00749 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00750 "ERR unknown_command Unknown+server+command\n");
00751 }
00752 else if (!strcasecmp("workers", (char *)(packet->arg[0])))
00753 {
00754 size= 0;
00755
00756 for (thread= server_con->thread->server->thread_list; thread != NULL;
00757 thread= thread->next)
00758 {
00759 GEARMAN_SERVER_THREAD_LOCK(thread)
00760
00761 for (con= thread->con_list; con != NULL; con= con->next)
00762 {
00763 if (con->host == NULL)
00764 continue;
00765
00766 if (size > total)
00767 size= total;
00768
00769
00770 if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00771 {
00772 new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00773 if (new_data == NULL)
00774 {
00775 GEARMAN_SERVER_THREAD_UNLOCK(thread)
00776 free(data);
00777 GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
00778 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00779 }
00780
00781 data= new_data;
00782 total+= GEARMAN_TEXT_RESPONSE_SIZE;
00783 }
00784
00785 size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
00786 con->con.fd, con->host, con->id);
00787 if (size > total)
00788 continue;
00789
00790 for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
00791 {
00792 size+= (size_t)snprintf(data + size, total - size, " %.*s",
00793 (int)(worker->function->function_name_size),
00794 worker->function->function_name);
00795 if (size > total)
00796 break;
00797 }
00798
00799 if (size > total)
00800 continue;
00801
00802 size+= (size_t)snprintf(data + size, total - size, "\n");
00803 }
00804
00805 GEARMAN_SERVER_THREAD_UNLOCK(thread)
00806 }
00807
00808 if (size < total)
00809 snprintf(data + size, total - size, ".\n");
00810 }
00811 else if (!strcasecmp("status", (char *)(packet->arg[0])))
00812 {
00813 size= 0;
00814
00815 for (function= server_con->thread->server->function_list; function != NULL;
00816 function= function->next)
00817 {
00818 if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
00819 {
00820 new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
00821 if (new_data == NULL)
00822 {
00823 free(data);
00824 GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
00825 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00826 }
00827
00828 data= new_data;
00829 total+= GEARMAN_TEXT_RESPONSE_SIZE;
00830 }
00831
00832 size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
00833 (int)(function->function_name_size),
00834 function->function_name, function->job_total,
00835 function->job_running, function->worker_count);
00836 if (size > total)
00837 size= total;
00838 }
00839
00840 if (size < total)
00841 snprintf(data + size, total - size, ".\n");
00842 }
00843 else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
00844 {
00845 if (packet->argc == 1)
00846 {
00847 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
00848 "An+incomplete+set+of+arguments+was+sent+to+this+command\n");
00849 }
00850 else
00851 {
00852 if (packet->argc == 2)
00853 max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
00854 else
00855 {
00856 max_queue_size= atoi((char *)(packet->arg[2]));
00857 if (max_queue_size < 0)
00858 max_queue_size= 0;
00859 }
00860
00861 for (function= server_con->thread->server->function_list;
00862 function != NULL; function= function->next)
00863 {
00864 if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
00865 !memcmp(packet->arg[1], function->function_name,
00866 function->function_name_size))
00867 {
00868 function->max_queue_size= (uint32_t)max_queue_size;
00869 }
00870 }
00871
00872 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00873 }
00874 }
00875 else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
00876 {
00877 if (packet->argc == 1)
00878 {
00879 server_con->thread->server->shutdown= true;
00880 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00881 }
00882 else if (packet->argc == 2 &&
00883 !strcasecmp("graceful", (char *)(packet->arg[1])))
00884 {
00885 server_con->thread->server->shutdown_graceful= true;
00886 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
00887 }
00888 else
00889 {
00890 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00891 "ERR unknown_args Unknown+arguments+to+server+command\n");
00892 }
00893 }
00894 else if (!strcasecmp("version", (char *)(packet->arg[0])))
00895 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
00896 else
00897 {
00898 snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
00899 "ERR unknown_command Unknown+server+command\n");
00900 }
00901
00902 server_packet= gearman_server_packet_create(server_con->thread, false);
00903 if (server_packet == NULL)
00904 {
00905 free(data);
00906 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00907 }
00908
00909 if (gearman_packet_create(server_con->thread->gearman,
00910 &(server_packet->packet)) == NULL)
00911 {
00912 free(data);
00913 gearman_server_packet_free(server_packet, server_con->thread, false);
00914 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00915 }
00916
00917 server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
00918 server_packet->packet.command= GEARMAN_COMMAND_TEXT;
00919 server_packet->packet.options|= (GEARMAN_PACKET_COMPLETE |
00920 GEARMAN_PACKET_FREE_DATA);
00921 server_packet->packet.data= data;
00922 server_packet->packet.data_size= strlen(data);
00923
00924 GEARMAN_SERVER_THREAD_LOCK(server_con->thread)
00925 GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
00926 GEARMAN_SERVER_THREAD_UNLOCK(server_con->thread)
00927
00928 gearman_server_con_io_add(server_con);
00929
00930 return GEARMAN_SUCCESS;
00931 }
00932
00933 static gearman_return_t
00934 _server_queue_work_data(gearman_server_job_st *server_job,
00935 gearman_packet_st *packet, gearman_command_t command)
00936 {
00937 gearman_server_client_st *server_client;
00938 uint8_t *data;
00939 gearman_return_t ret;
00940
00941 for (server_client= server_job->client_list; server_client;
00942 server_client= server_client->job_next)
00943 {
00944 if (command == GEARMAN_COMMAND_WORK_EXCEPTION &&
00945 !(server_client->con->options & GEARMAN_SERVER_CON_EXCEPTIONS))
00946 {
00947 continue;
00948 }
00949
00950 if (packet->data_size > 0)
00951 {
00952 if (packet->options & GEARMAN_PACKET_FREE_DATA &&
00953 server_client->job_next == NULL)
00954 {
00955 data= (uint8_t *)(packet->data);
00956 packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
00957 }
00958 else
00959 {
00960 data= malloc(packet->data_size);
00961 if (data == NULL)
00962 {
00963 GEARMAN_ERROR_SET(packet->gearman, "_server_run_command", "malloc")
00964 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00965 }
00966
00967 memcpy(data, packet->data, packet->data_size);
00968 }
00969 }
00970 else
00971 data= NULL;
00972
00973 ret= gearman_server_io_packet_add(server_client->con, true,
00974 GEARMAN_MAGIC_RESPONSE, command,
00975 packet->arg[0], packet->arg_size[0],
00976 data, packet->data_size, NULL);
00977 if (ret != GEARMAN_SUCCESS)
00978 return ret;
00979 }
00980
00981 return GEARMAN_SUCCESS;
00982 }
00983
00984 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00985 {
00986 gearman_server_st *server= (gearman_server_st *)context;
00987 (*(server->log_fn))(line, verbose, (void *)server->log_context);
00988 }